diff --git a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/ToolWorkflowV2.node.ts b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/ToolWorkflowV2.node.ts index 9efd6bb625..f74e13431f 100644 --- a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/ToolWorkflowV2.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/ToolWorkflowV2.node.ts @@ -39,6 +39,7 @@ export class ToolWorkflowV2 implements INodeType { const description = this.getNodeParameter('description', itemIndex) as string; const tool = await workflowToolService.createTool({ + ctx: this, name, description, itemIndex, diff --git a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/ToolWorkflowV2.test.ts b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/ToolWorkflowV2.test.ts index 6205e1d6d9..0253220e09 100644 --- a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/ToolWorkflowV2.test.ts +++ b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/ToolWorkflowV2.test.ts @@ -13,6 +13,10 @@ import { WorkflowToolService } from './utils/WorkflowToolService'; // Mock ISupplyDataFunctions interface function createMockContext(overrides?: Partial): ISupplyDataFunctions { + let runIndex = 0; + const getNextRunIndex = jest.fn(() => { + return runIndex++; + }); return { runIndex: 0, getNodeParameter: jest.fn(), @@ -26,6 +30,7 @@ function createMockContext(overrides?: Partial): ISupplyDa getInputData: jest.fn(), getMode: jest.fn(), getRestApiUrl: jest.fn(), + getNextRunIndex, getTimezone: jest.fn(), getWorkflow: jest.fn(), getWorkflowStaticData: jest.fn(), @@ -56,6 +61,7 @@ describe('WorkflowTool::WorkflowToolService', () => { describe('createTool', () => { it('should create a basic dynamic tool when schema is not used', async () => { const toolParams = { + ctx: context, name: 'TestTool', description: 'Test Description', itemIndex: 0, @@ -70,6 +76,7 @@ describe('WorkflowTool::WorkflowToolService', () => { it('should create a tool that can handle successful execution', async () => { const toolParams = { + ctx: context, name: 'TestTool', description: 'Test Description', itemIndex: 0, @@ -112,6 +119,7 @@ describe('WorkflowTool::WorkflowToolService', () => { it('should handle errors during tool execution', async () => { const toolParams = { + ctx: context, name: 'TestTool', description: 'Test Description', itemIndex: 0, diff --git a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/utils/WorkflowToolService.ts b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/utils/WorkflowToolService.ts index 2c7d34374b..52e784bd28 100644 --- a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/utils/WorkflowToolService.ts +++ b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/utils/WorkflowToolService.ts @@ -60,17 +60,21 @@ export class WorkflowToolService { // Creates the tool based on the provided parameters async createTool({ + ctx, name, description, itemIndex, }: { + ctx: ISupplyDataFunctions; name: string; description: string; itemIndex: number; }): Promise { - let runIndex = 0; // Handler for the tool execution, will be called when the tool is executed // This function will execute the sub-workflow and return the response + // We get the runIndex from the context to handle multiple executions + // of the same tool when the tool is used in a loop or in a parallel execution. + let runIndex: number = ctx.getNextRunIndex(); const toolHandler = async ( query: string | IDataObject, runManager?: CallbackManagerForToolRun, diff --git a/packages/core/src/execution-engine/node-execution-context/__tests__/supply-data-context.test.ts b/packages/core/src/execution-engine/node-execution-context/__tests__/supply-data-context.test.ts index c2bc248c73..436d7f983c 100644 --- a/packages/core/src/execution-engine/node-execution-context/__tests__/supply-data-context.test.ts +++ b/packages/core/src/execution-engine/node-execution-context/__tests__/supply-data-context.test.ts @@ -15,6 +15,7 @@ import type { ICredentialDataDecryptedObject, NodeConnectionType, } from 'n8n-workflow'; +import type { IRunData } from 'n8n-workflow'; import { ApplicationError, NodeConnectionTypes } from 'n8n-workflow'; import { describeCommonTests } from './shared-tests'; @@ -186,4 +187,36 @@ describe('SupplyDataContext', () => { expect(clone).not.toBe(supplyDataContext); }); }); + + describe('getNextRunIndex', () => { + it('should return 0 as the default latest run index', () => { + const latestRunIndex = supplyDataContext.getNextRunIndex(); + expect(latestRunIndex).toBe(0); + }); + + it('should return the length of the run execution data for the node', () => { + const runData = mock(); + const runExecutionData = mock({ + resultData: { runData: { [node.name]: [runData, runData] } }, + }); + const supplyDataContext = new SupplyDataContext( + workflow, + node, + additionalData, + mode, + runExecutionData, + runIndex, + connectionInputData, + inputData, + connectionType, + executeData, + [closeFn], + abortSignal, + ); + + const latestRunIndex = supplyDataContext.getNextRunIndex(); + + expect(latestRunIndex).toBe(2); + }); + }); }); diff --git a/packages/core/src/execution-engine/node-execution-context/supply-data-context.ts b/packages/core/src/execution-engine/node-execution-context/supply-data-context.ts index c6e4d35823..48d29ee3d6 100644 --- a/packages/core/src/execution-engine/node-execution-context/supply-data-context.ts +++ b/packages/core/src/execution-engine/node-execution-context/supply-data-context.ts @@ -167,16 +167,18 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData return super.getInputItems(inputIndex, connectionType) ?? []; } + getNextRunIndex(): number { + const nodeName = this.node.name; + return this.runExecutionData.resultData.runData[nodeName]?.length ?? 0; + } + /** @deprecated create a context object with inputData for every runIndex */ addInputData( connectionType: AINodeConnectionType, data: INodeExecutionData[][], ): { index: number } { const nodeName = this.node.name; - let currentNodeRunIndex = 0; - if (this.runExecutionData.resultData.runData.hasOwnProperty(nodeName)) { - currentNodeRunIndex = this.runExecutionData.resultData.runData[nodeName].length; - } + const currentNodeRunIndex = this.getNextRunIndex(); this.addExecutionDataFunctions( 'input', diff --git a/packages/core/src/execution-engine/node-execution-context/utils/__tests__/get-input-connection-data.test.ts b/packages/core/src/execution-engine/node-execution-context/utils/__tests__/get-input-connection-data.test.ts index 6280aecd94..0047d23ae8 100644 --- a/packages/core/src/execution-engine/node-execution-context/utils/__tests__/get-input-connection-data.test.ts +++ b/packages/core/src/execution-engine/node-execution-context/utils/__tests__/get-input-connection-data.test.ts @@ -11,7 +11,9 @@ import type { INodeType, INodeTypes, IExecuteFunctions, + IRunData, } from 'n8n-workflow'; +import type { ITaskData } from 'n8n-workflow'; import { NodeConnectionTypes, NodeOperationError } from 'n8n-workflow'; import { ExecuteContext } from '../../execute-context'; @@ -377,11 +379,19 @@ describe('makeHandleToolInvocation', () => { execute, }); const contextFactory = jest.fn(); + const taskData = mock(); + + let runExecutionData = mock({ + resultData: { + runData: mock(), + }, + }); const toolArgs = { key: 'value' }; beforeEach(() => { jest.clearAllMocks(); }); + it('should return stringified results when execution is successful', async () => { const mockContext = mock(); contextFactory.mockReturnValue(mockContext); @@ -393,6 +403,7 @@ describe('makeHandleToolInvocation', () => { contextFactory, connectedNode, connectedNodeType, + runExecutionData, ); const result = await handleToolInvocation(toolArgs); @@ -405,7 +416,6 @@ describe('makeHandleToolInvocation', () => { it('should handle binary data and return a warning message', async () => { const mockContext = mock(); contextFactory.mockReturnValue(mockContext); - const mockResult = [[{ json: {}, binary: { file: 'data' } }]]; execute.mockResolvedValueOnce(mockResult); @@ -413,6 +423,7 @@ describe('makeHandleToolInvocation', () => { contextFactory, connectedNode, connectedNodeType, + runExecutionData, ); const result = await handleToolInvocation(toolArgs); @@ -439,7 +450,6 @@ describe('makeHandleToolInvocation', () => { }, }); contextFactory.mockReturnValue(mockContext); - const mockResult = [[{ json: { a: 3 }, binary: { file: 'data' } }]]; execute.mockResolvedValueOnce(mockResult); @@ -447,6 +457,7 @@ describe('makeHandleToolInvocation', () => { contextFactory, connectedNode, connectedNodeType, + runExecutionData, ); const result = await handleToolInvocation(toolArgs); @@ -466,7 +477,6 @@ describe('makeHandleToolInvocation', () => { it('should handle execution errors and return an error message', async () => { const mockContext = mock(); contextFactory.mockReturnValue(mockContext); - const error = new Error('Execution failed'); execute.mockRejectedValueOnce(error); @@ -474,6 +484,7 @@ describe('makeHandleToolInvocation', () => { contextFactory, connectedNode, connectedNodeType, + runExecutionData, ); const result = await handleToolInvocation(toolArgs); @@ -489,14 +500,42 @@ describe('makeHandleToolInvocation', () => { const mockContext = mock(); contextFactory.mockReturnValue(mockContext); - const handleToolInvocation = makeHandleToolInvocation( + let handleToolInvocation = makeHandleToolInvocation( contextFactory, connectedNode, connectedNodeType, + runExecutionData, ); + await handleToolInvocation(toolArgs); + runExecutionData = mock({ + resultData: { + runData: { + [connectedNode.name]: [taskData], + }, + }, + }); + handleToolInvocation = makeHandleToolInvocation( + contextFactory, + connectedNode, + connectedNodeType, + runExecutionData, + ); await handleToolInvocation(toolArgs); - await handleToolInvocation(toolArgs); + + runExecutionData = mock({ + resultData: { + runData: { + [connectedNode.name]: [taskData, taskData], + }, + }, + }); + handleToolInvocation = makeHandleToolInvocation( + contextFactory, + connectedNode, + connectedNodeType, + runExecutionData, + ); await handleToolInvocation(toolArgs); expect(contextFactory).toHaveBeenCalledWith(0); diff --git a/packages/core/src/execution-engine/node-execution-context/utils/get-input-connection-data.ts b/packages/core/src/execution-engine/node-execution-context/utils/get-input-connection-data.ts index 9123284f55..89309a3d56 100644 --- a/packages/core/src/execution-engine/node-execution-context/utils/get-input-connection-data.ts +++ b/packages/core/src/execution-engine/node-execution-context/utils/get-input-connection-data.ts @@ -28,19 +28,28 @@ import type { ExecuteContext, WebhookContext } from '../../node-execution-contex // eslint-disable-next-line import/no-cycle import { SupplyDataContext } from '../../node-execution-context/supply-data-context'; +function getNextRunIndex(runExecutionData: IRunExecutionData, nodeName: string) { + return runExecutionData.resultData.runData[nodeName]?.length ?? 0; +} + export function makeHandleToolInvocation( contextFactory: (runIndex: number) => ISupplyDataFunctions, node: INode, nodeType: INodeType, + runExecutionData: IRunExecutionData, ) { /** * This keeps track of how many times this specific AI tool node has been invoked. * It is incremented on every invocation of the tool to keep the output of each invocation separate from each other. */ - let toolRunIndex = 0; + // We get the runIndex from the context to handle multiple executions + // of the same tool when the tool is used in a loop or in a parallel execution. + let runIndex = getNextRunIndex(runExecutionData, node.name); + return async (toolArgs: IDataObject) => { - const runIndex = toolRunIndex++; - const context = contextFactory(runIndex); + // Increment the runIndex for the next invocation + const localRunIndex = runIndex++; + const context = contextFactory(localRunIndex); context.addInputData(NodeConnectionTypes.AiTool, [[{ json: toolArgs }]]); try { @@ -64,13 +73,13 @@ export function makeHandleToolInvocation( } // Add output data to the context - context.addOutputData(NodeConnectionTypes.AiTool, runIndex, [[{ json: { response } }]]); + context.addOutputData(NodeConnectionTypes.AiTool, localRunIndex, [[{ json: { response } }]]); // Return the stringified results return JSON.stringify(response); } catch (error) { const nodeError = new NodeOperationError(node, error as Error); - context.addOutputData(NodeConnectionTypes.AiTool, runIndex, nodeError); + context.addOutputData(NodeConnectionTypes.AiTool, localRunIndex, nodeError); return 'Error during node execution: ' + (nodeError.description ?? nodeError.message); } }; @@ -153,6 +162,7 @@ export async function getInputConnectionData( (i) => contextFactory(i, {}), connectedNode, connectedNodeType, + runExecutionData, ), }); nodes.push(supplyData); diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 40d5cb3664..ef9939d602 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -992,6 +992,7 @@ export type ISupplyDataFunctions = ExecuteFunctions.GetNodeParameterFn & | 'sendMessageToUI' | 'helpers' > & { + getNextRunIndex(): number; continueOnFail(): boolean; evaluateExpression(expression: string, itemIndex: number): NodeParameterValueType; getWorkflowDataProxy(itemIndex: number): IWorkflowDataProxyData;