diff --git a/packages/@n8n/config/src/configs/logging.config.ts b/packages/@n8n/config/src/configs/logging.config.ts index cb9096429c..a51d959603 100644 --- a/packages/@n8n/config/src/configs/logging.config.ts +++ b/packages/@n8n/config/src/configs/logging.config.ts @@ -21,6 +21,7 @@ export const LOG_SCOPES = [ 'ssh-client', 'cron', 'community-nodes', + 'legacy-sqlite-execution-recovery', ] as const; export type LogScope = (typeof LOG_SCOPES)[number]; diff --git a/packages/@n8n/db/src/entities/execution-data.ts b/packages/@n8n/db/src/entities/execution-data.ts index b0cc7f6fed..275047525b 100644 --- a/packages/@n8n/db/src/entities/execution-data.ts +++ b/packages/@n8n/db/src/entities/execution-data.ts @@ -1,4 +1,4 @@ -import { Column, Entity, ManyToOne, PrimaryColumn } from '@n8n/typeorm'; +import { Column, Entity, JoinColumn, OneToOne, PrimaryColumn } from '@n8n/typeorm'; import { IWorkflowBase } from 'n8n-workflow'; import { JsonColumn } from './abstract-entity'; @@ -21,8 +21,11 @@ export class ExecutionData { @PrimaryColumn({ transformer: idStringifier }) executionId: string; - @ManyToOne('ExecutionEntity', 'data', { + @OneToOne('ExecutionEntity', 'executionData', { onDelete: 'CASCADE', }) + @JoinColumn({ + name: 'executionId', + }) execution: ExecutionEntity; } diff --git a/packages/@n8n/db/src/repositories/execution.repository.ts b/packages/@n8n/db/src/repositories/execution.repository.ts index 22b0bbc19b..c86feea5d9 100644 --- a/packages/@n8n/db/src/repositories/execution.repository.ts +++ b/packages/@n8n/db/src/repositories/execution.repository.ts @@ -139,6 +139,26 @@ export class ExecutionRepository extends Repository { super(ExecutionEntity, dataSource.manager); } + // Find all executions that are in the 'new' state but do not have associated execution data. + // These executions are considered invalid and will be marked as 'crashed'. + // Since there is no join in this query the returned ids are unique. + async findQueuedExecutionsWithoutData(): Promise { + return await this.createQueryBuilder('execution') + .where('execution.status = :status', { status: 'new' }) + .andWhere( + 'NOT EXISTS (' + + this.manager + .createQueryBuilder() + .select('1') + .from(ExecutionData, 'execution_data') + .where('execution_data.executionId = execution.id') + .getQuery() + + ')', + ) + .select('execution.id') + .getMany(); + } + async findMultipleExecutions( queryParams: FindManyOptions, options?: { @@ -219,7 +239,10 @@ export class ExecutionRepository extends Repository { this.errorReporter.error( new UnexpectedError('Found executions without executionData', { - extra: { executionIds: executions.map(({ id }) => id) }, + extra: { + executionIds: executions.map(({ id }) => id), + isLegacySqlite: this.globalConfig.database.isLegacySqlite, + }, }), ); } diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 2bb409468b..97a1904660 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -315,6 +315,15 @@ export class Start extends BaseCommand> { ); } + if (this.globalConfig.database.isLegacySqlite) { + // Employ lazy loading to avoid unnecessary imports in the CLI + // and to ensure that the legacy recovery service is only used when needed. + const { LegacySqliteExecutionRecoveryService } = await import( + '@/executions/legacy-sqlite-execution-recovery.service' + ); + await Container.get(LegacySqliteExecutionRecoveryService).cleanupWorkflowExecutions(); + } + await this.server.start(); Container.get(ExecutionsPruningService).init(); diff --git a/packages/cli/src/deprecation/deprecation.service.ts b/packages/cli/src/deprecation/deprecation.service.ts index 6e87920264..22cbe522ca 100644 --- a/packages/cli/src/deprecation/deprecation.service.ts +++ b/packages/cli/src/deprecation/deprecation.service.ts @@ -47,6 +47,12 @@ export class DeprecationService { message: 'MySQL and MariaDB are deprecated. Please migrate to PostgreSQL.', checkValue: (value: string) => ['mysqldb', 'mariadb'].includes(value), }, + { + envVar: 'DB_SQLITE_POOL_SIZE', + message: + 'Running SQLite without a pool of read connections is deprecated. Please set `DB_SQLITE_POOL_SIZE` to a value higher than zero. See: https://docs.n8n.io/hosting/configuration/environment-variables/database/#sqlite', + checkValue: (_: string) => this.globalConfig.database.isLegacySqlite, + }, { envVar: 'N8N_SKIP_WEBHOOK_DEREGISTRATION_SHUTDOWN', message: `n8n no longer deregisters webhooks at startup and shutdown. ${SAFE_TO_REMOVE}`, diff --git a/packages/cli/src/executions/legacy-sqlite-execution-recovery.service.ts b/packages/cli/src/executions/legacy-sqlite-execution-recovery.service.ts new file mode 100644 index 0000000000..34c4385303 --- /dev/null +++ b/packages/cli/src/executions/legacy-sqlite-execution-recovery.service.ts @@ -0,0 +1,50 @@ +import { Logger } from '@n8n/backend-common'; +import { GlobalConfig } from '@n8n/config'; +import { DbConnection, ExecutionRepository } from '@n8n/db'; +import { Service } from '@n8n/di'; +import assert from 'assert'; + +/** + * Service for recovering executions that are missing execution data, this should only happen + * for sqlite legacy databases. + */ +@Service() +export class LegacySqliteExecutionRecoveryService { + private readonly logger: Logger; + constructor( + logger: Logger, + private readonly executionRepository: ExecutionRepository, + private readonly globalConfig: GlobalConfig, + private readonly dbConnection: DbConnection, + ) { + this.logger = logger.scoped('legacy-sqlite-execution-recovery'); + } + + /** + * Remove workflow executions that are in the `new` state but have no associated execution data. + * This is a legacy recovery operation for SQLite databases where executions might be left + * in an inconsistent state due to missing execution data. + * It marks these executions as `crashed` to prevent them from being processed further. + * This method should only be called when we are in legacy SQLite mode. + */ + async cleanupWorkflowExecutions() { + assert(this.globalConfig.database.isLegacySqlite, 'Only usable when on legacy SQLite driver'); + assert( + this.dbConnection.connectionState.connected && this.dbConnection.connectionState.migrated, + 'The database connection must be connected and migrated before running cleanupWorkflowExecutions', + ); + + this.logger.debug('Starting legacy SQLite execution recovery...'); + + const invalidExecutions = await this.executionRepository.findQueuedExecutionsWithoutData(); + + if (invalidExecutions.length > 0) { + await this.executionRepository.markAsCrashed(invalidExecutions.map((e) => e.id)); + this.logger.debug( + `Marked ${invalidExecutions.length} executions as crashed due to missing execution data.`, + ); + } + + this.logger.debug('Legacy SQLite execution recovery completed.'); + } +} diff --git a/packages/cli/test/integration/executions/legacy-recovery.test.ts b/packages/cli/test/integration/executions/legacy-recovery.test.ts new file mode 100644 index 0000000000..41e19b38f0 --- /dev/null +++ b/packages/cli/test/integration/executions/legacy-recovery.test.ts @@ -0,0 +1,78 @@ +import { LegacySqliteExecutionRecoveryService } from '@/executions/legacy-sqlite-execution-recovery.service'; +import { Logger } from '@n8n/backend-common'; +import { testDb } from '@n8n/backend-test-utils'; +import { GlobalConfig } from '@n8n/config'; +import { DbConnection, ExecutionRepository, WorkflowRepository } from '@n8n/db'; +import { Container } from '@n8n/di'; + +const globalConfig = Container.get(GlobalConfig); + +if (globalConfig.database.isLegacySqlite) { + beforeAll(async () => { + await testDb.init(); + }); + + describe('Legacy SQLite Execution Recovery Service', () => { + let legacySqliteExecutionRecoveryService: LegacySqliteExecutionRecoveryService; + let executionRepository: ExecutionRepository; + let dbConnection: DbConnection; + + beforeEach(async () => { + await testDb.truncate(['WorkflowEntity', 'ExecutionEntity', 'ExecutionData']); + executionRepository = Container.get(ExecutionRepository); + dbConnection = Container.get(DbConnection); + legacySqliteExecutionRecoveryService = new LegacySqliteExecutionRecoveryService( + Container.get(Logger), + executionRepository, + globalConfig, + dbConnection, + ); + + const workflowRepository = Container.get(WorkflowRepository); + + const workflow = workflowRepository.create({ + id: 'test-workflow-id', + name: 'Test Workflow', + active: true, + nodes: [], + connections: {}, + settings: {}, + createdAt: new Date(), + updatedAt: new Date(), + }); + + await workflowRepository.save(workflow); + }); + + afterAll(async () => { + await testDb.terminate(); + }); + + it('should recover executions without data', async () => { + // Arrange + let execution = executionRepository.create({ + status: 'new', + mode: 'manual', + workflowId: 'test-workflow-id', + createdAt: new Date(), + finished: false, + }); + + execution = await executionRepository.save(execution); + + // Act + await legacySqliteExecutionRecoveryService.cleanupWorkflowExecutions(); + + const executionMarkedAsCrashed = await executionRepository.findOneBy({ id: execution.id }); + // Assert + expect(executionMarkedAsCrashed?.id).toBe(execution.id); + expect(executionMarkedAsCrashed?.status).toBe('crashed'); + }); + }); +} else { + describe('Legacy SQLite Execution Recovery Service', () => { + it('should not run on non-legacy SQLite databases', () => { + // We need an empty test here to ensure that the test suite is not empty + }); + }); +}