mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-17 10:02:05 +00:00
fix(core): Mark invalid enqueued executions as crashed during startup for legacy SQLite driver (#17629)
This commit is contained in:
@@ -21,6 +21,7 @@ export const LOG_SCOPES = [
|
||||
'ssh-client',
|
||||
'cron',
|
||||
'community-nodes',
|
||||
'legacy-sqlite-execution-recovery',
|
||||
] as const;
|
||||
|
||||
export type LogScope = (typeof LOG_SCOPES)[number];
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { Column, Entity, ManyToOne, PrimaryColumn } from '@n8n/typeorm';
|
||||
import { Column, Entity, JoinColumn, OneToOne, PrimaryColumn } from '@n8n/typeorm';
|
||||
import { IWorkflowBase } from 'n8n-workflow';
|
||||
|
||||
import { JsonColumn } from './abstract-entity';
|
||||
@@ -21,8 +21,11 @@ export class ExecutionData {
|
||||
@PrimaryColumn({ transformer: idStringifier })
|
||||
executionId: string;
|
||||
|
||||
@ManyToOne('ExecutionEntity', 'data', {
|
||||
@OneToOne('ExecutionEntity', 'executionData', {
|
||||
onDelete: 'CASCADE',
|
||||
})
|
||||
@JoinColumn({
|
||||
name: 'executionId',
|
||||
})
|
||||
execution: ExecutionEntity;
|
||||
}
|
||||
|
||||
@@ -139,6 +139,26 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
|
||||
super(ExecutionEntity, dataSource.manager);
|
||||
}
|
||||
|
||||
// Find all executions that are in the 'new' state but do not have associated execution data.
|
||||
// These executions are considered invalid and will be marked as 'crashed'.
|
||||
// Since there is no join in this query the returned ids are unique.
|
||||
async findQueuedExecutionsWithoutData(): Promise<ExecutionEntity[]> {
|
||||
return await this.createQueryBuilder('execution')
|
||||
.where('execution.status = :status', { status: 'new' })
|
||||
.andWhere(
|
||||
'NOT EXISTS (' +
|
||||
this.manager
|
||||
.createQueryBuilder()
|
||||
.select('1')
|
||||
.from(ExecutionData, 'execution_data')
|
||||
.where('execution_data.executionId = execution.id')
|
||||
.getQuery() +
|
||||
')',
|
||||
)
|
||||
.select('execution.id')
|
||||
.getMany();
|
||||
}
|
||||
|
||||
async findMultipleExecutions(
|
||||
queryParams: FindManyOptions<ExecutionEntity>,
|
||||
options?: {
|
||||
@@ -219,7 +239,10 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
|
||||
|
||||
this.errorReporter.error(
|
||||
new UnexpectedError('Found executions without executionData', {
|
||||
extra: { executionIds: executions.map(({ id }) => id) },
|
||||
extra: {
|
||||
executionIds: executions.map(({ id }) => id),
|
||||
isLegacySqlite: this.globalConfig.database.isLegacySqlite,
|
||||
},
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -315,6 +315,15 @@ export class Start extends BaseCommand<z.infer<typeof flagsSchema>> {
|
||||
);
|
||||
}
|
||||
|
||||
if (this.globalConfig.database.isLegacySqlite) {
|
||||
// Employ lazy loading to avoid unnecessary imports in the CLI
|
||||
// and to ensure that the legacy recovery service is only used when needed.
|
||||
const { LegacySqliteExecutionRecoveryService } = await import(
|
||||
'@/executions/legacy-sqlite-execution-recovery.service'
|
||||
);
|
||||
await Container.get(LegacySqliteExecutionRecoveryService).cleanupWorkflowExecutions();
|
||||
}
|
||||
|
||||
await this.server.start();
|
||||
|
||||
Container.get(ExecutionsPruningService).init();
|
||||
|
||||
@@ -47,6 +47,12 @@ export class DeprecationService {
|
||||
message: 'MySQL and MariaDB are deprecated. Please migrate to PostgreSQL.',
|
||||
checkValue: (value: string) => ['mysqldb', 'mariadb'].includes(value),
|
||||
},
|
||||
{
|
||||
envVar: 'DB_SQLITE_POOL_SIZE',
|
||||
message:
|
||||
'Running SQLite without a pool of read connections is deprecated. Please set `DB_SQLITE_POOL_SIZE` to a value higher than zero. See: https://docs.n8n.io/hosting/configuration/environment-variables/database/#sqlite',
|
||||
checkValue: (_: string) => this.globalConfig.database.isLegacySqlite,
|
||||
},
|
||||
{
|
||||
envVar: 'N8N_SKIP_WEBHOOK_DEREGISTRATION_SHUTDOWN',
|
||||
message: `n8n no longer deregisters webhooks at startup and shutdown. ${SAFE_TO_REMOVE}`,
|
||||
|
||||
@@ -0,0 +1,50 @@
|
||||
import { Logger } from '@n8n/backend-common';
|
||||
import { GlobalConfig } from '@n8n/config';
|
||||
import { DbConnection, ExecutionRepository } from '@n8n/db';
|
||||
import { Service } from '@n8n/di';
|
||||
import assert from 'assert';
|
||||
|
||||
/**
|
||||
* Service for recovering executions that are missing execution data, this should only happen
|
||||
* for sqlite legacy databases.
|
||||
*/
|
||||
@Service()
|
||||
export class LegacySqliteExecutionRecoveryService {
|
||||
private readonly logger: Logger;
|
||||
constructor(
|
||||
logger: Logger,
|
||||
private readonly executionRepository: ExecutionRepository,
|
||||
private readonly globalConfig: GlobalConfig,
|
||||
private readonly dbConnection: DbConnection,
|
||||
) {
|
||||
this.logger = logger.scoped('legacy-sqlite-execution-recovery');
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove workflow executions that are in the `new` state but have no associated execution data.
|
||||
* This is a legacy recovery operation for SQLite databases where executions might be left
|
||||
* in an inconsistent state due to missing execution data.
|
||||
* It marks these executions as `crashed` to prevent them from being processed further.
|
||||
* This method should only be called when we are in legacy SQLite mode.
|
||||
*/
|
||||
async cleanupWorkflowExecutions() {
|
||||
assert(this.globalConfig.database.isLegacySqlite, 'Only usable when on legacy SQLite driver');
|
||||
assert(
|
||||
this.dbConnection.connectionState.connected && this.dbConnection.connectionState.migrated,
|
||||
'The database connection must be connected and migrated before running cleanupWorkflowExecutions',
|
||||
);
|
||||
|
||||
this.logger.debug('Starting legacy SQLite execution recovery...');
|
||||
|
||||
const invalidExecutions = await this.executionRepository.findQueuedExecutionsWithoutData();
|
||||
|
||||
if (invalidExecutions.length > 0) {
|
||||
await this.executionRepository.markAsCrashed(invalidExecutions.map((e) => e.id));
|
||||
this.logger.debug(
|
||||
`Marked ${invalidExecutions.length} executions as crashed due to missing execution data.`,
|
||||
);
|
||||
}
|
||||
|
||||
this.logger.debug('Legacy SQLite execution recovery completed.');
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,78 @@
|
||||
import { LegacySqliteExecutionRecoveryService } from '@/executions/legacy-sqlite-execution-recovery.service';
|
||||
import { Logger } from '@n8n/backend-common';
|
||||
import { testDb } from '@n8n/backend-test-utils';
|
||||
import { GlobalConfig } from '@n8n/config';
|
||||
import { DbConnection, ExecutionRepository, WorkflowRepository } from '@n8n/db';
|
||||
import { Container } from '@n8n/di';
|
||||
|
||||
const globalConfig = Container.get(GlobalConfig);
|
||||
|
||||
if (globalConfig.database.isLegacySqlite) {
|
||||
beforeAll(async () => {
|
||||
await testDb.init();
|
||||
});
|
||||
|
||||
describe('Legacy SQLite Execution Recovery Service', () => {
|
||||
let legacySqliteExecutionRecoveryService: LegacySqliteExecutionRecoveryService;
|
||||
let executionRepository: ExecutionRepository;
|
||||
let dbConnection: DbConnection;
|
||||
|
||||
beforeEach(async () => {
|
||||
await testDb.truncate(['WorkflowEntity', 'ExecutionEntity', 'ExecutionData']);
|
||||
executionRepository = Container.get(ExecutionRepository);
|
||||
dbConnection = Container.get(DbConnection);
|
||||
legacySqliteExecutionRecoveryService = new LegacySqliteExecutionRecoveryService(
|
||||
Container.get(Logger),
|
||||
executionRepository,
|
||||
globalConfig,
|
||||
dbConnection,
|
||||
);
|
||||
|
||||
const workflowRepository = Container.get(WorkflowRepository);
|
||||
|
||||
const workflow = workflowRepository.create({
|
||||
id: 'test-workflow-id',
|
||||
name: 'Test Workflow',
|
||||
active: true,
|
||||
nodes: [],
|
||||
connections: {},
|
||||
settings: {},
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
});
|
||||
|
||||
await workflowRepository.save(workflow);
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await testDb.terminate();
|
||||
});
|
||||
|
||||
it('should recover executions without data', async () => {
|
||||
// Arrange
|
||||
let execution = executionRepository.create({
|
||||
status: 'new',
|
||||
mode: 'manual',
|
||||
workflowId: 'test-workflow-id',
|
||||
createdAt: new Date(),
|
||||
finished: false,
|
||||
});
|
||||
|
||||
execution = await executionRepository.save(execution);
|
||||
|
||||
// Act
|
||||
await legacySqliteExecutionRecoveryService.cleanupWorkflowExecutions();
|
||||
|
||||
const executionMarkedAsCrashed = await executionRepository.findOneBy({ id: execution.id });
|
||||
// Assert
|
||||
expect(executionMarkedAsCrashed?.id).toBe(execution.id);
|
||||
expect(executionMarkedAsCrashed?.status).toBe('crashed');
|
||||
});
|
||||
});
|
||||
} else {
|
||||
describe('Legacy SQLite Execution Recovery Service', () => {
|
||||
it('should not run on non-legacy SQLite databases', () => {
|
||||
// We need an empty test here to ensure that the test suite is not empty
|
||||
});
|
||||
});
|
||||
}
|
||||
Reference in New Issue
Block a user