From f5fb33a3fac16b17b19552df07f0a8be54834f64 Mon Sep 17 00:00:00 2001 From: Andreas Fitzek Date: Wed, 9 Jul 2025 11:01:57 +0200 Subject: [PATCH] fix(Execute Sub-workflow Node): Improve paired item handling for child workflows (#17065) --- .../src/workflow-execute-additional-data.ts | 9 + .../base-execute-context.ts | 4 + .../ExecuteWorkflow/ExecuteWorkflow.node.ts | 19 +- .../utils/workflow-backtracking.test.ts | 532 ++++++++++++++++++ .../nodes-base/utils/workflow-backtracking.ts | 114 ++++ packages/workflow/src/interfaces.ts | 2 + 6 files changed, 679 insertions(+), 1 deletion(-) create mode 100644 packages/nodes-base/utils/workflow-backtracking.test.ts create mode 100644 packages/nodes-base/utils/workflow-backtracking.ts diff --git a/packages/cli/src/workflow-execute-additional-data.ts b/packages/cli/src/workflow-execute-additional-data.ts index 171feb2821..b1db74005c 100644 --- a/packages/cli/src/workflow-execute-additional-data.ts +++ b/packages/cli/src/workflow-execute-additional-data.ts @@ -392,6 +392,15 @@ export async function getBase( userId, setExecutionStatus, variables, + async getRunExecutionData(executionId) { + const executionRepository = Container.get(ExecutionRepository); + const executionData = await executionRepository.findSingleExecution(executionId, { + unflattenData: true, + includeData: true, + }); + + return executionData?.data; + }, externalSecretsProxy: Container.get(ExternalSecretsProxy), async startRunnerTask( additionalData: IWorkflowExecuteAdditionalData, diff --git a/packages/core/src/execution-engine/node-execution-context/base-execute-context.ts b/packages/core/src/execution-engine/node-execution-context/base-execute-context.ts index c3d22faed3..56188152f7 100644 --- a/packages/core/src/execution-engine/node-execution-context/base-execute-context.ts +++ b/packages/core/src/execution-engine/node-execution-context/base-execute-context.ts @@ -142,6 +142,10 @@ export class BaseExecuteContext extends NodeExecutionContext { return { ...result, data }; } + async getExecutionDataById(executionId: string): Promise { + return await this.additionalData.getRunExecutionData(executionId); + } + protected getInputItems(inputIndex: number, connectionType: NodeConnectionType) { const inputData = this.inputData[connectionType]; if (inputData.length < inputIndex) { diff --git a/packages/nodes-base/nodes/ExecuteWorkflow/ExecuteWorkflow/ExecuteWorkflow.node.ts b/packages/nodes-base/nodes/ExecuteWorkflow/ExecuteWorkflow/ExecuteWorkflow.node.ts index c24e51284f..e73cc5c2c8 100644 --- a/packages/nodes-base/nodes/ExecuteWorkflow/ExecuteWorkflow/ExecuteWorkflow.node.ts +++ b/packages/nodes-base/nodes/ExecuteWorkflow/ExecuteWorkflow/ExecuteWorkflow.node.ts @@ -7,10 +7,12 @@ import type { INodeTypeDescription, } from 'n8n-workflow'; +import { findPairedItemTroughWorkflowData } from './../../../utils/workflow-backtracking'; import { getWorkflowInfo } from './GenericFunctions'; import { localResourceMapping } from './methods'; import { generatePairedItemData } from '../../../utils/utilities'; import { getCurrentWorkflowInputData } from '../../../utils/workflowInputsResourceMapping/GenericFunctions'; + export class ExecuteWorkflow implements INodeType { description: INodeTypeDescription = { displayName: 'Execute Sub-workflow', @@ -425,6 +427,8 @@ export class ExecuteWorkflow implements INodeType { return [items]; } + const workflowRunData = await this.getExecutionDataById(executionResult.executionId); + const workflowResult = executionResult.data as INodeExecutionData[][]; const fallbackPairedItemData = generatePairedItemData(items.length); @@ -433,7 +437,20 @@ export class ExecuteWorkflow implements INodeType { const sameLength = output.length === items.length; for (const [itemIndex, item] of output.entries()) { - if (item.pairedItem) continue; + if (item.pairedItem) { + // If the item already has a paired item, we need to follow these to the start of the child workflow + if (workflowRunData !== undefined) { + const pairedItem = findPairedItemTroughWorkflowData( + workflowRunData, + item, + itemIndex, + ); + if (pairedItem !== undefined) { + item.pairedItem = pairedItem; + } + } + continue; + } if (sameLength) { item.pairedItem = { item: itemIndex }; diff --git a/packages/nodes-base/utils/workflow-backtracking.test.ts b/packages/nodes-base/utils/workflow-backtracking.test.ts new file mode 100644 index 0000000000..8045e2a551 --- /dev/null +++ b/packages/nodes-base/utils/workflow-backtracking.test.ts @@ -0,0 +1,532 @@ +import type { + INodeExecutionData, + IPairedItemData, + IRunExecutionData, + ISourceData, + ITaskData, +} from 'n8n-workflow'; + +import { previousTaskData, findPairedItemTroughWorkflowData } from './workflow-backtracking'; + +describe('backtracking.ts', () => { + describe('previousTaskData', () => { + it('should return undefined when source is empty', () => { + const runData = {}; + const currentRunData: ITaskData = { + source: [], + data: { main: [[]] }, + executionTime: 0, + executionStatus: 'success', + executionIndex: 0, + startTime: 0, + }; + + const result = previousTaskData(runData, currentRunData); + + expect(result).toBeUndefined(); + }); + + it('should return undefined when source is undefined', () => { + const runData = {}; + const currentRunData: ITaskData = { + data: { main: [[]] }, + executionTime: 0, + executionStatus: 'success', + executionIndex: 0, + startTime: 0, + } as unknown as ITaskData; // Type assertion to match the expected type + + const result = previousTaskData(runData, currentRunData); + + expect(result).toBeUndefined(); + }); + + it('should return undefined when previousNode is undefined', () => { + const runData = {}; + const currentRunData: ITaskData = { + source: [{} as unknown as ISourceData], + data: { main: [[]] }, + executionTime: 0, + executionStatus: 'success', + executionIndex: 0, + startTime: 0, + }; + + const result = previousTaskData(runData, currentRunData); + + expect(result).toBeUndefined(); + }); + + it('should return undefined when run data for previousNode does not exist', () => { + const runData = {}; + const currentRunData: ITaskData = { + source: [{ previousNode: 'node1' }], + data: { main: [[]] }, + executionTime: 0, + executionStatus: 'success', + executionIndex: 0, + startTime: 0, + }; + + const result = previousTaskData(runData, currentRunData); + + expect(result).toBeUndefined(); + }); + + it('should return undefined when run data for previousNode is empty', () => { + const runData = { + node1: [], + }; + const currentRunData: ITaskData = { + source: [{ previousNode: 'node1' }], + data: { main: [[]] }, + executionTime: 0, + executionStatus: 'success', + executionIndex: 0, + startTime: 0, + }; + + const result = previousTaskData(runData, currentRunData); + + expect(result).toBeUndefined(); + }); + + it('should return the correct task data from previousNode', () => { + const expectedTaskData: ITaskData = { + data: { main: [[{ json: { test: 'value' } }]] }, + executionTime: 100, + executionStatus: 'success', + executionIndex: 0, + startTime: 1000, + } as unknown as ITaskData; + + const runData = { + node1: [expectedTaskData], + }; + const currentRunData: ITaskData = { + source: [{ previousNode: 'node1' }], + data: { main: [[]] }, + executionTime: 0, + executionStatus: 'success', + executionIndex: 0, + startTime: 0, + }; + + const result = previousTaskData(runData, currentRunData); + + expect(result).toBe(expectedTaskData); + }); + + it('should return correct task data using previousNodeRun index', () => { + const taskData1: ITaskData = { + data: { main: [[{ json: { run: 1 } }]] }, + executionTime: 100, + executionStatus: 'success', + executionIndex: 0, + startTime: 1000, + } as unknown as ITaskData; + + const taskData2: ITaskData = { + data: { main: [[{ json: { run: 2 } }]] }, + executionTime: 200, + executionStatus: 'success', + executionIndex: 0, + startTime: 2000, + } as unknown as ITaskData; + + const runData = { + node1: [taskData1, taskData2], + }; + const currentRunData: ITaskData = { + source: [{ previousNode: 'node1', previousNodeRun: 1 }], + data: { main: [[]] }, + executionTime: 0, + executionStatus: 'success', + executionIndex: 0, + startTime: 0, + }; + + const result = previousTaskData(runData, currentRunData); + + expect(result).toBe(taskData2); + }); + + it('should default to index 0 when previousNodeRun is undefined', () => { + const taskData1: ITaskData = { + data: { main: [[{ json: { run: 1 } }]] }, + executionTime: 100, + executionStatus: 'success', + executionIndex: 0, + startTime: 1000, + } as unknown as ITaskData; + + const taskData2: ITaskData = { + data: { main: [[{ json: { run: 2 } }]] }, + executionTime: 200, + executionStatus: 'success', + executionIndex: 0, + startTime: 2000, + } as unknown as ITaskData; + + const runData = { + node1: [taskData1, taskData2], + }; + const currentRunData: ITaskData = { + source: [{ previousNode: 'node1' }], + data: { main: [[]] }, + executionTime: 0, + executionStatus: 'success', + executionIndex: 0, + startTime: 0, + }; + + const result = previousTaskData(runData, currentRunData); + + expect(result).toBe(taskData1); + }); + }); + + describe('findPairedItemTroughWorkflowData', () => { + it('should return undefined when lastNodeExecuted is undefined', () => { + const workflowRunData: IRunExecutionData = { + resultData: { + runData: {}, + lastNodeExecuted: undefined, + }, + }; + const item: INodeExecutionData = { + json: { test: 'value' }, + pairedItem: { item: 0 }, + }; + + const result = findPairedItemTroughWorkflowData(workflowRunData, item, 0); + + expect(result).toBeUndefined(); + }); + + it('should return undefined when no run data exists for lastNodeExecuted', () => { + const workflowRunData: IRunExecutionData = { + resultData: { + runData: {}, + lastNodeExecuted: 'node1', + }, + }; + const item: INodeExecutionData = { + json: { test: 'value' }, + pairedItem: { item: 0 }, + }; + + const result = findPairedItemTroughWorkflowData(workflowRunData, item, 0); + + expect(result).toBeUndefined(); + }); + + it('should return undefined when run data is empty', () => { + const workflowRunData: IRunExecutionData = { + resultData: { + runData: { + node1: [], + }, + lastNodeExecuted: 'node1', + }, + }; + const item: INodeExecutionData = { + json: { test: 'value' }, + pairedItem: { item: 0 }, + }; + + const result = findPairedItemTroughWorkflowData(workflowRunData, item, 0); + + expect(result).toBeUndefined(); + }); + + it('should return undefined when task data is undefined', () => { + const workflowRunData: IRunExecutionData = { + resultData: { + runData: { + node1: [undefined as any], + }, + lastNodeExecuted: 'node1', + }, + }; + const item: INodeExecutionData = { + json: { test: 'value' }, + pairedItem: { item: 0 }, + }; + + const result = findPairedItemTroughWorkflowData(workflowRunData, item, 0); + + expect(result).toBeUndefined(); + }); + + it('should return paired item when no previous task data exists', () => { + const expectedPairedItem: IPairedItemData = { item: 0 }; + const workflowRunData: IRunExecutionData = { + resultData: { + runData: { + node1: [ + { + data: { main: [[]] }, + executionTime: 0, + executionStatus: 'success', + startTime: 0, + } as unknown as ITaskData, + ], + }, + lastNodeExecuted: 'node1', + }, + }; + const item: INodeExecutionData = { + json: { test: 'value' }, + pairedItem: expectedPairedItem, + }; + + const result = findPairedItemTroughWorkflowData(workflowRunData, item, 0); + + expect(result).toBe(expectedPairedItem); + }); + + it('should backtrack through workflow data with simple paired item', () => { + const finalPairedItem: IPairedItemData = { item: 5 }; + const item: INodeExecutionData = { + json: { test: 'value' }, + pairedItem: { item: 0 }, + }; + const workflowRunData: IRunExecutionData = { + resultData: { + runData: { + node1: [ + { + source: [{ previousNode: 'node2' }], + data: { main: [[item]] }, + executionTime: 100, + executionStatus: 'success', + executionIndex: 0, + startTime: 1000, + }, + ], + node2: [ + { + data: { main: [[{ json: { value: 2 }, pairedItem: finalPairedItem }]] }, + executionTime: 200, + executionStatus: 'success', + executionIndex: 0, + startTime: 2000, + } as unknown as ITaskData, + ], + }, + lastNodeExecuted: 'node1', + }, + }; + + const result = findPairedItemTroughWorkflowData(workflowRunData, item, 0); + + expect(result).toBe(finalPairedItem); + }); + + it('should backtrack through workflow data with object paired item', () => { + const finalPairedItem: IPairedItemData = { item: 3, input: 1 }; + const item: INodeExecutionData = { + json: { test: 'value' }, + pairedItem: { item: 0, input: 1 }, + }; + const workflowRunData: IRunExecutionData = { + resultData: { + runData: { + node1: [ + { + source: [{ previousNode: 'node2' }], + data: { main: [[item]] }, + executionTime: 100, + executionStatus: 'success', + startTime: 1000, + } as unknown as ITaskData, + ], + node2: [ + { + data: { main: [[], [{ json: { value: 2 }, pairedItem: finalPairedItem }]] }, + executionTime: 200, + executionStatus: 'success', + startTime: 2000, + } as unknown as ITaskData, + ], + }, + lastNodeExecuted: 'node1', + }, + }; + + const result = findPairedItemTroughWorkflowData(workflowRunData, item, 0); + + expect(result).toBe(finalPairedItem); + }); + + it('should use itemIndex parameter when paired item is numeric', () => { + const finalPairedItem: IPairedItemData = { item: 7 }; + + const item: INodeExecutionData = { + json: { test: 'value' }, + pairedItem: 2, // Numeric paired item + }; + + const workflowRunData: IRunExecutionData = { + resultData: { + runData: { + node1: [ + { + source: [{ previousNode: 'node2' }], + data: { main: [[item]] }, + executionTime: 100, + executionStatus: 'success', + startTime: 1000, + } as unknown as ITaskData, + ], + node2: [ + { + data: { + main: [ + [ + { json: {} }, + { json: {} }, + { json: { value: 2 }, pairedItem: finalPairedItem }, + ], + ], + }, + executionTime: 200, + executionStatus: 'success', + startTime: 2000, + } as unknown as ITaskData, + ], + }, + lastNodeExecuted: 'node1', + }, + }; + + const result = findPairedItemTroughWorkflowData(workflowRunData, item, 5); + + expect(result).toBe(finalPairedItem); + }); + + it('should handle multiple levels of backtracking', () => { + const finalPairedItem: IPairedItemData = { item: 10 }; + const workflowRunData: IRunExecutionData = { + resultData: { + runData: { + node1: [ + { + source: [{ previousNode: 'node2' }], + data: { main: [[{ json: { value: 1 }, pairedItem: { item: 0 } }]] }, + executionTime: 100, + executionStatus: 'success', + startTime: 1000, + } as unknown as ITaskData, + ], + node2: [ + { + source: [{ previousNode: 'node3' }], + data: { main: [[{ json: { value: 2 }, pairedItem: { item: 1 } }]] }, + executionTime: 200, + executionStatus: 'success', + startTime: 2000, + } as unknown as ITaskData, + ], + node3: [ + { + data: { main: [[null, { json: { value: 3 }, pairedItem: finalPairedItem }]] }, + executionTime: 300, + executionStatus: 'success', + startTime: 3000, + } as unknown as ITaskData, + ], + }, + lastNodeExecuted: 'node1', + }, + }; + const item: INodeExecutionData = { + json: { test: 'value' }, + pairedItem: { item: 0 }, + }; + + const result = findPairedItemTroughWorkflowData(workflowRunData, item, 0); + + expect(result).toBe(finalPairedItem); + }); + + it('should use last run data when multiple runs exist', () => { + const finalPairedItem: IPairedItemData = { item: 15 }; + const workflowRunData: IRunExecutionData = { + resultData: { + runData: { + node1: [ + { + source: [{ previousNode: 'node2' }], + data: { main: [[{ json: { value: 1 }, pairedItem: { item: 0 } }]] }, + executionTime: 100, + executionStatus: 'success', + startTime: 1000, + } as unknown as ITaskData, + { + source: [{ previousNode: 'node2' }], + data: { main: [[{ json: { value: 2 }, pairedItem: { item: 0 } }]] }, + executionTime: 150, + executionStatus: 'success', + startTime: 1500, + } as unknown as ITaskData, + ], + node2: [ + { + data: { main: [[{ json: { value: 3 }, pairedItem: finalPairedItem }]] }, + executionTime: 200, + executionStatus: 'success', + startTime: 2000, + } as unknown as ITaskData, + ], + }, + lastNodeExecuted: 'node1', + }, + }; + const item: INodeExecutionData = { + json: { test: 'value' }, + pairedItem: { item: 0 }, + }; + + const result = findPairedItemTroughWorkflowData(workflowRunData, item, 0); + + expect(result).toBe(finalPairedItem); + }); + + it('should handle missing nodeInformationArray gracefully', () => { + const workflowRunData: IRunExecutionData = { + resultData: { + runData: { + node1: [ + { + source: [{ previousNode: 'node2' }], + data: {}, + executionTime: 100, + executionStatus: 'success', + startTime: 1000, + } as unknown as ITaskData, + ], + node2: [ + { + data: { main: [[]] }, + executionTime: 200, + executionStatus: 'success', + startTime: 2000, + } as unknown as ITaskData, + ], + }, + lastNodeExecuted: 'node1', + }, + }; + const item: INodeExecutionData = { + json: { test: 'value' }, + pairedItem: { item: 0 }, + }; + + const result = findPairedItemTroughWorkflowData(workflowRunData, item, 0); + + expect(result).toBeUndefined(); + }); + }); +}); diff --git a/packages/nodes-base/utils/workflow-backtracking.ts b/packages/nodes-base/utils/workflow-backtracking.ts new file mode 100644 index 0000000000..43e20d1e58 --- /dev/null +++ b/packages/nodes-base/utils/workflow-backtracking.ts @@ -0,0 +1,114 @@ +import type { + INodeExecutionData, + IPairedItemData, + IRunExecutionData, + ITaskData, +} from 'n8n-workflow'; + +/* + * These functions do not cover all possible edge cases for backtracking through workflow run data. + * They are designed to work for a simple and linear workflow execution. + * If the workflow has branches or complex execution paths, additional logic may be needed. + * We should follow up on this and improve the logic in the future. + */ + +/* + * If we cannot backtrack correctly, we return undefined to fallback to the current paired item behavior + * failing in these functions will cause the parent workflow to fail + */ + +/** + * This function retrieves the previous task data for a given task in the workflow run data. + * Until there is no more source set + */ +export function previousTaskData( + runData: IRunExecutionData['resultData']['runData'], + currentRunData: ITaskData, +): ITaskData | undefined { + const nextNodeName = currentRunData.source?.[0]?.previousNode; + if (!nextNodeName) { + return undefined; // No next node + } + + const nextRunData = runData[nextNodeName]; + if (!nextRunData || nextRunData.length === 0) { + // We don't expect this case to happen in practice, but if for some reason it happens, we fallback to undefined + return undefined; // No run data for the next node + } + + const nextRunIndex = currentRunData.source?.[0]?.previousNodeRun ?? 0; + + return nextRunData[nextRunIndex]; // Return the first run data for the next node +} + +export function findPairedItemTroughWorkflowData( + workflowRunData: IRunExecutionData, + item: INodeExecutionData, + itemIndex: number, +): IPairedItemData | IPairedItemData[] | number | undefined { + // The provided item is already the item of the last node executed in this workflow run + // So the item.pairedItem is the paired item of the last node executed and is therefore referencing + // a node in the previous task data + + const currentNodeName = workflowRunData.resultData.lastNodeExecuted; + if (!currentNodeName) { + // If no node name is available, then we don't know where to start backtracking + return undefined; + } + + // This is the run data of the last node executed in the workflow run + const runData = workflowRunData.resultData.runData[currentNodeName]; + + if (!runData) { + // No run data available for the last node executed + return undefined; + } + + // Since we are backtracking through the workflow, we start with the last run data + const runIndex = runData.length - 1; + + const taskData = runData[runIndex]; + + if (!taskData) { + // If no run data is available, then the workflow did not run at all + return undefined; + } + + // Now we are getting the second last task data, because our initial pairedItem points to this. + let runDataItem = previousTaskData(workflowRunData.resultData.runData, taskData); + + let pairedItem = item.pairedItem; + + // move the runDataItem to the previous node in the in the workflow execution data + // and find the paired item of the current item in the previous task data + // We do this until we reach the first task data of the workflow run + + while (runDataItem !== undefined) { + // We find the output items for the current run data item + const nodeInformationArray = runDataItem.data?.['main']; + + // We find and fallback to 0 for the input index and item index + // The input index is the run the node was executed in case it was executed multiple times + // The item index is the index of the paired item we are looking for + let inputIndex = 0; + let nodeIndex = itemIndex; + if (typeof pairedItem === 'object') { + inputIndex = (pairedItem as IPairedItemData).input ?? 0; + nodeIndex = (pairedItem as IPairedItemData).item ?? itemIndex; + } else if (typeof pairedItem === 'number') { + // If the paired item is a number, we use it as the node index + nodeIndex = pairedItem; + // and fallback to 0 for the input index + inputIndex = 0; + } + + // We found the paired item of the current run data item, this points to the node in the previous task data + pairedItem = nodeInformationArray?.[inputIndex]?.[nodeIndex]?.pairedItem; + + // We move the runDataItem to the previous task data + runDataItem = previousTaskData(workflowRunData.resultData.runData, runDataItem); + } + + // This is the paired item that was in the first task data when the workflow was executed + return pairedItem; +} diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index 45b50fba25..69619a40bb 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -908,6 +908,7 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn & parentExecution?: RelatedExecution; }, ): Promise; + getExecutionDataById(executionId: string): Promise; getInputConnectionData( connectionType: AINodeConnectionType, itemIndex: number, @@ -2390,6 +2391,7 @@ export interface IWorkflowExecuteAdditionalData { additionalData: IWorkflowExecuteAdditionalData, options: ExecuteWorkflowOptions, ) => Promise; + getRunExecutionData: (executionId: string) => Promise; executionId?: string; restartExecutionId?: string; currentNodeExecutionIndex: number;