diff --git a/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts b/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts index 73300ad492..e061fd45c5 100644 --- a/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts +++ b/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts @@ -158,8 +158,6 @@ export class MessageEventBus extends EventEmitter { } if (unfinishedExecutionIds.length > 0) { - this.logger.warn(`Found unfinished executions: ${unfinishedExecutionIds.join(', ')}`); - this.logger.info('This could be due to a crash of an active workflow or a restart of n8n.'); const activeWorkflows = await this.workflowRepository.find({ where: { active: true }, select: ['id', 'name'], @@ -181,11 +179,25 @@ export class MessageEventBus extends EventEmitter { } else { // start actual recovery process and write recovery process flag file this.logWriter?.startRecoveryProcess(); + const recoveredIds: string[] = []; + for (const executionId of unfinishedExecutionIds) { const logMesssages = unsentAndUnfinished.unfinishedExecutions[executionId]; - await this.recoveryService.recoverFromLogs(executionId, logMesssages ?? []); + const recoveredExecution = await this.recoveryService.recoverFromLogs( + executionId, + logMesssages ?? [], + ); + if (recoveredExecution) recoveredIds.push(executionId); + } + + if (recoveredIds.length > 0) { + this.logger.warn(`Found unfinished executions: ${recoveredIds.join(', ')}`); + this.logger.info( + 'This could be due to a crash of an active workflow or a restart of n8n', + ); } } + // remove the recovery process flag file this.logWriter?.endRecoveryProcess(); } diff --git a/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts index d17b411451..dfa026e6d5 100644 --- a/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts +++ b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts @@ -152,6 +152,31 @@ describe('ExecutionRecoveryService', () => { expect(amendedExecution).toBeNull(); }); + test('for errored dataful execution, should return `null`', async () => { + /** + * Arrange + */ + const workflow = await createWorkflow(); + const execution = await createExecution( + { status: 'error', data: stringify({ runData: { foo: 'bar' } }) }, + workflow, + ); + const messages = setupMessages(execution.id, 'Some workflow'); + + /** + * Act + */ + const amendedExecution = await executionRecoveryService.recoverFromLogs( + execution.id, + messages, + ); + + /** + * Assert + */ + expect(amendedExecution).toBeNull(); + }); + test('should return `null` if no execution found', async () => { /** * Arrange diff --git a/packages/cli/src/executions/execution-recovery.service.ts b/packages/cli/src/executions/execution-recovery.service.ts index 1064fafce9..428dac46f5 100644 --- a/packages/cli/src/executions/execution-recovery.service.ts +++ b/packages/cli/src/executions/execution-recovery.service.ts @@ -72,7 +72,14 @@ export class ExecutionRecoveryService { unflattenData: true, }); - if (!execution || (execution.status === 'success' && execution.data)) return null; + /** + * The event bus is unable to correctly identify unfinished executions in workers, + * because execution lifecycle hooks cause worker event logs to be partitioned. + * Hence we need to filter out finished executions here. + * */ + if (!execution || (['success', 'error'].includes(execution.status) && execution.data)) { + return null; + } const runExecutionData = execution.data ?? { resultData: { runData: {} } };