diff --git a/packages/core/src/PartialExecutionUtils/__tests__/findStartNodes.test.ts b/packages/core/src/PartialExecutionUtils/__tests__/findStartNodes.test.ts index 8dee8dff1b..f2a99fdb92 100644 --- a/packages/core/src/PartialExecutionUtils/__tests__/findStartNodes.test.ts +++ b/packages/core/src/PartialExecutionUtils/__tests__/findStartNodes.test.ts @@ -442,4 +442,126 @@ describe('findStartNodes', () => { expect(startNodes.size).toBe(1); expect(startNodes).toContainEqual(node2); }); + + describe('custom loop logic', () => { + test('if the last run of loop node has no data (null) on the done output, then the loop is the start node', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const loop = createNodeData({ name: 'loop', type: 'n8n-nodes-base.splitInBatches' }); + const inLoop = createNodeData({ name: 'inLoop' }); + const afterLoop = createNodeData({ name: 'afterLoop' }); + const graph = new DirectedGraph() + .addNodes(trigger, loop, inLoop, afterLoop) + .addConnections( + { from: trigger, to: loop }, + { from: loop, outputIndex: 1, to: inLoop }, + { from: inLoop, to: loop }, + { from: loop, to: afterLoop }, + ); + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { name: 'trigger' } }])], + [loop.name]: [ + // only output on the `loop` branch, but no output on the `done` + // branch + toITaskData([{ outputIndex: 1, data: { name: 'loop' } }]), + ], + [inLoop.name]: [toITaskData([{ data: { name: 'inLoop' } }])], + }; + + // ACT + const startNodes = findStartNodes({ + graph, + trigger, + destination: afterLoop, + runData, + pinData: {}, + }); + + // ASSERT + expect(startNodes.size).toBe(1); + expect(startNodes).toContainEqual(loop); + }); + + test('if the last run of loop node has no data (empty array) on the done output, then the loop is the start node', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const loop = createNodeData({ name: 'loop', type: 'n8n-nodes-base.splitInBatches' }); + const inLoop = createNodeData({ name: 'inLoop' }); + const afterLoop = createNodeData({ name: 'afterLoop' }); + const graph = new DirectedGraph() + .addNodes(trigger, loop, inLoop, afterLoop) + .addConnections( + { from: trigger, to: loop }, + { from: loop, outputIndex: 1, to: inLoop }, + { from: inLoop, to: loop }, + { from: loop, to: afterLoop }, + ); + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { name: 'trigger' } }])], + [loop.name]: [ + // This is handcrafted because `toITaskData` does not allow inserting + // an empty array like the first element of `main` below. But the + // execution engine creates ITaskData like this. + { + executionStatus: 'success', + executionTime: 0, + startTime: 0, + source: [], + data: { main: [[], [{ json: { name: 'loop' } }]] }, + }, + ], + [inLoop.name]: [toITaskData([{ data: { name: 'inLoop' } }])], + }; + + // ACT + const startNodes = findStartNodes({ + graph, + trigger, + destination: afterLoop, + runData, + pinData: {}, + }); + + // ASSERT + expect(startNodes.size).toBe(1); + expect(startNodes).toContainEqual(loop); + }); + + test('if the loop has data on the done output in the last run it does not become a start node', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const loop = createNodeData({ name: 'loop', type: 'n8n-nodes-base.splitInBatches' }); + const inLoop = createNodeData({ name: 'inLoop' }); + const afterLoop = createNodeData({ name: 'afterLoop' }); + const graph = new DirectedGraph() + .addNodes(trigger, loop, inLoop, afterLoop) + .addConnections( + { from: trigger, to: loop }, + { from: loop, outputIndex: 1, to: inLoop }, + { from: inLoop, to: loop }, + { from: loop, to: afterLoop }, + ); + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { name: 'trigger' } }])], + [loop.name]: [ + toITaskData([{ outputIndex: 1, data: { name: 'loop' } }]), + toITaskData([{ outputIndex: 0, data: { name: 'done' } }]), + ], + [inLoop.name]: [toITaskData([{ data: { name: 'inLoop' } }])], + }; + + // ACT + const startNodes = findStartNodes({ + graph, + trigger, + destination: afterLoop, + runData, + pinData: {}, + }); + + // ASSERT + expect(startNodes.size).toBe(1); + expect(startNodes).toContainEqual(afterLoop); + }); + }); }); diff --git a/packages/core/src/PartialExecutionUtils/__tests__/helpers.ts b/packages/core/src/PartialExecutionUtils/__tests__/helpers.ts index 6a6c8a88db..74976bba3e 100644 --- a/packages/core/src/PartialExecutionUtils/__tests__/helpers.ts +++ b/packages/core/src/PartialExecutionUtils/__tests__/helpers.ts @@ -5,7 +5,7 @@ interface StubNode { name: string; parameters?: INodeParameters; disabled?: boolean; - type?: string; + type?: 'n8n-nodes-base.manualTrigger' | 'n8n-nodes-base.splitInBatches' | (string & {}); } export function createNodeData(stubData: StubNode): INode { diff --git a/packages/core/src/PartialExecutionUtils/findStartNodes.ts b/packages/core/src/PartialExecutionUtils/findStartNodes.ts index b3f4f95399..1c1c0b9fc7 100644 --- a/packages/core/src/PartialExecutionUtils/findStartNodes.ts +++ b/packages/core/src/PartialExecutionUtils/findStartNodes.ts @@ -1,7 +1,7 @@ -import type { INode, IPinData, IRunData } from 'n8n-workflow'; +import { NodeConnectionType, type INode, type IPinData, type IRunData } from 'n8n-workflow'; import type { DirectedGraph } from './DirectedGraph'; -import { getIncomingData } from './getIncomingData'; +import { getIncomingData, getIncomingDataFromAnyRun } from './getIncomingData'; /** * A node is dirty if either of the following is true: @@ -73,6 +73,25 @@ function findStartNodesRecursive( return startNodes; } + // If the current node is a loop node, check if the `done` output has data on + // the last run. If it doesn't the loop wasn't fully executed and needs to be + // re-run from the start. Thus the loop node become the start node. + if (current.type === 'n8n-nodes-base.splitInBatches') { + const nodeRunData = getIncomingData( + runData, + current.name, + // last run + -1, + NodeConnectionType.Main, + 0, + ); + + if (nodeRunData === null || nodeRunData.length === 0) { + startNodes.add(current); + return startNodes; + } + } + // If we detect a cycle stop following the branch, there is no start node on // this branch. if (seen.has(current)) { @@ -82,19 +101,16 @@ function findStartNodesRecursive( // Recurse with every direct child that is part of the sub graph. const outGoingConnections = graph.getDirectChildConnections(current); for (const outGoingConnection of outGoingConnections) { - const nodeRunData = getIncomingData( + const nodeRunData = getIncomingDataFromAnyRun( runData, outGoingConnection.from.name, - // NOTE: It's always 0 until I fix the bug that removes the run data for - // old runs. The FE only sends data for one run for each node. - 0, outGoingConnection.type, outGoingConnection.outputIndex, ); // If the node has multiple outputs, only follow the outputs that have run data. const hasNoRunData = - nodeRunData === null || nodeRunData === undefined || nodeRunData.length === 0; + nodeRunData === null || nodeRunData === undefined || nodeRunData.data.length === 0; if (hasNoRunData) { continue; } diff --git a/packages/core/src/PartialExecutionUtils/getIncomingData.ts b/packages/core/src/PartialExecutionUtils/getIncomingData.ts index acac8ad22d..a6a66ee25b 100644 --- a/packages/core/src/PartialExecutionUtils/getIncomingData.ts +++ b/packages/core/src/PartialExecutionUtils/getIncomingData.ts @@ -1,4 +1,3 @@ -import * as a from 'assert'; import type { INodeExecutionData, IRunData, NodeConnectionType } from 'n8n-workflow'; export function getIncomingData( @@ -7,18 +6,8 @@ export function getIncomingData( runIndex: number, connectionType: NodeConnectionType, outputIndex: number, -): INodeExecutionData[] | null | undefined { - a.ok(runData[nodeName], `Can't find node with name '${nodeName}' in runData.`); - a.ok( - runData[nodeName][runIndex], - `Can't find a run for index '${runIndex}' for node name '${nodeName}'`, - ); - a.ok( - runData[nodeName][runIndex].data, - `Can't find data for index '${runIndex}' for node name '${nodeName}'`, - ); - - return runData[nodeName][runIndex].data[connectionType][outputIndex]; +): INodeExecutionData[] | null { + return runData[nodeName]?.at(runIndex)?.data?.[connectionType].at(outputIndex) ?? null; } function getRunIndexLength(runData: IRunData, nodeName: string) { diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index 515f58a657..be4e1d70f3 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -367,7 +367,7 @@ export class WorkflowExecute { startNodes = handleCycles(graph, startNodes, trigger); // 6. Clean Run Data - const newRunData: IRunData = cleanRunData(runData, graph, startNodes); + runData = cleanRunData(runData, graph, startNodes); // 7. Recreate Execution Stack const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = @@ -381,7 +381,7 @@ export class WorkflowExecute { runNodeFilter: Array.from(filteredNodes.values()).map((node) => node.name), }, resultData: { - runData: newRunData, + runData, pinData, }, executionData: { diff --git a/packages/core/test/WorkflowExecute.test.ts b/packages/core/test/WorkflowExecute.test.ts index 71be9a4891..cafc26269d 100644 --- a/packages/core/test/WorkflowExecute.test.ts +++ b/packages/core/test/WorkflowExecute.test.ts @@ -9,6 +9,7 @@ // XX denotes that the node is disabled // PD denotes that the node has pinned data +import { pick } from 'lodash'; import type { IPinData, IRun, IRunData, WorkflowTestData } from 'n8n-workflow'; import { ApplicationError, @@ -18,6 +19,7 @@ import { } from 'n8n-workflow'; import { DirectedGraph } from '@/PartialExecutionUtils'; +import * as partialExecutionUtils from '@/PartialExecutionUtils'; import { createNodeData, toITaskData } from '@/PartialExecutionUtils/__tests__/helpers'; import { WorkflowExecute } from '@/WorkflowExecute'; @@ -324,5 +326,72 @@ describe('WorkflowExecute', () => { expect(nodes).toContain(node2.name); expect(nodes).not.toContain(node1.name); }); + + // ►► + // ┌────┐0 ┌─────────┐ + //┌───────┐1 │ ├──────►afterLoop│ + //│trigger├───┬──►loop│1 └─────────┘ + //└───────┘ │ │ ├─┐ + // │ └────┘ │ + // │ │ ┌──────┐1 + // │ └─►inLoop├─┐ + // │ └──────┘ │ + // └────────────────────┘ + test('passes filtered run data to `recreateNodeExecutionStack`', async () => { + // ARRANGE + const waitPromise = createDeferredPromise(); + const nodeExecutionOrder: string[] = []; + const additionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise, nodeExecutionOrder); + const workflowExecute = new WorkflowExecute(additionalData, 'manual'); + + const trigger = createNodeData({ name: 'trigger', type: 'n8n-nodes-base.manualTrigger' }); + const loop = createNodeData({ name: 'loop', type: 'n8n-nodes-base.splitInBatches' }); + const inLoop = createNodeData({ name: 'inLoop' }); + const afterLoop = createNodeData({ name: 'afterLoop' }); + const workflow = new DirectedGraph() + .addNodes(trigger, loop, inLoop, afterLoop) + .addConnections( + { from: trigger, to: loop }, + { from: loop, to: afterLoop }, + { from: loop, to: inLoop, outputIndex: 1 }, + { from: inLoop, to: loop }, + ) + .toWorkflow({ name: '', active: false, nodeTypes }); + + const pinData: IPinData = {}; + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { value: 1 } }])], + [loop.name]: [toITaskData([{ data: { nodeName: loop.name }, outputIndex: 1 }])], + [inLoop.name]: [toITaskData([{ data: { nodeName: inLoop.name } }])], + }; + const dirtyNodeNames: string[] = []; + + jest.spyOn(workflowExecute, 'processRunExecutionData').mockImplementationOnce(jest.fn()); + const recreateNodeExecutionStackSpy = jest.spyOn( + partialExecutionUtils, + 'recreateNodeExecutionStack', + ); + + // ACT + await workflowExecute.runPartialWorkflow2( + workflow, + runData, + pinData, + dirtyNodeNames, + afterLoop.name, + ); + + // ASSERT + expect(recreateNodeExecutionStackSpy).toHaveBeenNthCalledWith( + 1, + expect.any(DirectedGraph), + expect.any(Set), + // The run data should only contain the trigger node because the loop + // node has no data on the done branch. That means we have to rerun the + // whole loop, because we don't know how many iterations would be left. + pick(runData, trigger.name), + expect.any(Object), + ); + }); }); }); diff --git a/packages/core/test/helpers/constants.ts b/packages/core/test/helpers/constants.ts index 3043f455e9..ac8c366866 100644 --- a/packages/core/test/helpers/constants.ts +++ b/packages/core/test/helpers/constants.ts @@ -11,6 +11,7 @@ import { ManualTrigger } from '../../../nodes-base/dist/nodes/ManualTrigger/Manu import { Merge } from '../../../nodes-base/dist/nodes/Merge/Merge.node'; import { NoOp } from '../../../nodes-base/dist/nodes/NoOp/NoOp.node'; import { Set } from '../../../nodes-base/dist/nodes/Set/Set.node'; +import { SplitInBatches } from '../../../nodes-base/dist/nodes/SplitInBatches/SplitInBatches.node'; import { Start } from '../../../nodes-base/dist/nodes/Start/Start.node'; export const predefinedNodesTypes: INodeTypeData = { @@ -38,6 +39,10 @@ export const predefinedNodesTypes: INodeTypeData = { type: new ManualTrigger(), sourcePath: '', }, + 'n8n-nodes-base.splitInBatches': { + type: new SplitInBatches(), + sourcePath: '', + }, 'n8n-nodes-base.versionTest': { sourcePath: '', type: {