fix(core): Prevent worker from recovering finished executions (#16094)

This commit is contained in:
Iván Ovejero
2025-06-10 11:12:17 +02:00
committed by GitHub
parent 9d12b741ff
commit 53b6812592
3 changed files with 48 additions and 4 deletions

View File

@@ -158,8 +158,6 @@ export class MessageEventBus extends EventEmitter {
}
if (unfinishedExecutionIds.length > 0) {
this.logger.warn(`Found unfinished executions: ${unfinishedExecutionIds.join(', ')}`);
this.logger.info('This could be due to a crash of an active workflow or a restart of n8n.');
const activeWorkflows = await this.workflowRepository.find({
where: { active: true },
select: ['id', 'name'],
@@ -181,11 +179,25 @@ export class MessageEventBus extends EventEmitter {
} else {
// start actual recovery process and write recovery process flag file
this.logWriter?.startRecoveryProcess();
const recoveredIds: string[] = [];
for (const executionId of unfinishedExecutionIds) {
const logMesssages = unsentAndUnfinished.unfinishedExecutions[executionId];
await this.recoveryService.recoverFromLogs(executionId, logMesssages ?? []);
const recoveredExecution = await this.recoveryService.recoverFromLogs(
executionId,
logMesssages ?? [],
);
if (recoveredExecution) recoveredIds.push(executionId);
}
if (recoveredIds.length > 0) {
this.logger.warn(`Found unfinished executions: ${recoveredIds.join(', ')}`);
this.logger.info(
'This could be due to a crash of an active workflow or a restart of n8n',
);
}
}
// remove the recovery process flag file
this.logWriter?.endRecoveryProcess();
}

View File

@@ -152,6 +152,31 @@ describe('ExecutionRecoveryService', () => {
expect(amendedExecution).toBeNull();
});
test('for errored dataful execution, should return `null`', async () => {
/**
* Arrange
*/
const workflow = await createWorkflow();
const execution = await createExecution(
{ status: 'error', data: stringify({ runData: { foo: 'bar' } }) },
workflow,
);
const messages = setupMessages(execution.id, 'Some workflow');
/**
* Act
*/
const amendedExecution = await executionRecoveryService.recoverFromLogs(
execution.id,
messages,
);
/**
* Assert
*/
expect(amendedExecution).toBeNull();
});
test('should return `null` if no execution found', async () => {
/**
* Arrange

View File

@@ -72,7 +72,14 @@ export class ExecutionRecoveryService {
unflattenData: true,
});
if (!execution || (execution.status === 'success' && execution.data)) return null;
/**
* The event bus is unable to correctly identify unfinished executions in workers,
* because execution lifecycle hooks cause worker event logs to be partitioned.
* Hence we need to filter out finished executions here.
* */
if (!execution || (['success', 'error'].includes(execution.status) && execution.data)) {
return null;
}
const runExecutionData = execution.data ?? { resultData: { runData: {} } };