diff --git a/packages/core/src/execution-engine/__tests__/workflow-execute-process-process-run-execution-data.test.ts b/packages/core/src/execution-engine/__tests__/workflow-execute-process-process-run-execution-data.test.ts index 9c2b1cb46d..a348cc7df9 100644 --- a/packages/core/src/execution-engine/__tests__/workflow-execute-process-process-run-execution-data.test.ts +++ b/packages/core/src/execution-engine/__tests__/workflow-execute-process-process-run-execution-data.test.ts @@ -1,5 +1,6 @@ import { mock } from 'jest-mock-extended'; import type { + IDataObject, IRunExecutionData, IWorkflowExecuteAdditionalData, WorkflowExecuteMode, @@ -7,7 +8,7 @@ import type { import { ApplicationError } from 'n8n-workflow'; import { DirectedGraph } from '../partial-execution-utils'; -import { createNodeData } from '../partial-execution-utils/__tests__/helpers'; +import { createNodeData, toITaskData } from '../partial-execution-utils/__tests__/helpers'; import { WorkflowExecute } from '../workflow-execute'; import { types, nodeTypes } from './mock-node-types'; @@ -142,4 +143,44 @@ describe('processRunExecutionData', () => { expect(runHook).toHaveBeenNthCalledWith(5, 'nodeExecuteAfter', expect.any(Array)); expect(runHook).toHaveBeenNthCalledWith(6, 'workflowExecuteAfter', expect.any(Array)); }); + + describe('runExecutionData.waitTill', () => { + test('handles waiting state properly when waitTill is set', async () => { + // ARRANGE + const node = createNodeData({ name: 'waitingNode', type: types.passThrough }); + const workflow = new DirectedGraph() + .addNodes(node) + .toWorkflow({ name: '', active: false, nodeTypes, settings: { executionOrder: 'v1' } }); + + const data: IDataObject = { foo: 1 }; + const executionData: IRunExecutionData = { + startData: { startNodes: [{ name: node.name, sourceData: null }] }, + resultData: { + runData: { waitingNode: [toITaskData([{ data }], { executionStatus: 'waiting' })] }, + lastNodeExecuted: 'waitingNode', + }, + executionData: { + contextData: {}, + nodeExecutionStack: [{ data: { main: [[{ json: data }]] }, node, source: null }], + metadata: {}, + waitingExecution: {}, + waitingExecutionSource: {}, + }, + waitTill: new Date('2024-01-01'), + }; + + const workflowExecute = new WorkflowExecute(additionalData, executionMode, executionData); + + // ACT + const result = await workflowExecute.processRunExecutionData(workflow); + + // ASSERT + expect(result.waitTill).toBeUndefined(); + // The waiting state handler should have removed the last entry from + // runData, but execution adds a new one, so we should have 1 entry. + expect(result.data.resultData.runData.waitingNode).toHaveLength(1); + // the status was `waiting` before + expect(result.data.resultData.runData.waitingNode[0].executionStatus).toEqual('success'); + }); + }); }); diff --git a/packages/core/src/execution-engine/workflow-execute.ts b/packages/core/src/execution-engine/workflow-execute.ts index de4682b68c..da1773f652 100644 --- a/packages/core/src/execution-engine/workflow-execute.ts +++ b/packages/core/src/execution-engine/workflow-execute.ts @@ -1476,6 +1476,43 @@ export class WorkflowExecute { ); } + private assertExecutionDataExists( + this: WorkflowExecute, + executionData: IRunExecutionData['executionData'], + workflow: Workflow, + ): asserts executionData is NonNullable { + if (!executionData) { + throw new UnexpectedError('Failed to run workflow due to missing execution data', { + extra: { + workflowId: workflow.id, + executionId: this.additionalData.executionId, + mode: this.mode, + }, + }); + } + } + + /** + * Handles executions that have been waiting by + * 1. unsetting the `waitTill` + * 2. disabling the currently executing node (which should be the node that + * put the execution into waiting) making sure it won't be executed again + * 3. Removing the last run for the last executed node (which also should be + * the node that put the execution into waiting) to make sure the node + * does not show up as having run twice + */ + private handleWaitingState(workflow: Workflow) { + if (this.runExecutionData.waitTill) { + this.runExecutionData.waitTill = undefined; + + this.assertExecutionDataExists(this.runExecutionData.executionData, workflow); + this.runExecutionData.executionData.nodeExecutionStack[0].node.disabled = true; + + const lastNodeExecuted = this.runExecutionData.resultData.lastNodeExecuted as string; + this.runExecutionData.resultData.runData[lastNodeExecuted].pop(); + } + } + /** * Runs the given execution data. * @@ -1533,12 +1570,7 @@ export class WorkflowExecute { this.runExecutionData.startData = {}; } - if (this.runExecutionData.waitTill) { - const lastNodeExecuted = this.runExecutionData.resultData.lastNodeExecuted as string; - this.runExecutionData.executionData.nodeExecutionStack[0].node.disabled = true; - this.runExecutionData.waitTill = undefined; - this.runExecutionData.resultData.runData[lastNodeExecuted].pop(); - } + this.handleWaitingState(workflow); let currentExecutionTry = ''; let lastExecutionTry = '';