From 9e61d0b9c0383c086a25f33f9987be8aaf33d2ed Mon Sep 17 00:00:00 2001 From: Benjamin Schroth <68321970+schrothbn@users.noreply.github.com> Date: Thu, 26 Jun 2025 13:11:41 +0200 Subject: [PATCH] fix(core): Add retry mechanism to tools (#16667) --- .../ToolWorkflow/v2/ToolWorkflowV2.test.ts | 453 +++++++++++++++++- .../v2/utils/WorkflowToolService.ts | 162 ++++--- .../get-input-connection-data.test.ts | 338 +++++++++++++ .../utils/get-input-connection-data.ts | 105 ++-- packages/workflow/src/index.ts | 1 + packages/workflow/src/utils.ts | 18 + packages/workflow/test/utils.test.ts | 67 +++ 7 files changed, 1056 insertions(+), 88 deletions(-) diff --git a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/ToolWorkflowV2.test.ts b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/ToolWorkflowV2.test.ts index 0253220e09..c983bc9982 100644 --- a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/ToolWorkflowV2.test.ts +++ b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/ToolWorkflowV2.test.ts @@ -1,6 +1,6 @@ /* eslint-disable @typescript-eslint/dot-notation */ // Disabled to allow access to private methods import { DynamicTool } from '@langchain/core/tools'; -import { NodeOperationError } from 'n8n-workflow'; +import { ApplicationError, NodeOperationError } from 'n8n-workflow'; import type { ISupplyDataFunctions, INodeExecutionData, @@ -11,13 +11,33 @@ import type { import { WorkflowToolService } from './utils/WorkflowToolService'; -// Mock ISupplyDataFunctions interface +// Mock the sleep functions +jest.mock('n8n-workflow', () => ({ + ...jest.requireActual('n8n-workflow'), + sleep: jest.fn().mockResolvedValue(undefined), + sleepWithAbort: jest.fn().mockResolvedValue(undefined), +})); + +function createMockClonedContext( + baseContext: ISupplyDataFunctions, + executeWorkflowMock?: jest.MockedFunction, +): ISupplyDataFunctions { + return { + ...baseContext, + addOutputData: jest.fn(), + getNodeParameter: baseContext.getNodeParameter, + getWorkflowDataProxy: baseContext.getWorkflowDataProxy, + executeWorkflow: executeWorkflowMock || baseContext.executeWorkflow, + getNode: baseContext.getNode, + } as ISupplyDataFunctions; +} + function createMockContext(overrides?: Partial): ISupplyDataFunctions { let runIndex = 0; const getNextRunIndex = jest.fn(() => { return runIndex++; }); - return { + const context = { runIndex: 0, getNodeParameter: jest.fn(), getWorkflowDataProxy: jest.fn(), @@ -34,7 +54,6 @@ function createMockContext(overrides?: Partial): ISupplyDa getTimezone: jest.fn(), getWorkflow: jest.fn(), getWorkflowStaticData: jest.fn(), - cloneWith: jest.fn(), logger: { debug: jest.fn(), error: jest.fn(), @@ -43,6 +62,8 @@ function createMockContext(overrides?: Partial): ISupplyDa }, ...overrides, } as ISupplyDataFunctions; + context.cloneWith = jest.fn().mockImplementation((_) => createMockClonedContext(context)); + return context; } describe('WorkflowTool::WorkflowToolService', () => { @@ -331,4 +352,428 @@ describe('WorkflowTool::WorkflowToolService', () => { ).rejects.toThrow(NodeOperationError); }); }); + + describe('retry functionality', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + it('should not retry when retryOnFail is false', async () => { + const executeWorkflowMock = jest.fn().mockRejectedValue(new Error('Test error')); + const contextWithNonRetryNode = createMockContext({ + getNode: jest.fn().mockReturnValue({ + name: 'Test Tool', + parameters: { workflowInputs: { schema: [] } }, + retryOnFail: false, + }), + getNodeParameter: jest.fn().mockImplementation((name) => { + if (name === 'source') return 'database'; + if (name === 'workflowId') return { value: 'test-workflow-id' }; + if (name === 'fields.values') return []; + return {}; + }), + executeWorkflow: executeWorkflowMock, + addOutputData: jest.fn(), + }); + contextWithNonRetryNode.cloneWith = jest.fn().mockImplementation((cloneOverrides) => ({ + ...createMockClonedContext(contextWithNonRetryNode, executeWorkflowMock), + getWorkflowDataProxy: jest.fn().mockReturnValue({ + $execution: { id: 'exec-id' }, + $workflow: { id: 'workflow-id' }, + }), + getNodeParameter: contextWithNonRetryNode.getNodeParameter, + ...cloneOverrides, + })); + + service = new WorkflowToolService(contextWithNonRetryNode); + const tool = await service.createTool({ + ctx: contextWithNonRetryNode, + name: 'Test Tool', + description: 'Test Description', + itemIndex: 0, + }); + + const result = await tool.func('test query'); + + expect(executeWorkflowMock).toHaveBeenCalledTimes(1); + expect(result).toContain('There was an error'); + }); + + it('should retry up to maxTries when retryOnFail is true', async () => { + const executeWorkflowMock = jest.fn().mockRejectedValue(new Error('Test error')); + const contextWithRetryNode = createMockContext({ + getNode: jest.fn().mockReturnValue({ + name: 'Test Tool', + parameters: { workflowInputs: { schema: [] } }, + retryOnFail: true, + maxTries: 3, + waitBetweenTries: 0, + }), + getNodeParameter: jest.fn().mockImplementation((name) => { + if (name === 'source') return 'database'; + if (name === 'workflowId') return { value: 'test-workflow-id' }; + if (name === 'fields.values') return []; + return {}; + }), + executeWorkflow: executeWorkflowMock, + addOutputData: jest.fn(), + }); + contextWithRetryNode.cloneWith = jest.fn().mockImplementation((cloneOverrides) => ({ + ...createMockClonedContext(contextWithRetryNode, executeWorkflowMock), + getWorkflowDataProxy: jest.fn().mockReturnValue({ + $execution: { id: 'exec-id' }, + $workflow: { id: 'workflow-id' }, + }), + getNodeParameter: contextWithRetryNode.getNodeParameter, + ...cloneOverrides, + })); + + service = new WorkflowToolService(contextWithRetryNode); + const tool = await service.createTool({ + ctx: contextWithRetryNode, + name: 'Test Tool', + description: 'Test Description', + itemIndex: 0, + }); + + const result = await tool.func('test query'); + + expect(executeWorkflowMock).toHaveBeenCalledTimes(3); + expect(result).toContain('There was an error'); + }); + + it('should succeed on retry after initial failure', async () => { + const mockSuccessResponse = { + data: [[{ json: { result: 'success' } }]], + executionId: 'success-exec-id', + }; + + const executeWorkflowMock = jest + .fn() + .mockRejectedValueOnce(new Error('First attempt fails')) + .mockResolvedValueOnce(mockSuccessResponse); + + const contextWithRetryNode = createMockContext({ + getNode: jest.fn().mockReturnValue({ + name: 'Test Tool', + parameters: { workflowInputs: { schema: [] } }, + retryOnFail: true, + maxTries: 3, + waitBetweenTries: 0, + }), + getNodeParameter: jest.fn().mockImplementation((name) => { + if (name === 'source') return 'database'; + if (name === 'workflowId') return { value: 'test-workflow-id' }; + if (name === 'fields.values') return []; + return {}; + }), + executeWorkflow: executeWorkflowMock, + addOutputData: jest.fn(), + }); + contextWithRetryNode.cloneWith = jest.fn().mockImplementation((cloneOverrides) => ({ + ...createMockClonedContext(contextWithRetryNode, executeWorkflowMock), + getWorkflowDataProxy: jest.fn().mockReturnValue({ + $execution: { id: 'exec-id' }, + $workflow: { id: 'workflow-id' }, + }), + getNodeParameter: contextWithRetryNode.getNodeParameter, + ...cloneOverrides, + })); + + service = new WorkflowToolService(contextWithRetryNode); + const tool = await service.createTool({ + ctx: contextWithRetryNode, + name: 'Test Tool', + description: 'Test Description', + itemIndex: 0, + }); + + const result = await tool.func('test query'); + + expect(executeWorkflowMock).toHaveBeenCalledTimes(2); + expect(result).toBe(JSON.stringify({ result: 'success' }, null, 2)); + }); + + it.each([ + { maxTries: 1, expected: 2 }, // Should be clamped to minimum 2 + { maxTries: 3, expected: 3 }, + { maxTries: 6, expected: 5 }, // Should be clamped to maximum 5 + ])('should respect maxTries limits (2-5)', async ({ maxTries, expected }) => { + const executeWorkflowMock = jest.fn().mockRejectedValue(new Error('Test error')); + + const contextWithRetryNode = createMockContext({ + getNode: jest.fn().mockReturnValue({ + name: 'Test Tool', + parameters: { workflowInputs: { schema: [] } }, + retryOnFail: true, + maxTries, + waitBetweenTries: 0, + }), + getNodeParameter: jest.fn().mockImplementation((name) => { + if (name === 'source') return 'database'; + if (name === 'workflowId') return { value: 'test-workflow-id' }; + if (name === 'fields.values') return []; + return {}; + }), + executeWorkflow: executeWorkflowMock, + }); + + contextWithRetryNode.cloneWith = jest.fn().mockImplementation((cloneOverrides) => ({ + ...createMockClonedContext(contextWithRetryNode, executeWorkflowMock), + getWorkflowDataProxy: jest.fn().mockReturnValue({ + $execution: { id: 'exec-id' }, + $workflow: { id: 'workflow-id' }, + }), + getNodeParameter: contextWithRetryNode.getNodeParameter, + ...cloneOverrides, + })); + + service = new WorkflowToolService(contextWithRetryNode); + const tool = await service.createTool({ + ctx: contextWithRetryNode, + name: 'Test Tool', + description: 'Test Description', + itemIndex: 0, + }); + + await tool.func('test query'); + + expect(executeWorkflowMock).toHaveBeenCalledTimes(expected); + }); + + it('should respect waitBetweenTries with sleepWithAbort', async () => { + const { sleepWithAbort } = jest.requireMock('n8n-workflow'); + sleepWithAbort.mockClear(); + const executeWorkflowMock = jest.fn().mockRejectedValue(new Error('Test error')); + + const contextWithRetryNode = createMockContext({ + getNode: jest.fn().mockReturnValue({ + name: 'Test Tool', + parameters: { workflowInputs: { schema: [] } }, + retryOnFail: true, + maxTries: 2, + waitBetweenTries: 1500, + }), + getNodeParameter: jest.fn().mockImplementation((name) => { + if (name === 'source') return 'database'; + if (name === 'workflowId') return { value: 'test-workflow-id' }; + if (name === 'fields.values') return []; + return {}; + }), + executeWorkflow: executeWorkflowMock, + addOutputData: jest.fn(), + }); + contextWithRetryNode.cloneWith = jest.fn().mockImplementation((cloneOverrides) => ({ + ...createMockClonedContext(contextWithRetryNode, executeWorkflowMock), + getWorkflowDataProxy: jest.fn().mockReturnValue({ + $execution: { id: 'exec-id' }, + $workflow: { id: 'workflow-id' }, + }), + getNodeParameter: contextWithRetryNode.getNodeParameter, + ...cloneOverrides, + })); + + service = new WorkflowToolService(contextWithRetryNode); + const tool = await service.createTool({ + ctx: contextWithRetryNode, + name: 'Test Tool', + description: 'Test Description', + itemIndex: 0, + }); + + await tool.func('test query'); + + expect(sleepWithAbort).toHaveBeenCalledWith(1500, undefined); + }); + }); + + describe('abort signal functionality', () => { + let abortController: AbortController; + + beforeEach(() => { + jest.clearAllMocks(); + abortController = new AbortController(); + }); + + const createAbortSignalContext = ( + executeWorkflowMock: jest.MockedFunction, + abortSignal?: AbortSignal, + ) => { + const contextWithRetryNode = createMockContext({ + getNode: jest.fn().mockReturnValue({ + name: 'Test Tool', + parameters: { workflowInputs: { schema: [] } }, + retryOnFail: true, + maxTries: 3, + waitBetweenTries: 100, + }), + getNodeParameter: jest.fn().mockImplementation((name) => { + if (name === 'source') return 'database'; + if (name === 'workflowId') return { value: 'test-workflow-id' }; + if (name === 'fields.values') return []; + return {}; + }), + executeWorkflow: executeWorkflowMock, + addOutputData: jest.fn(), + }); + contextWithRetryNode.cloneWith = jest.fn().mockImplementation((cloneOverrides) => ({ + ...createMockClonedContext(contextWithRetryNode, executeWorkflowMock), + getWorkflowDataProxy: jest.fn().mockReturnValue({ + $execution: { id: 'exec-id' }, + $workflow: { id: 'workflow-id' }, + }), + getNodeParameter: contextWithRetryNode.getNodeParameter, + getExecutionCancelSignal: jest.fn(() => abortSignal), + ...cloneOverrides, + })); + return contextWithRetryNode; + }; + + it('should return cancellation message if signal is already aborted', async () => { + const executeWorkflowMock = jest.fn().mockResolvedValue({ + data: [[{ json: { result: 'success' } }]], + executionId: 'success-exec-id', + }); + + // Abort before starting + abortController.abort(); + + const contextWithRetryNode = createAbortSignalContext( + executeWorkflowMock, + abortController.signal, + ); + + service = new WorkflowToolService(contextWithRetryNode); + const tool = await service.createTool({ + ctx: contextWithRetryNode, + name: 'Test Tool', + description: 'Test Description', + itemIndex: 0, + }); + + const result = await tool.func('test query'); + + expect(result).toBe('There was an error: "Execution was cancelled"'); + expect(executeWorkflowMock).not.toHaveBeenCalled(); + }); + + it('should handle abort signal during retry wait', async () => { + const { sleepWithAbort } = jest.requireMock('n8n-workflow'); + sleepWithAbort.mockRejectedValue(new Error('Execution was cancelled')); + + const executeWorkflowMock = jest + .fn() + .mockRejectedValueOnce(new Error('First attempt fails')) + .mockResolvedValueOnce({ + data: [[{ json: { result: 'success' } }]], + executionId: 'success-exec-id', + }); + + const contextWithRetryNode = createAbortSignalContext( + executeWorkflowMock, + abortController.signal, + ); + + service = new WorkflowToolService(contextWithRetryNode); + const tool = await service.createTool({ + ctx: contextWithRetryNode, + name: 'Test Tool', + description: 'Test Description', + itemIndex: 0, + }); + + const result = await tool.func('test query'); + + expect(result).toBe('There was an error: "Execution was cancelled"'); + expect(sleepWithAbort).toHaveBeenCalledWith(100, abortController.signal); + expect(executeWorkflowMock).toHaveBeenCalledTimes(1); // Only first attempt + }); + + it('should handle abort signal during execution', async () => { + const executeWorkflowMock = jest.fn().mockImplementation(() => { + // Simulate abort during execution + abortController.abort(); + throw new ApplicationError('Workflow execution failed'); + }); + + const contextWithRetryNode = createAbortSignalContext( + executeWorkflowMock, + abortController.signal, + ); + + service = new WorkflowToolService(contextWithRetryNode); + const tool = await service.createTool({ + ctx: contextWithRetryNode, + name: 'Test Tool', + description: 'Test Description', + itemIndex: 0, + }); + + const result = await tool.func('test query'); + + expect(result).toBe('There was an error: "Execution was cancelled"'); + expect(executeWorkflowMock).toHaveBeenCalledTimes(1); + }); + + it('should complete successfully if not aborted', async () => { + const { sleepWithAbort } = jest.requireMock('n8n-workflow'); + sleepWithAbort.mockClear().mockResolvedValue(undefined); + + const executeWorkflowMock = jest + .fn() + .mockRejectedValueOnce(new Error('First attempt fails')) + .mockResolvedValueOnce({ + data: [[{ json: { result: 'success' } }]], + executionId: 'success-exec-id', + }); + + const contextWithRetryNode = createAbortSignalContext( + executeWorkflowMock, + abortController.signal, + ); + + service = new WorkflowToolService(contextWithRetryNode); + const tool = await service.createTool({ + ctx: contextWithRetryNode, + name: 'Test Tool', + description: 'Test Description', + itemIndex: 0, + }); + + const result = await tool.func('test query'); + + expect(result).toBe(JSON.stringify({ result: 'success' }, null, 2)); + expect(executeWorkflowMock).toHaveBeenCalledTimes(2); + expect(sleepWithAbort).toHaveBeenCalledWith(100, abortController.signal); + }); + + it('should work when getExecutionCancelSignal is not available', async () => { + const { sleepWithAbort } = jest.requireMock('n8n-workflow'); + sleepWithAbort.mockClear().mockResolvedValue(undefined); + + const executeWorkflowMock = jest + .fn() + .mockRejectedValueOnce(new Error('First attempt fails')) + .mockResolvedValueOnce({ + data: [[{ json: { result: 'success' } }]], + executionId: 'success-exec-id', + }); + + // Create context without getExecutionCancelSignal + const contextWithRetryNode = createAbortSignalContext(executeWorkflowMock, undefined); + + service = new WorkflowToolService(contextWithRetryNode); + const tool = await service.createTool({ + ctx: contextWithRetryNode, + name: 'Test Tool', + description: 'Test Description', + itemIndex: 0, + }); + + const result = await tool.func('test query'); + + expect(result).toBe(JSON.stringify({ result: 'success' }, null, 2)); + expect(sleepWithAbort).toHaveBeenCalledWith(100, undefined); + }); + }); }); diff --git a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/utils/WorkflowToolService.ts b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/utils/WorkflowToolService.ts index 8d95731b3c..e68997ee4e 100644 --- a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/utils/WorkflowToolService.ts +++ b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/v2/utils/WorkflowToolService.ts @@ -25,6 +25,7 @@ import { NodeConnectionTypes, NodeOperationError, parseErrorMetadata, + sleepWithAbort, traverseNodeParameters, } from 'n8n-workflow'; import { z } from 'zod'; @@ -75,69 +76,116 @@ export class WorkflowToolService { // This function will execute the sub-workflow and return the response // We get the runIndex from the context to handle multiple executions // of the same tool when the tool is used in a loop or in a parallel execution. + const node = ctx.getNode(); + let runIndex: number = ctx.getNextRunIndex(); const toolHandler = async ( query: string | IDataObject, runManager?: CallbackManagerForToolRun, ): Promise => { - const localRunIndex = runIndex++; - // We need to clone the context here to handle runIndex correctly - // Otherwise the runIndex will be shared between different executions - // Causing incorrect data to be passed to the sub-workflow and via $fromAI - const context = this.baseContext.cloneWith({ - runIndex: localRunIndex, - inputData: [[{ json: { query } }]], - }); - - try { - const response = await this.runFunction(context, query, itemIndex, runManager); - - const processedResponse = this.handleToolResponse(response); - - let responseData: INodeExecutionData[]; - if (isNodeExecutionData(response)) { - responseData = response; - } else { - const reParsedData = jsonParse(processedResponse, { - fallbackValue: { response: processedResponse }, - }); - - responseData = [{ json: reParsedData }]; - } - - // Once the sub-workflow is executed, add the output data to the context - // This will be used to link the sub-workflow execution in the parent workflow - let metadata: ITaskMetadata | undefined; - if (this.subExecutionId && this.subWorkflowId) { - metadata = { - subExecution: { - executionId: this.subExecutionId, - workflowId: this.subWorkflowId, - }, - }; - } - - void context.addOutputData( - NodeConnectionTypes.AiTool, - localRunIndex, - [responseData], - metadata, - ); - - return processedResponse; - } catch (error) { - const executionError = error as ExecutionError; - const errorResponse = `There was an error: "${executionError.message}"`; - - const metadata = parseErrorMetadata(error); - void context.addOutputData( - NodeConnectionTypes.AiTool, - localRunIndex, - executionError, - metadata, - ); - return errorResponse; + let maxTries = 1; + if (node.retryOnFail === true) { + maxTries = Math.min(5, Math.max(2, node.maxTries ?? 3)); } + + let waitBetweenTries = 0; + if (node.retryOnFail === true) { + waitBetweenTries = Math.min(5000, Math.max(0, node.waitBetweenTries ?? 1000)); + } + + let lastError: ExecutionError | undefined; + + for (let tryIndex = 0; tryIndex < maxTries; tryIndex++) { + const localRunIndex = runIndex++; + // We need to clone the context here to handle runIndex correctly + // Otherwise the runIndex will be shared between different executions + // Causing incorrect data to be passed to the sub-workflow and via $fromAI + const context = this.baseContext.cloneWith({ + runIndex: localRunIndex, + inputData: [[{ json: { query } }]], + }); + + // Get abort signal from context for cancellation support + const abortSignal = context.getExecutionCancelSignal?.(); + + // Check if execution was cancelled before retry + if (abortSignal?.aborted) { + return 'There was an error: "Execution was cancelled"'; + } + + if (tryIndex !== 0) { + // Reset error from previous attempt + lastError = undefined; + if (waitBetweenTries !== 0) { + try { + await sleepWithAbort(waitBetweenTries, abortSignal); + } catch (abortError) { + return 'There was an error: "Execution was cancelled"'; + } + } + } + + try { + const response = await this.runFunction(context, query, itemIndex, runManager); + + const processedResponse = this.handleToolResponse(response); + + let responseData: INodeExecutionData[]; + if (isNodeExecutionData(response)) { + responseData = response; + } else { + const reParsedData = jsonParse(processedResponse, { + fallbackValue: { response: processedResponse }, + }); + + responseData = [{ json: reParsedData }]; + } + + // Once the sub-workflow is executed, add the output data to the context + // This will be used to link the sub-workflow execution in the parent workflow + let metadata: ITaskMetadata | undefined; + if (this.subExecutionId && this.subWorkflowId) { + metadata = { + subExecution: { + executionId: this.subExecutionId, + workflowId: this.subWorkflowId, + }, + }; + } + + void context.addOutputData( + NodeConnectionTypes.AiTool, + localRunIndex, + [responseData], + metadata, + ); + + return processedResponse; + } catch (error) { + // Check if error is due to cancellation + if (abortSignal?.aborted) { + return 'There was an error: "Execution was cancelled"'; + } + + const executionError = error as ExecutionError; + lastError = executionError; + const errorResponse = `There was an error: "${executionError.message}"`; + + const metadata = parseErrorMetadata(error); + void context.addOutputData( + NodeConnectionTypes.AiTool, + localRunIndex, + executionError, + metadata, + ); + + if (tryIndex === maxTries - 1) { + return errorResponse; + } + } + } + + return `There was an error: ${lastError?.message ?? 'Unknown error'}`; }; // Create structured tool if input schema is provided diff --git a/packages/core/src/execution-engine/node-execution-context/utils/__tests__/get-input-connection-data.test.ts b/packages/core/src/execution-engine/node-execution-context/utils/__tests__/get-input-connection-data.test.ts index f464c3f102..e1ffd8017b 100644 --- a/packages/core/src/execution-engine/node-execution-context/utils/__tests__/get-input-connection-data.test.ts +++ b/packages/core/src/execution-engine/node-execution-context/utils/__tests__/get-input-connection-data.test.ts @@ -711,4 +711,342 @@ describe('makeHandleToolInvocation', () => { expect(contextFactory).toHaveBeenCalledWith(1); expect(contextFactory).toHaveBeenCalledWith(2); }); + + describe('retry functionality', () => { + const contextFactory = jest.fn(); + const toolArgs = { input: 'test' }; + let handleToolInvocation: ReturnType; + let mockContext: unknown; + + beforeEach(() => { + jest.clearAllMocks(); + mockContext = { + addInputData: jest.fn(), + addOutputData: jest.fn(), + logger: { warn: jest.fn() }, + }; + contextFactory.mockReturnValue(mockContext); + }); + + it('should not retry when retryOnFail is false', async () => { + const connectedNode = mock({ + name: 'Test Tool', + retryOnFail: false, + }); + const connectedNodeType = mock({ + execute: jest.fn().mockRejectedValue(new Error('Test error')), + }); + + handleToolInvocation = makeHandleToolInvocation( + contextFactory, + connectedNode, + connectedNodeType, + runExecutionData, + ); + + const result = await handleToolInvocation(toolArgs); + + expect(contextFactory).toHaveBeenCalledTimes(1); + expect(connectedNodeType.execute).toHaveBeenCalledTimes(1); + expect(result).toContain('Error during node execution'); + }); + + it('should retry up to maxTries when retryOnFail is true', async () => { + const connectedNode = mock({ + name: 'Test Tool', + retryOnFail: true, + maxTries: 3, + waitBetweenTries: 0, + }); + const connectedNodeType = mock({ + execute: jest.fn().mockRejectedValue(new Error('Test error')), + }); + + handleToolInvocation = makeHandleToolInvocation( + contextFactory, + connectedNode, + connectedNodeType, + runExecutionData, + ); + + const result = await handleToolInvocation(toolArgs); + + expect(contextFactory).toHaveBeenCalledTimes(3); + expect(connectedNodeType.execute).toHaveBeenCalledTimes(3); + expect(result).toContain('Error during node execution'); + }); + + it('should succeed on retry after initial failure', async () => { + const connectedNode = mock({ + name: 'Test Tool', + retryOnFail: true, + maxTries: 3, + waitBetweenTries: 0, + }); + const connectedNodeType = mock({ + execute: jest + .fn() + .mockRejectedValueOnce(new Error('First attempt fails')) + .mockResolvedValueOnce([[{ json: { result: 'success' } }]]), + }); + + handleToolInvocation = makeHandleToolInvocation( + contextFactory, + connectedNode, + connectedNodeType, + runExecutionData, + ); + + const result = await handleToolInvocation(toolArgs); + + expect(contextFactory).toHaveBeenCalledTimes(2); + expect(connectedNodeType.execute).toHaveBeenCalledTimes(2); + expect(result).toBe(JSON.stringify([{ result: 'success' }])); + }); + + it('should respect maxTries limits (2-5)', async () => { + const testCases = [ + { maxTries: 1, expected: 2 }, // Should be clamped to minimum 2 + { maxTries: 3, expected: 3 }, + { maxTries: 6, expected: 5 }, // Should be clamped to maximum 5 + ]; + + for (const { maxTries, expected } of testCases) { + jest.clearAllMocks(); + + const connectedNode = mock({ + name: 'Test Tool', + retryOnFail: true, + maxTries, + waitBetweenTries: 0, + }); + const connectedNodeType = mock({ + execute: jest.fn().mockRejectedValue(new Error('Test error')), + }); + + handleToolInvocation = makeHandleToolInvocation( + contextFactory, + connectedNode, + connectedNodeType, + runExecutionData, + ); + + await handleToolInvocation(toolArgs); + + expect(connectedNodeType.execute).toHaveBeenCalledTimes(expected); + } + }); + + it('should respect waitBetweenTries limits (0-5000ms)', async () => { + const sleepWithAbortSpy = jest + .spyOn(require('n8n-workflow'), 'sleepWithAbort') + .mockResolvedValue(undefined); + + const connectedNode = mock({ + name: 'Test Tool', + retryOnFail: true, + maxTries: 2, + waitBetweenTries: 1500, + }); + const connectedNodeType = mock({ + execute: jest.fn().mockRejectedValue(new Error('Test error')), + }); + + handleToolInvocation = makeHandleToolInvocation( + contextFactory, + connectedNode, + connectedNodeType, + runExecutionData, + ); + + await handleToolInvocation(toolArgs); + + expect(sleepWithAbortSpy).toHaveBeenCalledWith(1500, undefined); + sleepWithAbortSpy.mockRestore(); + }); + }); + + describe('abort signal functionality', () => { + const contextFactory = jest.fn(); + const toolArgs = { input: 'test' }; + let handleToolInvocation: ReturnType; + let mockContext: unknown; + let abortController: AbortController; + + beforeEach(() => { + jest.clearAllMocks(); + abortController = new AbortController(); + mockContext = { + addInputData: jest.fn(), + addOutputData: jest.fn(), + logger: { warn: jest.fn() }, + getExecutionCancelSignal: jest.fn(() => abortController.signal), + }; + contextFactory.mockReturnValue(mockContext); + }); + + it('should return cancellation message if signal is already aborted', async () => { + const connectedNode = mock({ + name: 'Test Tool', + retryOnFail: true, + maxTries: 3, + waitBetweenTries: 100, + }); + const connectedNodeType = mock({ + execute: jest.fn().mockResolvedValue([[{ json: { result: 'success' } }]]), + }); + + // Abort before starting + abortController.abort(); + + handleToolInvocation = makeHandleToolInvocation( + contextFactory, + connectedNode, + connectedNodeType, + runExecutionData, + ); + + const result = await handleToolInvocation(toolArgs); + + expect(result).toBe('Error during node execution: Execution was cancelled'); + expect(connectedNodeType.execute).not.toHaveBeenCalled(); + }); + + it('should handle abort signal during retry wait', async () => { + const sleepWithAbortSpy = jest + .spyOn(require('n8n-workflow'), 'sleepWithAbort') + .mockRejectedValue(new Error('Execution was cancelled')); + + const connectedNode = mock({ + name: 'Test Tool', + retryOnFail: true, + maxTries: 3, + waitBetweenTries: 1000, + }); + const connectedNodeType = mock({ + execute: jest + .fn() + .mockRejectedValueOnce(new Error('First attempt fails')) + .mockResolvedValueOnce([[{ json: { result: 'success' } }]]), + }); + + handleToolInvocation = makeHandleToolInvocation( + contextFactory, + connectedNode, + connectedNodeType, + runExecutionData, + ); + + const result = await handleToolInvocation(toolArgs); + + expect(result).toBe('Error during node execution: Execution was cancelled'); + expect(sleepWithAbortSpy).toHaveBeenCalledWith(1000, abortController.signal); + expect(connectedNodeType.execute).toHaveBeenCalledTimes(1); // Only first attempt + + sleepWithAbortSpy.mockRestore(); + }); + + it('should handle abort signal during execution', async () => { + const connectedNode = mock({ + name: 'Test Tool', + retryOnFail: true, + maxTries: 3, + waitBetweenTries: 0, + }); + const connectedNodeType = mock({ + execute: jest.fn().mockImplementation(() => { + // Simulate abort during execution + abortController.abort(); + throw new Error('Execution failed'); + }), + }); + + handleToolInvocation = makeHandleToolInvocation( + contextFactory, + connectedNode, + connectedNodeType, + runExecutionData, + ); + + const result = await handleToolInvocation(toolArgs); + + expect(result).toBe('Error during node execution: Execution was cancelled'); + expect(connectedNodeType.execute).toHaveBeenCalledTimes(1); + }); + + it('should complete successfully if not aborted', async () => { + const connectedNode = mock({ + name: 'Test Tool', + retryOnFail: true, + maxTries: 3, + waitBetweenTries: 10, + }); + const connectedNodeType = mock({ + execute: jest + .fn() + .mockRejectedValueOnce(new Error('First attempt fails')) + .mockResolvedValueOnce([[{ json: { result: 'success' } }]]), + }); + + const sleepWithAbortSpy = jest + .spyOn(require('n8n-workflow'), 'sleepWithAbort') + .mockResolvedValue(undefined); + + handleToolInvocation = makeHandleToolInvocation( + contextFactory, + connectedNode, + connectedNodeType, + runExecutionData, + ); + + const result = await handleToolInvocation(toolArgs); + + expect(result).toBe(JSON.stringify([{ result: 'success' }])); + expect(connectedNodeType.execute).toHaveBeenCalledTimes(2); + expect(sleepWithAbortSpy).toHaveBeenCalledWith(10, abortController.signal); + + sleepWithAbortSpy.mockRestore(); + }); + + it('should work when getExecutionCancelSignal is not available', async () => { + const contextWithoutSignal = { + addInputData: jest.fn(), + addOutputData: jest.fn(), + logger: { warn: jest.fn() }, + // No getExecutionCancelSignal method + }; + contextFactory.mockReturnValue(contextWithoutSignal); + + const connectedNode = mock({ + name: 'Test Tool', + retryOnFail: true, + maxTries: 2, + waitBetweenTries: 10, + }); + const connectedNodeType = mock({ + execute: jest + .fn() + .mockRejectedValueOnce(new Error('First attempt fails')) + .mockResolvedValueOnce([[{ json: { result: 'success' } }]]), + }); + + const sleepWithAbortSpy = jest + .spyOn(require('n8n-workflow'), 'sleepWithAbort') + .mockResolvedValue(undefined); + + handleToolInvocation = makeHandleToolInvocation( + contextFactory, + connectedNode, + connectedNodeType, + runExecutionData, + ); + + const result = await handleToolInvocation(toolArgs); + + expect(result).toBe(JSON.stringify([{ result: 'success' }])); + expect(sleepWithAbortSpy).toHaveBeenCalledWith(10, undefined); + + sleepWithAbortSpy.mockRestore(); + }); + }); }); diff --git a/packages/core/src/execution-engine/node-execution-context/utils/get-input-connection-data.ts b/packages/core/src/execution-engine/node-execution-context/utils/get-input-connection-data.ts index b62f4b5f15..7d0562bd2c 100644 --- a/packages/core/src/execution-engine/node-execution-context/utils/get-input-connection-data.ts +++ b/packages/core/src/execution-engine/node-execution-context/utils/get-input-connection-data.ts @@ -24,6 +24,7 @@ import { ExecutionBaseError, ApplicationError, UserError, + sleepWithAbort, } from 'n8n-workflow'; import { createNodeAsTool } from './create-node-as-tool'; @@ -50,41 +51,91 @@ export function makeHandleToolInvocation( let runIndex = getNextRunIndex(runExecutionData, node.name); return async (toolArgs: IDataObject) => { - // Increment the runIndex for the next invocation - const localRunIndex = runIndex++; - const context = contextFactory(localRunIndex); - context.addInputData(NodeConnectionTypes.AiTool, [[{ json: toolArgs }]]); + let maxTries = 1; + if (node.retryOnFail === true) { + maxTries = Math.min(5, Math.max(2, node.maxTries ?? 3)); + } - try { - // Execute the sub-node with the proxied context - const result = await nodeType.execute?.call(context as unknown as IExecuteFunctions); + let waitBetweenTries = 0; + if (node.retryOnFail === true) { + waitBetweenTries = Math.min(5000, Math.max(0, node.waitBetweenTries ?? 1000)); + } - // Process and map the results - const mappedResults = result?.[0]?.flatMap((item) => item.json); - let response: string | typeof mappedResults = mappedResults; + let lastError: NodeOperationError | undefined; - // Warn if any (unusable) binary data was returned - if (result?.some((x) => x.some((y) => y.binary))) { - if (!mappedResults || mappedResults.flatMap((x) => Object.keys(x ?? {})).length === 0) { - response = - 'Error: The Tool attempted to return binary data, which is not supported in Agents'; - } else { - context.logger.warn( - `Response from Tool '${node.name}' included binary data, which is not supported in Agents. The binary data was omitted from the response.`, - ); + for (let tryIndex = 0; tryIndex < maxTries; tryIndex++) { + // Increment the runIndex for the next invocation + const localRunIndex = runIndex++; + const context = contextFactory(localRunIndex); + + // Get abort signal from context for cancellation support + const abortSignal = context.getExecutionCancelSignal?.(); + + // Check if execution was cancelled before retry + if (abortSignal?.aborted) { + return 'Error during node execution: Execution was cancelled'; + } + + if (tryIndex !== 0) { + // Reset error from previous attempt + lastError = undefined; + if (waitBetweenTries !== 0) { + try { + await sleepWithAbort(waitBetweenTries, abortSignal); + } catch (abortError) { + return 'Error during node execution: Execution was cancelled'; + } } } - // Add output data to the context - context.addOutputData(NodeConnectionTypes.AiTool, localRunIndex, [[{ json: { response } }]]); + context.addInputData(NodeConnectionTypes.AiTool, [[{ json: toolArgs }]]); - // Return the stringified results - return JSON.stringify(response); - } catch (error) { - const nodeError = new NodeOperationError(node, error as Error); - context.addOutputData(NodeConnectionTypes.AiTool, localRunIndex, nodeError); - return 'Error during node execution: ' + (nodeError.description ?? nodeError.message); + try { + // Execute the sub-node with the proxied context + const result = await nodeType.execute?.call(context as unknown as IExecuteFunctions); + + // Process and map the results + const mappedResults = result?.[0]?.flatMap((item) => item.json); + let response: string | typeof mappedResults = mappedResults; + + // Warn if any (unusable) binary data was returned + if (result?.some((x) => x.some((y) => y.binary))) { + if (!mappedResults || mappedResults.flatMap((x) => Object.keys(x ?? {})).length === 0) { + response = + 'Error: The Tool attempted to return binary data, which is not supported in Agents'; + } else { + context.logger.warn( + `Response from Tool '${node.name}' included binary data, which is not supported in Agents. The binary data was omitted from the response.`, + ); + } + } + + // Add output data to the context + context.addOutputData(NodeConnectionTypes.AiTool, localRunIndex, [ + [{ json: { response } }], + ]); + + // Return the stringified results + return JSON.stringify(response); + } catch (error) { + // Check if error is due to cancellation + if (abortSignal?.aborted) { + return 'Error during node execution: Execution was cancelled'; + } + + const nodeError = new NodeOperationError(node, error as Error); + context.addOutputData(NodeConnectionTypes.AiTool, localRunIndex, nodeError); + + lastError = nodeError; + + // If this is the last attempt, throw the error + if (tryIndex === maxTries - 1) { + return 'Error during node execution: ' + (nodeError.description ?? nodeError.message); + } + } } + + return 'Error during node execution : ' + (lastError?.description ?? lastError?.message); }; } diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index bcf9b2fc19..47ab898195 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -31,6 +31,7 @@ export { jsonStringify, replaceCircularReferences, sleep, + sleepWithAbort, fileTypeFromMimeType, assert, removeCircularRefs, diff --git a/packages/workflow/src/utils.ts b/packages/workflow/src/utils.ts index 5e48880010..01c2844ce0 100644 --- a/packages/workflow/src/utils.ts +++ b/packages/workflow/src/utils.ts @@ -9,6 +9,7 @@ import merge from 'lodash/merge'; import { ALPHABET } from './constants'; import { ApplicationError } from './errors/application.error'; +import { ExecutionCancelledError } from './errors/execution-cancelled.error'; import type { BinaryFileType, IDisplayOptions, INodeProperties, JsonObject } from './interfaces'; const readStreamClasses = new Set(['ReadStream', 'Readable', 'ReadableStream']); @@ -199,6 +200,23 @@ export const sleep = async (ms: number): Promise => setTimeout(resolve, ms); }); +export const sleepWithAbort = async (ms: number, abortSignal?: AbortSignal): Promise => + await new Promise((resolve, reject) => { + if (abortSignal?.aborted) { + reject(new ExecutionCancelledError('')); + return; + } + + const timeout = setTimeout(resolve, ms); + + const abortHandler = () => { + clearTimeout(timeout); + reject(new ExecutionCancelledError('')); + }; + + abortSignal?.addEventListener('abort', abortHandler, { once: true }); + }); + export function fileTypeFromMimeType(mimeType: string): BinaryFileType | undefined { if (mimeType.startsWith('application/json')) return 'json'; if (mimeType.startsWith('text/html')) return 'html'; diff --git a/packages/workflow/test/utils.test.ts b/packages/workflow/test/utils.test.ts index 823e14f3dc..b05e1adf71 100644 --- a/packages/workflow/test/utils.test.ts +++ b/packages/workflow/test/utils.test.ts @@ -1,5 +1,6 @@ import { ALPHABET } from '@/constants'; import { ApplicationError } from '@/errors/application.error'; +import { ExecutionCancelledError } from '@/errors/execution-cancelled.error'; import { jsonParse, jsonStringify, @@ -11,6 +12,7 @@ import { hasKey, isSafeObjectProperty, setSafeObjectProperty, + sleepWithAbort, } from '@/utils'; describe('isObjectEmpty', () => { @@ -394,3 +396,68 @@ describe('setSafeObjectProperty', () => { expect(obj).toEqual(expected); }); }); + +describe('sleepWithAbort', () => { + it('should resolve after the specified time when not aborted', async () => { + const start = Date.now(); + await sleepWithAbort(100); + const end = Date.now(); + const elapsed = end - start; + + // Allow some tolerance for timing + expect(elapsed).toBeGreaterThanOrEqual(90); + expect(elapsed).toBeLessThan(200); + }); + + it('should reject immediately if abort signal is already aborted', async () => { + const abortController = new AbortController(); + abortController.abort(); + + await expect(sleepWithAbort(1000, abortController.signal)).rejects.toThrow( + ExecutionCancelledError, + ); + }); + + it('should reject when abort signal is triggered during sleep', async () => { + const abortController = new AbortController(); + + // Start the sleep and abort after 50ms + setTimeout(() => abortController.abort(), 50); + + const start = Date.now(); + await expect(sleepWithAbort(1000, abortController.signal)).rejects.toThrow( + ExecutionCancelledError, + ); + const end = Date.now(); + const elapsed = end - start; + + // Should have been aborted after ~50ms, not the full 1000ms + expect(elapsed).toBeLessThan(200); + }); + + it('should work without abort signal', async () => { + const start = Date.now(); + await sleepWithAbort(100, undefined); + const end = Date.now(); + const elapsed = end - start; + + expect(elapsed).toBeGreaterThanOrEqual(90); + expect(elapsed).toBeLessThan(200); + }); + + it('should clean up timeout when aborted during sleep', async () => { + const abortController = new AbortController(); + const clearTimeoutSpy = jest.spyOn(global, 'clearTimeout'); + + // Start the sleep and abort after 50ms + const sleepPromise = sleepWithAbort(1000, abortController.signal); + setTimeout(() => abortController.abort(), 50); + + await expect(sleepPromise).rejects.toThrow(ExecutionCancelledError); + + // clearTimeout should have been called to clean up + expect(clearTimeoutSpy).toHaveBeenCalled(); + + clearTimeoutSpy.mockRestore(); + }); +});