refactor(core): Migrate DB setup to use DI (#15324)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2025-05-13 13:28:41 +02:00
committed by GitHub
parent c061acc01c
commit 8591c2e0d1
35 changed files with 782 additions and 378 deletions

View File

@@ -1,6 +1,7 @@
import { Config, Env } from '@n8n/config';
import path from 'node:path';
import { Config, Env } from '../decorators';
@Config
export class InstanceSettingsConfig {
/**

View File

@@ -30,6 +30,8 @@ import { WorkflowsConfig } from './configs/workflows.config';
import { Config, Env, Nested } from './decorators';
export { Config, Env, Nested } from './decorators';
export { DatabaseConfig } from './configs/database.config';
export { InstanceSettingsConfig } from './configs/instance-settings-config';
export { TaskRunnersConfig } from './configs/runners.config';
export { SecurityConfig } from './configs/security.config';
export { ExecutionsConfig } from './configs/executions.config';

View File

@@ -11,7 +11,8 @@ import { Logger } from 'n8n-core';
import config from '@/config';
import { N8N_VERSION, TEMPLATES_DIR, inDevelopment, inTest } from '@/constants';
import * as Db from '@/db';
import { DbConnection } from '@/databases/db-connection';
import { ServiceUnavailableError } from '@/errors/response-errors/service-unavailable.error';
import { ExternalHooks } from '@/external-hooks';
import { rawBodyReader, bodyParser, corsMiddleware } from '@/middlewares';
import { send, sendErrorResponse } from '@/response-helper';
@@ -21,8 +22,6 @@ import { WaitingForms } from '@/webhooks/waiting-forms';
import { WaitingWebhooks } from '@/webhooks/waiting-webhooks';
import { createWebhookHandlerFor } from '@/webhooks/webhook-request-handler';
import { ServiceUnavailableError } from './errors/response-errors/service-unavailable.error';
@Service()
export abstract class AbstractServer {
protected logger: Logger;
@@ -35,6 +34,8 @@ export abstract class AbstractServer {
protected globalConfig = Container.get(GlobalConfig);
protected dbConnection = Container.get(DbConnection);
protected sslKey: string;
protected sslCert: string;
@@ -126,8 +127,10 @@ export abstract class AbstractServer {
res.send({ status: 'ok' });
});
const { connectionState } = this.dbConnection;
this.app.get('/healthz/readiness', (_req, res) => {
const { connected, migrated } = Db.connectionState;
const { connected, migrated } = connectionState;
if (connected && migrated) {
res.status(200).send({ status: 'ok' });
} else {
@@ -135,7 +138,6 @@ export abstract class AbstractServer {
}
});
const { connectionState } = Db;
this.app.use((_req, res, next) => {
if (connectionState.connected) {
if (connectionState.migrated) next();

View File

@@ -7,6 +7,7 @@ import { mock } from 'jest-mock-extended';
import type { IRun } from 'n8n-workflow';
import { ActiveExecutions } from '@/active-executions';
import { DbConnection } from '@/databases/db-connection';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { DeprecationService } from '@/deprecation/deprecation.service';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
@@ -35,12 +36,9 @@ const posthogClient = mockInstance(PostHogClient);
const telemetryEventRelay = mockInstance(TelemetryEventRelay);
const externalHooks = mockInstance(ExternalHooks);
jest.mock('@/db', () => ({
init: jest.fn().mockResolvedValue(undefined),
migrate: jest.fn().mockResolvedValue(undefined),
connectionState: { connected: false },
close: jest.fn().mockResolvedValue(undefined),
}));
const dbConnection = mockInstance(DbConnection);
dbConnection.init.mockResolvedValue(undefined);
dbConnection.migrate.mockResolvedValue(undefined);
test('should start a task runner when task runners are enabled', async () => {
// arrange

View File

@@ -6,6 +6,7 @@ import { mock } from 'jest-mock-extended';
import type { IRun } from 'n8n-workflow';
import { ActiveExecutions } from '@/active-executions';
import { DbConnection } from '@/databases/db-connection';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { DeprecationService } from '@/deprecation/deprecation.service';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
@@ -34,12 +35,9 @@ const posthogClient = mockInstance(PostHogClient);
const telemetryEventRelay = mockInstance(TelemetryEventRelay);
const externalHooks = mockInstance(ExternalHooks);
jest.mock('@/db', () => ({
init: jest.fn().mockResolvedValue(undefined),
migrate: jest.fn().mockResolvedValue(undefined),
connectionState: { connected: false },
close: jest.fn().mockResolvedValue(undefined),
}));
const dbConnection = mockInstance(DbConnection);
dbConnection.init.mockResolvedValue(undefined);
dbConnection.migrate.mockResolvedValue(undefined);
test('should start a task runner when task runners are enabled', async () => {
// arrange

View File

@@ -19,7 +19,7 @@ import type { AbstractServer } from '@/abstract-server';
import config from '@/config';
import { N8N_VERSION, N8N_RELEASE_DATE, inDevelopment, inTest } from '@/constants';
import * as CrashJournal from '@/crash-journal';
import * as Db from '@/db';
import { DbConnection } from '@/databases/db-connection';
import { getDataDeduplicationService } from '@/deduplication';
import { DeprecationService } from '@/deprecation/deprecation.service';
import { TestRunnerService } from '@/evaluation.ee/test-runner/test-runner.service.ee';
@@ -42,6 +42,8 @@ import { WorkflowHistoryManager } from '@/workflows/workflow-history.ee/workflow
export abstract class BaseCommand extends Command {
protected logger = Container.get(Logger);
protected dbConnection = Container.get(DbConnection);
protected errorReporter: ErrorReporter;
protected externalHooks?: ExternalHooks;
@@ -122,9 +124,12 @@ export abstract class BaseCommand extends Command {
this.nodeTypes = Container.get(NodeTypes);
await Container.get(LoadNodesAndCredentials).init();
await Db.init().catch(
async (error: Error) => await this.exitWithCrash('There was an error initializing DB', error),
);
await this.dbConnection
.init()
.catch(
async (error: Error) =>
await this.exitWithCrash('There was an error initializing DB', error),
);
// This needs to happen after DB.init() or otherwise DB Connection is not
// available via the dependency Container that services depend on.
@@ -134,10 +139,12 @@ export abstract class BaseCommand extends Command {
await this.server?.init();
await Db.migrate().catch(
async (error: Error) =>
await this.exitWithCrash('There was an error running database migrations', error),
);
await this.dbConnection
.migrate()
.catch(
async (error: Error) =>
await this.exitWithCrash('There was an error running database migrations', error),
);
Container.get(DeprecationService).warn();
@@ -180,7 +187,7 @@ export abstract class BaseCommand extends Command {
protected async exitSuccessFully() {
try {
await Promise.all([CrashJournal.cleanup(), Db.close()]);
await Promise.all([CrashJournal.cleanup(), this.dbConnection.close()]);
} finally {
process.exit();
}
@@ -287,9 +294,9 @@ export abstract class BaseCommand extends Command {
async finally(error: Error | undefined) {
if (error?.message) this.logger.error(error.message);
if (inTest || this.id === 'start') return;
if (Db.connectionState.connected) {
if (this.dbConnection.connectionState.connected) {
await sleep(100); // give any in-flight query some time to finish
await Db.close();
await this.dbConnection.close();
}
const exitCode = error instanceof Errors.ExitError ? error.oclif.exit : error ? 1 : 0;
this.exit(exitCode);

View File

@@ -6,7 +6,7 @@ import { MigrationExecutor, DataSource as Connection } from '@n8n/typeorm';
import { Command, Flags } from '@oclif/core';
import { Logger } from 'n8n-core';
import { getConnectionOptions } from '@/databases/config';
import { DbConnectionOptions } from '@/databases/db-connection-options';
import type { Migration } from '@/databases/types';
import { wrapMigration } from '@/databases/utils/migration-helpers';
@@ -80,7 +80,7 @@ export class DbRevertMigrationCommand extends Command {
async run() {
const connectionOptions: ConnectionOptions = {
...getConnectionOptions(),
...Container.get(DbConnectionOptions).getOptions(),
subscribers: [],
synchronize: false,
migrationsRun: false,

View File

@@ -10,7 +10,6 @@ import type { ICredentialsEncrypted } from 'n8n-workflow';
import { jsonParse, UserError } from 'n8n-workflow';
import { UM_FIX_INSTRUCTION } from '@/constants';
import * as Db from '@/db';
import { BaseCommand } from '../base-command';
@@ -69,7 +68,8 @@ export class ImportCredentialsCommand extends BaseCommand {
const credentials = await this.readCredentials(flags.input, flags.separate);
await Db.getConnection().transaction(async (transactionManager) => {
const { manager: dbManager } = Container.get(ProjectRepository);
await dbManager.transaction(async (transactionManager) => {
this.transactionManager = transactionManager;
const project = await this.getProject(flags.userId, flags.projectId);

View File

@@ -26,7 +26,6 @@ import { deepCopy } from 'n8n-workflow';
import type { ICredentialDataDecryptedObject } from 'n8n-workflow';
import { z } from 'zod';
import * as Db from '@/db';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { ForbiddenError } from '@/errors/response-errors/forbidden.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
@@ -317,7 +316,8 @@ export class CredentialsController {
let amountRemoved: number | null = null;
let newShareeIds: string[] = [];
await Db.transaction(async (trx) => {
const { manager: dbManager } = this.sharedCredentialsRepository;
await dbManager.transaction(async (trx) => {
const currentProjectIds = credential.shared
.filter((sc) => sc.role === 'credential:user')
.map((sc) => sc.projectId);

View File

@@ -29,7 +29,6 @@ import { CREDENTIAL_BLANKING_VALUE } from '@/constants';
import { CredentialTypes } from '@/credential-types';
import { createCredentialsFromCredentialsEntity } from '@/credentials-helper';
import { UserRepository } from '@/databases/repositories/user.repository';
import * as Db from '@/db';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import { ExternalHooks } from '@/external-hooks';
@@ -406,7 +405,8 @@ export class CredentialsService {
await this.externalHooks.run('credentials.create', [encryptedData]);
const result = await Db.transaction(async (transactionManager) => {
const { manager: dbManager } = this.credentialsRepository;
const result = await dbManager.transaction(async (transactionManager) => {
const savedCredential = await transactionManager.save<CredentialsEntity>(newCredential);
savedCredential.data = newCredential.data;

View File

@@ -0,0 +1,216 @@
import type { GlobalConfig, InstanceSettingsConfig } from '@n8n/config';
import { mock } from 'jest-mock-extended';
import path from 'path';
import { DbConnectionOptions } from '../db-connection-options';
import { mysqlMigrations } from '../migrations/mysqldb';
import { postgresMigrations } from '../migrations/postgresdb';
import { sqliteMigrations } from '../migrations/sqlite';
describe('DbConnectionOptions', () => {
const dbConfig = mock<GlobalConfig['database']>({
tablePrefix: 'test_prefix_',
logging: {
enabled: false,
maxQueryExecutionTime: 0,
},
});
const n8nFolder = '/test/n8n';
const instanceSettingsConfig = mock<InstanceSettingsConfig>({ n8nFolder });
const dbConnectionOptions = new DbConnectionOptions(dbConfig, instanceSettingsConfig);
beforeEach(() => jest.resetAllMocks());
const commonOptions = {
entityPrefix: 'test_prefix_',
entities: expect.any(Array),
subscribers: expect.any(Array),
migrationsTableName: 'test_prefix_migrations',
migrationsRun: false,
synchronize: false,
maxQueryExecutionTime: 0,
logging: false,
};
describe('getOptions', () => {
it('should throw an error for unsupported database types', () => {
// @ts-expect-error invalid type
dbConfig.type = 'unsupported';
expect(() => dbConnectionOptions.getOptions()).toThrow(
'Database type currently not supported',
);
});
describe('for SQLite', () => {
beforeEach(() => {
dbConfig.type = 'sqlite';
dbConfig.sqlite = {
database: 'test.sqlite',
poolSize: 0,
enableWAL: false,
executeVacuumOnStartup: false,
};
});
it('should return SQLite connection options when type is sqlite', () => {
const result = dbConnectionOptions.getOptions();
expect(result).toEqual({
type: 'sqlite',
enableWAL: false,
...commonOptions,
database: path.resolve(n8nFolder, 'test.sqlite'),
migrations: sqliteMigrations,
});
});
it('should return SQLite connection options with pooling when poolSize > 0', () => {
dbConfig.sqlite.poolSize = 5;
const result = dbConnectionOptions.getOptions();
expect(result).toEqual({
type: 'sqlite-pooled',
poolSize: 5,
enableWAL: true,
acquireTimeout: 60_000,
destroyTimeout: 5_000,
...commonOptions,
database: path.resolve(n8nFolder, 'test.sqlite'),
migrations: sqliteMigrations,
});
});
});
describe('PostgreSQL', () => {
beforeEach(() => {
dbConfig.type = 'postgresdb';
dbConfig.postgresdb = {
database: 'test_db',
host: 'localhost',
port: 5432,
user: 'postgres',
password: 'password',
schema: 'public',
poolSize: 2,
connectionTimeoutMs: 20000,
ssl: {
enabled: false,
ca: '',
cert: '',
key: '',
rejectUnauthorized: true,
},
};
});
it('should return PostgreSQL connection options when type is postgresdb', () => {
const result = dbConnectionOptions.getOptions();
expect(result).toEqual({
type: 'postgres',
...commonOptions,
database: 'test_db',
host: 'localhost',
port: 5432,
username: 'postgres',
password: 'password',
schema: 'public',
poolSize: 2,
migrations: postgresMigrations,
connectTimeoutMS: 20000,
ssl: false,
});
});
it('should configure SSL options for PostgreSQL when SSL settings are provided', () => {
const ssl = {
ca: 'ca-content',
cert: 'cert-content',
key: 'key-content',
rejectUnauthorized: false,
};
dbConfig.postgresdb.ssl = { enabled: true, ...ssl };
const result = dbConnectionOptions.getOptions();
expect(result).toMatchObject({ ssl });
});
});
describe('for MySQL / MariaDB', () => {
beforeEach(() => {
dbConfig.mysqldb = {
database: 'test_db',
host: 'localhost',
port: 3306,
user: 'root',
password: 'password',
};
});
it('should return MySQL connection options when type is mysqldb', () => {
dbConfig.type = 'mysqldb';
const result = dbConnectionOptions.getOptions();
expect(result).toEqual({
type: 'mysql',
...commonOptions,
database: 'test_db',
host: 'localhost',
port: 3306,
username: 'root',
password: 'password',
migrations: mysqlMigrations,
timezone: 'Z',
});
});
it('should return MariaDB connection options when type is mariadb', () => {
dbConfig.type = 'mariadb';
const result = dbConnectionOptions.getOptions();
expect(result).toEqual({
type: 'mariadb',
...commonOptions,
database: 'test_db',
host: 'localhost',
port: 3306,
username: 'root',
password: 'password',
migrations: mysqlMigrations,
timezone: 'Z',
});
});
});
describe('logging', () => {
beforeEach(() => {
dbConfig.type = 'sqlite';
dbConfig.sqlite = mock<GlobalConfig['database']['sqlite']>({ database: 'test.sqlite' });
});
it('should not configure logging by default', () => {
const result = dbConnectionOptions.getOptions();
expect(result.logging).toBe(false);
});
it('should configure logging when it is enabled', () => {
dbConfig.logging = {
enabled: true,
options: 'all',
maxQueryExecutionTime: 1000,
};
const result = dbConnectionOptions.getOptions();
expect(result.logging).toBe('all');
expect(result.maxQueryExecutionTime).toBe(1000);
});
});
});
});

View File

@@ -0,0 +1,178 @@
import { DataSource, type DataSourceOptions } from '@n8n/typeorm';
import { mock, mockDeep } from 'jest-mock-extended';
import type { ErrorReporter } from 'n8n-core';
import { DbConnectionTimeoutError } from 'n8n-workflow';
import { DbConnection } from '@/databases/db-connection';
import type { DbConnectionOptions } from '@/databases/db-connection-options';
import type { Migration } from '@/databases/types';
import * as migrationHelper from '@/databases/utils/migration-helpers';
jest.mock('@n8n/typeorm', () => ({
DataSource: jest.fn(),
...jest.requireActual('@n8n/typeorm'),
}));
describe('DbConnection', () => {
let dbConnection: DbConnection;
const migrations = [{ name: 'TestMigration1' }, { name: 'TestMigration2' }] as Migration[];
const errorReporter = mock<ErrorReporter>();
const dataSource = mockDeep<DataSource>({ options: { migrations } });
const connectionOptions = mockDeep<DbConnectionOptions>();
const postgresOptions: DataSourceOptions = {
type: 'postgres',
host: 'localhost',
port: 5432,
username: 'user',
password: 'password',
database: 'n8n',
migrations,
};
beforeEach(() => {
jest.resetAllMocks();
connectionOptions.getOptions.mockReturnValue(postgresOptions);
(DataSource as jest.Mock) = jest.fn().mockImplementation(() => dataSource);
dbConnection = new DbConnection(errorReporter, connectionOptions);
});
describe('init', () => {
it('should initialize the data source', async () => {
dataSource.initialize.mockResolvedValue(dataSource);
await dbConnection.init();
expect(dataSource.initialize).toHaveBeenCalled();
expect(dbConnection.connectionState.connected).toBe(true);
});
it('should not reinitialize if already connected', async () => {
dataSource.initialize.mockResolvedValue(dataSource);
dbConnection.connectionState.connected = true;
await dbConnection.init();
expect(dataSource.initialize).not.toHaveBeenCalled();
});
it('should wrap postgres connection timeout errors', async () => {
const originalError = new Error('Connection terminated due to connection timeout');
dataSource.initialize.mockRejectedValue(originalError);
connectionOptions.getOptions.mockReturnValue({
type: 'postgres',
connectTimeoutMS: 10000,
});
await expect(dbConnection.init()).rejects.toThrow(DbConnectionTimeoutError);
});
it('should rethrow other errors', async () => {
// Arrange
const error = new Error('Some other error');
dataSource.initialize.mockRejectedValue(error);
// Act & Assert
await expect(dbConnection.init()).rejects.toThrow('Some other error');
});
});
describe('migrate', () => {
it('should wrap migrations and run them', async () => {
dataSource.runMigrations.mockResolvedValue([]);
const wrapMigrationSpy = jest.spyOn(migrationHelper, 'wrapMigration').mockImplementation();
expect(dataSource.runMigrations).not.toHaveBeenCalled();
expect(dbConnection.connectionState.migrated).toBe(false);
await dbConnection.migrate();
expect(wrapMigrationSpy).toHaveBeenCalledTimes(2);
expect(dataSource.runMigrations).toHaveBeenCalledWith({ transaction: 'each' });
expect(dbConnection.connectionState.migrated).toBe(true);
});
});
describe('close', () => {
it('should clear the ping timer', async () => {
const clearTimeoutSpy = jest.spyOn(global, 'clearTimeout');
// @ts-expect-error private property
dbConnection.pingTimer = setTimeout(() => {}, 1000);
await dbConnection.close();
expect(clearTimeoutSpy).toHaveBeenCalled();
// @ts-expect-error private property
expect(dbConnection.pingTimer).toBeUndefined();
});
it('should destroy the data source if initialized', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
await dbConnection.close();
expect(dataSource.destroy).toHaveBeenCalled();
});
it('should not try to destroy the data source if not initialized', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = false;
await dbConnection.close();
expect(dataSource.destroy).not.toHaveBeenCalled();
});
});
describe('ping', () => {
it('should update connection state on successful ping', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
dataSource.query.mockResolvedValue([{ '1': 1 }]);
dbConnection.connectionState.connected = false;
// @ts-expect-error private property
await dbConnection.ping();
expect(dataSource.query).toHaveBeenCalledWith('SELECT 1');
expect(dbConnection.connectionState.connected).toBe(true);
});
it('should report errors on failed ping', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
const error = new Error('Connection error');
dataSource.query.mockRejectedValue(error);
// @ts-expect-error private property
await dbConnection.ping();
expect(errorReporter.error).toHaveBeenCalledWith(error);
});
it('should schedule next ping after execution', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = true;
dataSource.query.mockResolvedValue([{ '1': 1 }]);
const scheduleNextPingSpy = jest.spyOn(dbConnection as any, 'scheduleNextPing');
// @ts-expect-error private property
await dbConnection.ping();
expect(scheduleNextPingSpy).toHaveBeenCalled();
});
it('should not query if data source is not initialized', async () => {
// @ts-expect-error readonly property
dataSource.isInitialized = false;
// @ts-expect-error private property
await dbConnection.ping();
expect(dataSource.query).not.toHaveBeenCalled();
});
});
});

View File

@@ -1,144 +0,0 @@
import { GlobalConfig } from '@n8n/config';
import { entities } from '@n8n/db';
import { Container } from '@n8n/di';
import type { DataSourceOptions, LoggerOptions } from '@n8n/typeorm';
import type { MysqlConnectionOptions } from '@n8n/typeorm/driver/mysql/MysqlConnectionOptions';
import type { PostgresConnectionOptions } from '@n8n/typeorm/driver/postgres/PostgresConnectionOptions';
import type { SqliteConnectionOptions } from '@n8n/typeorm/driver/sqlite/SqliteConnectionOptions';
import type { SqlitePooledConnectionOptions } from '@n8n/typeorm/driver/sqlite-pooled/SqlitePooledConnectionOptions';
import { InstanceSettings } from 'n8n-core';
import { UserError } from 'n8n-workflow';
import path from 'path';
import type { TlsOptions } from 'tls';
import { InsightsByPeriod } from '@/modules/insights/database/entities/insights-by-period';
import { InsightsMetadata } from '@/modules/insights/database/entities/insights-metadata';
import { InsightsRaw } from '@/modules/insights/database/entities/insights-raw';
import { mysqlMigrations } from './migrations/mysqldb';
import { postgresMigrations } from './migrations/postgresdb';
import { sqliteMigrations } from './migrations/sqlite';
import { subscribers } from './subscribers';
const getCommonOptions = () => {
const { tablePrefix: entityPrefix, logging: loggingConfig } =
Container.get(GlobalConfig).database;
let loggingOption: LoggerOptions = loggingConfig.enabled;
if (loggingOption) {
const optionsString = loggingConfig.options.replace(/\s+/g, '');
if (optionsString === 'all') {
loggingOption = optionsString;
} else {
loggingOption = optionsString.split(',') as LoggerOptions;
}
}
return {
entityPrefix,
entities: [...Object.values(entities), InsightsRaw, InsightsByPeriod, InsightsMetadata],
subscribers: Object.values(subscribers),
migrationsTableName: `${entityPrefix}migrations`,
migrationsRun: false,
synchronize: false,
maxQueryExecutionTime: loggingConfig.maxQueryExecutionTime,
logging: loggingOption,
};
};
export const getOptionOverrides = (dbType: 'postgresdb' | 'mysqldb') => {
const globalConfig = Container.get(GlobalConfig);
const dbConfig = globalConfig.database[dbType];
return {
database: dbConfig.database,
host: dbConfig.host,
port: dbConfig.port,
username: dbConfig.user,
password: dbConfig.password,
};
};
const getSqliteConnectionOptions = (): SqliteConnectionOptions | SqlitePooledConnectionOptions => {
const globalConfig = Container.get(GlobalConfig);
const sqliteConfig = globalConfig.database.sqlite;
const commonOptions = {
...getCommonOptions(),
database: path.resolve(Container.get(InstanceSettings).n8nFolder, sqliteConfig.database),
migrations: sqliteMigrations,
};
if (sqliteConfig.poolSize > 0) {
return {
type: 'sqlite-pooled',
poolSize: sqliteConfig.poolSize,
enableWAL: true,
acquireTimeout: 60_000,
destroyTimeout: 5_000,
...commonOptions,
};
} else {
return {
type: 'sqlite',
enableWAL: sqliteConfig.enableWAL,
...commonOptions,
};
}
};
const getPostgresConnectionOptions = (): PostgresConnectionOptions => {
const postgresConfig = Container.get(GlobalConfig).database.postgresdb;
const {
ssl: { ca: sslCa, cert: sslCert, key: sslKey, rejectUnauthorized: sslRejectUnauthorized },
} = postgresConfig;
let ssl: TlsOptions | boolean = postgresConfig.ssl.enabled;
if (sslCa !== '' || sslCert !== '' || sslKey !== '' || !sslRejectUnauthorized) {
ssl = {
ca: sslCa || undefined,
cert: sslCert || undefined,
key: sslKey || undefined,
rejectUnauthorized: sslRejectUnauthorized,
};
}
return {
type: 'postgres',
...getCommonOptions(),
...getOptionOverrides('postgresdb'),
schema: postgresConfig.schema,
poolSize: postgresConfig.poolSize,
migrations: postgresMigrations,
connectTimeoutMS: postgresConfig.connectionTimeoutMs,
ssl,
};
};
const getMysqlConnectionOptions = (dbType: 'mariadb' | 'mysqldb'): MysqlConnectionOptions => ({
type: dbType === 'mysqldb' ? 'mysql' : 'mariadb',
...getCommonOptions(),
...getOptionOverrides('mysqldb'),
migrations: mysqlMigrations,
timezone: 'Z', // set UTC as default
});
export function getConnectionOptions(): DataSourceOptions {
const globalConfig = Container.get(GlobalConfig);
const { type: dbType } = globalConfig.database;
switch (dbType) {
case 'sqlite':
return getSqliteConnectionOptions();
case 'postgresdb':
return getPostgresConnectionOptions();
case 'mariadb':
case 'mysqldb':
return getMysqlConnectionOptions(dbType);
default:
throw new UserError('Database type currently not supported', { extra: { dbType } });
}
}
export function arePostgresOptions(
options: DataSourceOptions,
): options is PostgresConnectionOptions {
return options.type === 'postgres';
}

View File

@@ -0,0 +1,145 @@
import { DatabaseConfig, InstanceSettingsConfig } from '@n8n/config';
import { entities } from '@n8n/db';
import { Service } from '@n8n/di';
import type { DataSourceOptions, LoggerOptions } from '@n8n/typeorm';
import type { MysqlConnectionOptions } from '@n8n/typeorm/driver/mysql/MysqlConnectionOptions';
import type { PostgresConnectionOptions } from '@n8n/typeorm/driver/postgres/PostgresConnectionOptions';
import type { SqliteConnectionOptions } from '@n8n/typeorm/driver/sqlite/SqliteConnectionOptions';
import type { SqlitePooledConnectionOptions } from '@n8n/typeorm/driver/sqlite-pooled/SqlitePooledConnectionOptions';
import { UserError } from 'n8n-workflow';
import path from 'path';
import type { TlsOptions } from 'tls';
import { InsightsByPeriod } from '@/modules/insights/database/entities/insights-by-period';
import { InsightsMetadata } from '@/modules/insights/database/entities/insights-metadata';
import { InsightsRaw } from '@/modules/insights/database/entities/insights-raw';
import { mysqlMigrations } from './migrations/mysqldb';
import { postgresMigrations } from './migrations/postgresdb';
import { sqliteMigrations } from './migrations/sqlite';
import { subscribers } from './subscribers';
@Service()
export class DbConnectionOptions {
constructor(
private readonly config: DatabaseConfig,
private readonly instanceSettingsConfig: InstanceSettingsConfig,
) {}
getOverrides(dbType: 'postgresdb' | 'mysqldb') {
const dbConfig = this.config[dbType];
return {
database: dbConfig.database,
host: dbConfig.host,
port: dbConfig.port,
username: dbConfig.user,
password: dbConfig.password,
};
}
getOptions(): DataSourceOptions {
const { type: dbType } = this.config;
switch (dbType) {
case 'sqlite':
return this.getSqliteConnectionOptions();
case 'postgresdb':
return this.getPostgresConnectionOptions();
case 'mariadb':
case 'mysqldb':
return this.getMysqlConnectionOptions(dbType);
default:
throw new UserError('Database type currently not supported', { extra: { dbType } });
}
}
private getCommonOptions() {
const { tablePrefix: entityPrefix, logging: loggingConfig } = this.config;
let loggingOption: LoggerOptions = loggingConfig.enabled;
if (loggingOption) {
const optionsString = loggingConfig.options.replace(/\s+/g, '');
if (optionsString === 'all') {
loggingOption = optionsString;
} else {
loggingOption = optionsString.split(',') as LoggerOptions;
}
}
return {
entityPrefix,
entities: [...Object.values(entities), InsightsRaw, InsightsByPeriod, InsightsMetadata],
subscribers: Object.values(subscribers),
migrationsTableName: `${entityPrefix}migrations`,
migrationsRun: false,
synchronize: false,
maxQueryExecutionTime: loggingConfig.maxQueryExecutionTime,
logging: loggingOption,
};
}
private getSqliteConnectionOptions(): SqliteConnectionOptions | SqlitePooledConnectionOptions {
const { sqlite: sqliteConfig } = this.config;
const { n8nFolder } = this.instanceSettingsConfig;
const commonOptions = {
...this.getCommonOptions(),
database: path.resolve(n8nFolder, sqliteConfig.database),
migrations: sqliteMigrations,
};
if (sqliteConfig.poolSize > 0) {
return {
type: 'sqlite-pooled',
poolSize: sqliteConfig.poolSize,
enableWAL: true,
acquireTimeout: 60_000,
destroyTimeout: 5_000,
...commonOptions,
};
} else {
return {
type: 'sqlite',
enableWAL: sqliteConfig.enableWAL,
...commonOptions,
};
}
}
private getPostgresConnectionOptions(): PostgresConnectionOptions {
const { postgresdb: postgresConfig } = this.config;
const {
ssl: { ca: sslCa, cert: sslCert, key: sslKey, rejectUnauthorized: sslRejectUnauthorized },
} = postgresConfig;
let ssl: TlsOptions | boolean = postgresConfig.ssl.enabled;
if (sslCa !== '' || sslCert !== '' || sslKey !== '' || !sslRejectUnauthorized) {
ssl = {
ca: sslCa || undefined,
cert: sslCert || undefined,
key: sslKey || undefined,
rejectUnauthorized: sslRejectUnauthorized,
};
}
return {
type: 'postgres',
...this.getCommonOptions(),
...this.getOverrides('postgresdb'),
schema: postgresConfig.schema,
poolSize: postgresConfig.poolSize,
migrations: postgresMigrations,
connectTimeoutMS: postgresConfig.connectionTimeoutMs,
ssl,
};
}
private getMysqlConnectionOptions(dbType: 'mariadb' | 'mysqldb'): MysqlConnectionOptions {
return {
type: dbType === 'mysqldb' ? 'mysql' : 'mariadb',
...this.getCommonOptions(),
...this.getOverrides('mysqldb'),
migrations: mysqlMigrations,
timezone: 'Z', // set UTC as default
};
}
}

View File

@@ -0,0 +1,102 @@
import { Memoized } from '@n8n/decorators';
import { Container, Service } from '@n8n/di';
import { DataSource } from '@n8n/typeorm';
import { ErrorReporter } from 'n8n-core';
import { DbConnectionTimeoutError, ensureError } from 'n8n-workflow';
import { inTest } from '@/constants';
import { DbConnectionOptions } from './db-connection-options';
import type { Migration } from './types';
import { wrapMigration } from './utils/migration-helpers';
type ConnectionState = {
connected: boolean;
migrated: boolean;
};
@Service()
export class DbConnection {
private dataSource: DataSource;
private pingTimer: NodeJS.Timer | undefined;
readonly connectionState: ConnectionState = {
connected: false,
migrated: false,
};
constructor(
private readonly errorReporter: ErrorReporter,
private readonly connectionOptions: DbConnectionOptions,
) {
this.dataSource = new DataSource(this.options);
Container.set(DataSource, this.dataSource);
}
@Memoized
get options() {
return this.connectionOptions.getOptions();
}
async init(): Promise<void> {
const { connectionState, options } = this;
if (connectionState.connected) return;
try {
await this.dataSource.initialize();
} catch (e) {
let error = ensureError(e);
if (
options.type === 'postgres' &&
error.message === 'Connection terminated due to connection timeout'
) {
error = new DbConnectionTimeoutError({
cause: error,
configuredTimeoutInMs: options.connectTimeoutMS!,
});
}
throw error;
}
connectionState.connected = true;
if (!inTest) this.scheduleNextPing();
}
async migrate() {
const { dataSource, connectionState } = this;
(dataSource.options.migrations as Migration[]).forEach(wrapMigration);
await dataSource.runMigrations({ transaction: 'each' });
connectionState.migrated = true;
}
async close() {
if (this.pingTimer) {
clearTimeout(this.pingTimer);
this.pingTimer = undefined;
}
if (this.dataSource.isInitialized) {
await this.dataSource.destroy();
this.connectionState.connected = false;
}
}
/** Ping DB connection every 2 seconds */
private scheduleNextPing() {
this.pingTimer = setTimeout(async () => await this.ping(), 2000);
}
private async ping() {
if (!this.dataSource.isInitialized) return;
try {
await this.dataSource.query('SELECT 1');
this.connectionState.connected = true;
return;
} catch (error) {
this.connectionState.connected = false;
this.errorReporter.error(error);
} finally {
this.scheduleNextPing();
}
}
}

View File

@@ -1,91 +0,0 @@
import { Container } from '@n8n/di';
// eslint-disable-next-line n8n-local-rules/misplaced-n8n-typeorm-import
import type { EntityManager } from '@n8n/typeorm';
// eslint-disable-next-line n8n-local-rules/misplaced-n8n-typeorm-import
import { DataSource as Connection } from '@n8n/typeorm';
import { ErrorReporter } from 'n8n-core';
import { DbConnectionTimeoutError, ensureError } from 'n8n-workflow';
import { inTest } from '@/constants';
import { getConnectionOptions, arePostgresOptions } from '@/databases/config';
import type { Migration } from '@/databases/types';
import { wrapMigration } from '@/databases/utils/migration-helpers';
let connection: Connection;
export const getConnection = () => connection!;
type ConnectionState = {
connected: boolean;
migrated: boolean;
};
export const connectionState: ConnectionState = {
connected: false,
migrated: false,
};
// Ping DB connection every 2 seconds
let pingTimer: NodeJS.Timer | undefined;
if (!inTest) {
const pingDBFn = async () => {
if (connection?.isInitialized) {
try {
await connection.query('SELECT 1');
connectionState.connected = true;
return;
} catch (error) {
Container.get(ErrorReporter).error(error);
} finally {
pingTimer = setTimeout(pingDBFn, 2000);
}
}
connectionState.connected = false;
};
pingTimer = setTimeout(pingDBFn, 2000);
}
export async function transaction<T>(fn: (entityManager: EntityManager) => Promise<T>): Promise<T> {
return await connection.transaction(fn);
}
export async function init(): Promise<void> {
if (connectionState.connected) return;
const connectionOptions = getConnectionOptions();
connection = new Connection(connectionOptions);
Container.set(Connection, connection);
try {
await connection.initialize();
} catch (e) {
let error = ensureError(e);
if (
arePostgresOptions(connectionOptions) &&
error.message === 'Connection terminated due to connection timeout'
) {
error = new DbConnectionTimeoutError({
cause: error,
configuredTimeoutInMs: connectionOptions.connectTimeoutMS!,
});
}
throw error;
}
connectionState.connected = true;
}
export async function migrate() {
(connection.options.migrations as Migration[]).forEach(wrapMigration);
await connection.runMigrations({ transaction: 'each' });
connectionState.migrated = true;
}
export const close = async () => {
if (pingTimer) {
clearTimeout(pingTimer);
pingTimer = undefined;
}
if (connection.isInitialized) await connection.destroy();
};

View File

@@ -28,10 +28,6 @@ import { mockNodeTypesData } from '@test-integration/utils/node-types-data';
import { TestRunnerService } from '../test-runner.service.ee';
jest.mock('@/db', () => ({
transaction: (cb: any) => cb(),
}));
const wfUnderTestJson = JSON.parse(
readFileSync(path.join(__dirname, './mock-data/workflow.under-test.json'), { encoding: 'utf-8' }),
);
@@ -231,7 +227,9 @@ async function mockLongExecutionPromise(data: IRun, delay: number): Promise<IRun
}
describe('TestRunnerService', () => {
const executionRepository = mock<ExecutionRepository>();
const executionRepository = mock<ExecutionRepository>({
manager: { transaction: (cb: any) => cb() },
});
const workflowRepository = mock<WorkflowRepository>();
const workflowRunner = mock<WorkflowRunner>();
const activeExecutions = mock<ActiveExecutions>();

View File

@@ -24,7 +24,6 @@ import { ActiveExecutions } from '@/active-executions';
import config from '@/config';
import { EVALUATION_METRICS_NODE } from '@/constants';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import * as Db from '@/db';
import { TestCaseExecutionError, TestRunError } from '@/evaluation.ee/test-runner/errors.ee';
import { NodeTypes } from '@/node-types';
import { Telemetry } from '@/telemetry';
@@ -379,6 +378,8 @@ export class TestRunnerService {
let testRunEndStatusForTelemetry;
const abortSignal = abortController.signal;
const { manager: dbManager } = this.executionRepository;
try {
// Get the evaluation workflow
const evaluationWorkflow = await this.workflowRepository.findById(test.evaluationWorkflowId);
@@ -482,7 +483,7 @@ export class TestRunnerService {
// In case of a permission check issue, the test case execution will be undefined.
// If that happens, or if the test case execution produced an error, mark the test case as failed.
if (!testCaseExecution || testCaseExecution.data.resultData.error) {
await Db.transaction(async (trx) => {
await dbManager.transaction(async (trx) => {
await this.testRunRepository.incrementFailed(testRun.id, trx);
await this.testCaseExecutionRepository.markAsFailed({
testRunId: testRun.id,
@@ -525,7 +526,7 @@ export class TestRunnerService {
);
if (evalExecution.data.resultData.error) {
await Db.transaction(async (trx) => {
await dbManager.transaction(async (trx) => {
await this.testRunRepository.incrementFailed(testRun.id, trx);
await this.testCaseExecutionRepository.markAsFailed({
testRunId: testRun.id,
@@ -535,7 +536,7 @@ export class TestRunnerService {
});
});
} else {
await Db.transaction(async (trx) => {
await dbManager.transaction(async (trx) => {
await this.testRunRepository.incrementPassed(testRun.id, trx);
await this.testCaseExecutionRepository.markAsCompleted({
@@ -548,7 +549,7 @@ export class TestRunnerService {
}
} catch (e) {
// In case of an unexpected error, increment the failed count and continue with the next test case
await Db.transaction(async (trx) => {
await dbManager.transaction(async (trx) => {
await this.testRunRepository.incrementFailed(testRun.id, trx);
if (e instanceof TestCaseExecutionError) {
@@ -576,7 +577,7 @@ export class TestRunnerService {
// Mark the test run as completed or cancelled
if (abortSignal.aborted) {
await Db.transaction(async (trx) => {
await dbManager.transaction(async (trx) => {
await this.testRunRepository.markAsCancelled(testRun.id, trx);
await this.testCaseExecutionRepository.markAllPendingAsCancelled(testRun.id, trx);
@@ -598,7 +599,7 @@ export class TestRunnerService {
stoppedOn: e.extra?.executionId,
});
await Db.transaction(async (trx) => {
await dbManager.transaction(async (trx) => {
await this.testRunRepository.markAsCancelled(testRun.id, trx);
await this.testCaseExecutionRepository.markAllPendingAsCancelled(testRun.id, trx);
});
@@ -642,8 +643,9 @@ export class TestRunnerService {
abortController.abort();
this.abortControllers.delete(testRunId);
} else {
const { manager: dbManager } = this.executionRepository;
// If there is no abort controller - just mark the test run and all its' pending test case executions as cancelled
await Db.transaction(async (trx) => {
await dbManager.transaction(async (trx) => {
await this.testRunRepository.markAsCancelled(testRunId, trx);
await this.testCaseExecutionRepository.markAllPendingAsCancelled(testRunId, trx);
});

View File

@@ -13,7 +13,6 @@ import { randomString } from 'n8n-workflow';
import config from '@/config';
import { UserRepository } from '@/databases/repositories/user.repository';
import * as Db from '@/db';
import { License } from '@/license';
import {
@@ -176,7 +175,8 @@ export const processUsers = async (
toDisableUsers: string[],
): Promise<void> => {
const userRepository = Container.get(UserRepository);
await Db.transaction(async (transactionManager) => {
const { manager: dbManager } = userRepository;
await dbManager.transaction(async (transactionManager) => {
return await Promise.all([
...toCreateUsers.map(async ([ldapId, user]) => {
const { user: savedUser } = await userRepository.createUserWithProject(

View File

@@ -15,7 +15,6 @@ import type {
INodePropertyOptions,
} from 'n8n-workflow';
import * as Db from '@/db';
import { EventService } from '@/events/event.service';
import { ExternalHooks } from '@/external-hooks';
import type { CredentialRequest } from '@/requests';
@@ -54,14 +53,16 @@ export async function saveCredential(
user: User,
encryptedData: ICredentialsDb,
): Promise<CredentialsEntity> {
const result = await Db.transaction(async (transactionManager) => {
const projectRepository = Container.get(ProjectRepository);
const { manager: dbManager } = projectRepository;
const result = await dbManager.transaction(async (transactionManager) => {
const savedCredential = await transactionManager.save<CredentialsEntity>(credential);
savedCredential.data = credential.data;
const newSharedCredential = new SharedCredentials();
const personalProject = await Container.get(ProjectRepository).getPersonalProjectForUserOrFail(
const personalProject = await projectRepository.getPersonalProjectForUserOrFail(
user.id,
transactionManager,
);

View File

@@ -12,7 +12,6 @@ import type { Scope, WorkflowSharingRole } from '@n8n/permissions';
import type { WorkflowId } from 'n8n-workflow';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import * as Db from '@/db';
import { License } from '@/license';
import { WorkflowSharingService } from '@/workflows/workflow-sharing.service';
@@ -67,7 +66,8 @@ export async function createWorkflow(
personalProject: Project,
role: WorkflowSharingRole,
): Promise<WorkflowEntity> {
return await Db.transaction(async (transactionManager) => {
const { manager: dbManager } = Container.get(SharedWorkflowRepository);
return await dbManager.transaction(async (transactionManager) => {
const newWorkflow = new WorkflowEntity();
Object.assign(newWorkflow, workflow);
const savedWorkflow = await transactionManager.save<WorkflowEntity>(newWorkflow);
@@ -122,8 +122,9 @@ export async function getWorkflowTags(workflowId: string) {
});
}
export async function updateTags(workflowId: string, newTags: string[]): Promise<any> {
await Db.transaction(async (transactionManager) => {
export async function updateTags(workflowId: string, newTags: string[]): Promise<void> {
const { manager: dbManager } = Container.get(SharedWorkflowRepository);
await dbManager.transaction(async (transactionManager) => {
const oldTags = await transactionManager.findBy(WorkflowTagMapping, { workflowId });
if (oldTags.length > 0) {
await transactionManager.delete(WorkflowTagMapping, oldTags);

View File

@@ -5,6 +5,7 @@ import type { InstanceSettings } from 'n8n-core';
import { AssertionError } from 'node:assert';
import * as http from 'node:http';
import type { DbConnection } from '@/databases/db-connection';
import type { ExternalHooks } from '@/external-hooks';
import type { PrometheusMetricsService } from '@/metrics/prometheus-metrics.service';
import { bodyParser, rawBodyReader } from '@/middlewares';
@@ -30,6 +31,19 @@ describe('WorkerServer', () => {
const externalHooks = mock<ExternalHooks>();
const instanceSettings = mock<InstanceSettings>({ instanceType: 'worker' });
const prometheusMetricsService = mock<PrometheusMetricsService>();
const dbConnection = mock<DbConnection>();
const newWorkerServer = () =>
new WorkerServer(
globalConfig,
mockLogger(),
dbConnection,
mock(),
externalHooks,
instanceSettings,
prometheusMetricsService,
mock(),
);
beforeEach(() => {
globalConfig = mock<GlobalConfig>({
@@ -50,6 +64,7 @@ describe('WorkerServer', () => {
new WorkerServer(
globalConfig,
mockLogger(),
dbConnection,
mock(),
externalHooks,
mock<InstanceSettings>({ instanceType: 'webhook' }),
@@ -61,7 +76,7 @@ describe('WorkerServer', () => {
it('should exit if port taken', async () => {
const server = mock<http.Server>();
const procesExitSpy = jest
const processExitSpy = jest
.spyOn(process, 'exit')
.mockImplementation(() => undefined as never);
@@ -72,19 +87,11 @@ describe('WorkerServer', () => {
return server;
});
new WorkerServer(
globalConfig,
mockLogger(),
mock(),
externalHooks,
instanceSettings,
prometheusMetricsService,
mock(),
);
newWorkerServer();
expect(procesExitSpy).toHaveBeenCalledWith(1);
expect(processExitSpy).toHaveBeenCalledWith(1);
procesExitSpy.mockRestore();
processExitSpy.mockRestore();
});
});
@@ -99,15 +106,7 @@ describe('WorkerServer', () => {
return server;
});
const workerServer = new WorkerServer(
globalConfig,
mockLogger(),
mock(),
externalHooks,
instanceSettings,
prometheusMetricsService,
mock(),
);
const workerServer = newWorkerServer();
const CREDENTIALS_OVERWRITE_ENDPOINT = 'credentials/overwrites';
globalConfig.credentials.overwrite.endpoint = CREDENTIALS_OVERWRITE_ENDPOINT;
@@ -134,15 +133,7 @@ describe('WorkerServer', () => {
return server;
});
const workerServer = new WorkerServer(
globalConfig,
mockLogger(),
mock(),
externalHooks,
instanceSettings,
prometheusMetricsService,
mock(),
);
const workerServer = newWorkerServer();
await workerServer.init({ health: true, overwrites: false, metrics: true });
@@ -155,15 +146,7 @@ describe('WorkerServer', () => {
const server = mock<http.Server>();
jest.spyOn(http, 'createServer').mockReturnValue(server);
const workerServer = new WorkerServer(
globalConfig,
mockLogger(),
mock(),
externalHooks,
instanceSettings,
prometheusMetricsService,
mock(),
);
const workerServer = newWorkerServer();
await expect(
workerServer.init({ health: false, overwrites: false, metrics: false }),
).rejects.toThrowError(AssertionError);
@@ -173,15 +156,7 @@ describe('WorkerServer', () => {
const server = mock<http.Server>();
jest.spyOn(http, 'createServer').mockReturnValue(server);
const workerServer = new WorkerServer(
globalConfig,
mockLogger(),
mock(),
externalHooks,
instanceSettings,
prometheusMetricsService,
mock(),
);
const workerServer = newWorkerServer();
server.listen.mockImplementation((...args: unknown[]) => {
const callback = args.find((arg) => typeof arg === 'function');

View File

@@ -8,7 +8,7 @@ import http from 'node:http';
import type { Server } from 'node:http';
import { CredentialsOverwrites } from '@/credentials-overwrites';
import * as Db from '@/db';
import { DbConnection } from '@/databases/db-connection';
import { CredentialsOverwritesAlreadySetError } from '@/errors/credentials-overwrites-already-set.error';
import { NonJsonBodyError } from '@/errors/non-json-body.error';
import { ExternalHooks } from '@/external-hooks';
@@ -49,6 +49,7 @@ export class WorkerServer {
constructor(
private readonly globalConfig: GlobalConfig,
private readonly logger: Logger,
private readonly dbConnection: DbConnection,
private readonly credentialsOverwrites: CredentialsOverwrites,
private readonly externalHooks: ExternalHooks,
private readonly instanceSettings: InstanceSettings,
@@ -122,9 +123,10 @@ export class WorkerServer {
}
private async readiness(_req: express.Request, res: express.Response) {
const { connectionState } = this.dbConnection;
const isReady =
Db.connectionState.connected &&
Db.connectionState.migrated &&
connectionState.connected &&
connectionState.migrated &&
this.redisClientService.isConnected();
return isReady

View File

@@ -9,7 +9,6 @@ import { randomString } from 'n8n-workflow';
import type { OpenAPIV3 } from 'openapi-types';
import { UserRepository } from '@/databases/repositories/user.repository';
import { getConnection } from '@/db';
import type { EventService } from '@/events/event.service';
import { getOwnerOnlyApiKeyScopes } from '@/public-api/permissions.ee';
import type { AuthenticatedRequest } from '@/requests';
@@ -50,8 +49,8 @@ describe('PublicApiKeyService', () => {
beforeAll(async () => {
await testDb.init();
userRepository = new UserRepository(getConnection());
apiKeyRepository = new ApiKeyRepository(getConnection());
userRepository = Container.get(UserRepository);
apiKeyRepository = Container.get(ApiKeyRepository);
});
afterAll(async () => {

View File

@@ -12,7 +12,6 @@ import { Logger } from 'n8n-core';
import { type INode, type INodeCredentialsDetails, type IWorkflowBase } from 'n8n-workflow';
import { v4 as uuid } from 'uuid';
import * as Db from '@/db';
import { replaceInvalidCredentials } from '@/workflow-helpers';
@Service()
@@ -47,7 +46,8 @@ export class ImportService {
if (hasInvalidCreds) await this.replaceInvalidCreds(workflow);
}
await Db.transaction(async (tx) => {
const { manager: dbManager } = this.credentialsRepository;
await dbManager.transaction(async (tx) => {
for (const workflow of workflows) {
if (workflow.active) {
workflow.active = false;

View File

@@ -2,20 +2,22 @@ import type { ExecutionsConfig } from '@n8n/config';
import { mock } from 'jest-mock-extended';
import type { InstanceSettings } from 'n8n-core';
import type { DbConnection } from '@/databases/db-connection';
import { mockLogger } from '@test/mocking';
import { ExecutionsPruningService } from '../executions-pruning.service';
jest.mock('@/db', () => ({
connectionState: { migrated: true },
}));
describe('PruningService', () => {
const dbConnection = mock<DbConnection>({
connectionState: { migrated: true },
});
describe('init', () => {
it('should start pruning on main instance that is the leader', () => {
const pruningService = new ExecutionsPruningService(
mockLogger(),
mock<InstanceSettings>({ isLeader: true, isMultiMain: true }),
dbConnection,
mock(),
mock(),
mock(),
@@ -31,6 +33,7 @@ describe('PruningService', () => {
const pruningService = new ExecutionsPruningService(
mockLogger(),
mock<InstanceSettings>({ isLeader: false, isMultiMain: true }),
dbConnection,
mock(),
mock(),
mock(),
@@ -48,6 +51,7 @@ describe('PruningService', () => {
const pruningService = new ExecutionsPruningService(
mockLogger(),
mock<InstanceSettings>({ isLeader: true, instanceType: 'main', isMultiMain: true }),
dbConnection,
mock(),
mock(),
mock<ExecutionsConfig>({ pruneData: true }),
@@ -60,6 +64,7 @@ describe('PruningService', () => {
const pruningService = new ExecutionsPruningService(
mockLogger(),
mock<InstanceSettings>({ isLeader: true, instanceType: 'main', isMultiMain: true }),
dbConnection,
mock(),
mock(),
mock<ExecutionsConfig>({ pruneData: false }),
@@ -72,6 +77,7 @@ describe('PruningService', () => {
const pruningService = new ExecutionsPruningService(
mockLogger(),
mock<InstanceSettings>({ isLeader: false, instanceType: 'worker', isMultiMain: true }),
dbConnection,
mock(),
mock(),
mock<ExecutionsConfig>({ pruneData: true }),
@@ -89,6 +95,7 @@ describe('PruningService', () => {
instanceType: 'main',
isMultiMain: true,
}),
dbConnection,
mock(),
mock(),
mock<ExecutionsConfig>({ pruneData: true }),
@@ -103,6 +110,7 @@ describe('PruningService', () => {
const pruningService = new ExecutionsPruningService(
mockLogger(),
mock<InstanceSettings>({ isLeader: true, instanceType: 'main', isMultiMain: true }),
dbConnection,
mock(),
mock(),
mock<ExecutionsConfig>({ pruneData: false }),
@@ -127,6 +135,7 @@ describe('PruningService', () => {
const pruningService = new ExecutionsPruningService(
mockLogger(),
mock<InstanceSettings>({ isLeader: true, instanceType: 'main', isMultiMain: true }),
dbConnection,
mock(),
mock(),
mock<ExecutionsConfig>({ pruneData: true }),

View File

@@ -7,7 +7,7 @@ import { ensureError } from 'n8n-workflow';
import { strict } from 'node:assert';
import { Time } from '@/constants';
import { connectionState as dbConnectionState } from '@/db';
import { DbConnection } from '@/databases/db-connection';
/**
* Responsible for deleting old executions from the database and deleting their
@@ -43,6 +43,7 @@ export class ExecutionsPruningService {
constructor(
private readonly logger: Logger,
private readonly instanceSettings: InstanceSettings,
private readonly dbConnection: DbConnection,
private readonly executionRepository: ExecutionRepository,
private readonly binaryDataService: BinaryDataService,
private readonly executionsConfig: ExecutionsConfig,
@@ -66,7 +67,8 @@ export class ExecutionsPruningService {
@OnLeaderTakeover()
startPruning() {
if (!this.isEnabled || !dbConnectionState.migrated || this.isShuttingDown) return;
const { connectionState } = this.dbConnection;
if (!this.isEnabled || !connectionState.migrated || this.isShuttingDown) return;
this.scheduleRollingSoftDeletions();
this.scheduleNextHardDeletion();

View File

@@ -35,7 +35,6 @@ import { UnexpectedError } from 'n8n-workflow';
import { v4 as uuid } from 'uuid';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import * as Db from '@/db';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { ForbiddenError } from '@/errors/response-errors/forbidden.error';
import { InternalServerError } from '@/errors/response-errors/internal-server.error';
@@ -135,8 +134,10 @@ export class WorkflowsController {
}
}
const { manager: dbManager } = this.projectRepository;
let project: Project | null;
const savedWorkflow = await Db.transaction(async (transactionManager) => {
const savedWorkflow = await dbManager.transaction(async (transactionManager) => {
const workflow = await transactionManager.save<WorkflowEntity>(newWorkflow);
const { projectId, parentFolderId } = req.body;
@@ -496,7 +497,8 @@ export class WorkflowsController {
}
let newShareeIds: string[] = [];
await Db.transaction(async (trx) => {
const { manager: dbManager } = this.projectRepository;
await dbManager.transaction(async (trx) => {
const currentPersonalProjectIDs = workflow.shared
.filter((sw) => sw.role === 'workflow:editor')
.map((sw) => sw.projectId);

View File

@@ -6,6 +6,7 @@ import { BinaryDataService, InstanceSettings } from 'n8n-core';
import type { ExecutionStatus, IWorkflowBase } from 'n8n-workflow';
import { Time } from '@/constants';
import { DbConnection } from '@/databases/db-connection';
import { ExecutionsPruningService } from '@/services/pruning/executions-pruning.service';
import {
@@ -34,6 +35,7 @@ describe('softDeleteOnPruningCycle()', () => {
pruningService = new ExecutionsPruningService(
mockLogger(),
instanceSettings,
Container.get(DbConnection),
Container.get(ExecutionRepository),
mockInstance(BinaryDataService),
executionsConfig,

View File

@@ -21,7 +21,6 @@ import {
newWorkflow,
} from './shared/db/workflows';
import * as testDb from './shared/test-db';
import { mockInstance } from '../shared/mocking';
describe('ImportService', () => {
let importService: ImportService;
@@ -37,9 +36,7 @@ describe('ImportService', () => {
tagRepository = Container.get(TagRepository);
const credentialsRepository = mockInstance(CredentialsRepository);
credentialsRepository.find.mockResolvedValue([]);
const credentialsRepository = Container.get(CredentialsRepository);
importService = new ImportService(mock(), credentialsRepository, tagRepository);
});

View File

@@ -5,8 +5,8 @@ import type { DataSourceOptions } from '@n8n/typeorm';
import { DataSource as Connection } from '@n8n/typeorm';
import { randomString } from 'n8n-workflow';
import { getOptionOverrides } from '@/databases/config';
import * as Db from '@/db';
import { DbConnection } from '@/databases/db-connection';
import { DbConnectionOptions } from '@/databases/db-connection-options';
export const testDbPrefix = 'n8n_test_';
@@ -34,20 +34,23 @@ export async function init() {
globalConfig.database.mysqldb.database = testDbName;
}
await Db.init();
await Db.migrate();
const dbConnection = Container.get(DbConnection);
await dbConnection.init();
await dbConnection.migrate();
}
export function isReady() {
return Db.connectionState.connected && Db.connectionState.migrated;
const { connectionState } = Container.get(DbConnection);
return connectionState.connected && connectionState.migrated;
}
/**
* Drop test DB, closing bootstrap connection if existing.
*/
export async function terminate() {
await Db.close();
Db.connectionState.connected = false;
const dbConnection = Container.get(DbConnection);
await dbConnection.close();
dbConnection.connectionState.connected = false;
}
type EntityName = keyof typeof entities | 'InsightsRaw' | 'InsightsByPeriod' | 'InsightsMetadata';
@@ -71,7 +74,7 @@ export const getBootstrapDBOptions = (dbType: 'postgresdb' | 'mysqldb'): DataSou
const type = dbType === 'postgresdb' ? 'postgres' : 'mysql';
return {
type,
...getOptionOverrides(dbType),
...Container.get(DbConnectionOptions).getOverrides(dbType),
database: type,
entityPrefix: globalConfig.database.tablePrefix,
schema: dbType === 'postgresdb' ? globalConfig.database.postgresdb.schema : undefined,

View File

@@ -1,3 +1,4 @@
import { InstanceSettingsConfig } from '@n8n/config';
import { mock } from 'jest-mock-extended';
jest.mock('node:fs', () => mock<typeof fs>());
import * as fs from 'node:fs';
@@ -5,7 +6,6 @@ import * as fs from 'node:fs';
import type { Logger } from '@/logging/logger';
import { InstanceSettings } from '../instance-settings';
import { InstanceSettingsConfig } from '../instance-settings-config';
import { WorkerMissingEncryptionKey } from '../worker-missing-encryption-key.error';
describe('InstanceSettings', () => {

View File

@@ -1,3 +1,4 @@
import { InstanceSettingsConfig } from '@n8n/config';
import { Memoized } from '@n8n/decorators';
import { Service } from '@n8n/di';
import { createHash, randomBytes } from 'crypto';
@@ -8,7 +9,6 @@ import path from 'path';
import { Logger } from '@/logging/logger';
import { InstanceSettingsConfig } from './instance-settings-config';
import { WorkerMissingEncryptionKey } from './worker-missing-encryption-key.error';
const nanoid = customAlphabet(ALPHABET, 16);

View File

@@ -3,12 +3,10 @@ jest.mock('n8n-workflow', () => ({
LoggerProxy: { init: jest.fn() },
}));
import type { GlobalConfig } from '@n8n/config';
import type { GlobalConfig, InstanceSettingsConfig } from '@n8n/config';
import { mock } from 'jest-mock-extended';
import { LoggerProxy } from 'n8n-workflow';
import type { InstanceSettingsConfig } from '@/instance-settings/instance-settings-config';
import { Logger } from '../logger';
describe('Logger', () => {

View File

@@ -1,5 +1,5 @@
import type { LogScope } from '@n8n/config';
import { GlobalConfig } from '@n8n/config';
import { GlobalConfig, InstanceSettingsConfig } from '@n8n/config';
import { Service } from '@n8n/di';
import callsites from 'callsites';
import type { TransformableInfo } from 'logform';
@@ -15,7 +15,6 @@ import pc from 'picocolors';
import winston from 'winston';
import { inDevelopment, inProduction } from '@/constants';
import { InstanceSettingsConfig } from '@/instance-settings/instance-settings-config';
import { isObjectLiteral } from '@/utils/is-object-literal';
const noOp = () => {};