fix(core): Add retry mechanism to tools (#16667)

This commit is contained in:
Benjamin Schroth
2025-06-26 13:11:41 +02:00
committed by GitHub
parent f690ed5e97
commit 9e61d0b9c0
7 changed files with 1056 additions and 88 deletions

View File

@@ -1,6 +1,6 @@
/* eslint-disable @typescript-eslint/dot-notation */ // Disabled to allow access to private methods
import { DynamicTool } from '@langchain/core/tools';
import { NodeOperationError } from 'n8n-workflow';
import { ApplicationError, NodeOperationError } from 'n8n-workflow';
import type {
ISupplyDataFunctions,
INodeExecutionData,
@@ -11,13 +11,33 @@ import type {
import { WorkflowToolService } from './utils/WorkflowToolService';
// Mock ISupplyDataFunctions interface
// Mock the sleep functions
jest.mock('n8n-workflow', () => ({
...jest.requireActual('n8n-workflow'),
sleep: jest.fn().mockResolvedValue(undefined),
sleepWithAbort: jest.fn().mockResolvedValue(undefined),
}));
function createMockClonedContext(
baseContext: ISupplyDataFunctions,
executeWorkflowMock?: jest.MockedFunction<any>,
): ISupplyDataFunctions {
return {
...baseContext,
addOutputData: jest.fn(),
getNodeParameter: baseContext.getNodeParameter,
getWorkflowDataProxy: baseContext.getWorkflowDataProxy,
executeWorkflow: executeWorkflowMock || baseContext.executeWorkflow,
getNode: baseContext.getNode,
} as ISupplyDataFunctions;
}
function createMockContext(overrides?: Partial<ISupplyDataFunctions>): ISupplyDataFunctions {
let runIndex = 0;
const getNextRunIndex = jest.fn(() => {
return runIndex++;
});
return {
const context = {
runIndex: 0,
getNodeParameter: jest.fn(),
getWorkflowDataProxy: jest.fn(),
@@ -34,7 +54,6 @@ function createMockContext(overrides?: Partial<ISupplyDataFunctions>): ISupplyDa
getTimezone: jest.fn(),
getWorkflow: jest.fn(),
getWorkflowStaticData: jest.fn(),
cloneWith: jest.fn(),
logger: {
debug: jest.fn(),
error: jest.fn(),
@@ -43,6 +62,8 @@ function createMockContext(overrides?: Partial<ISupplyDataFunctions>): ISupplyDa
},
...overrides,
} as ISupplyDataFunctions;
context.cloneWith = jest.fn().mockImplementation((_) => createMockClonedContext(context));
return context;
}
describe('WorkflowTool::WorkflowToolService', () => {
@@ -331,4 +352,428 @@ describe('WorkflowTool::WorkflowToolService', () => {
).rejects.toThrow(NodeOperationError);
});
});
describe('retry functionality', () => {
beforeEach(() => {
jest.clearAllMocks();
});
it('should not retry when retryOnFail is false', async () => {
const executeWorkflowMock = jest.fn().mockRejectedValue(new Error('Test error'));
const contextWithNonRetryNode = createMockContext({
getNode: jest.fn().mockReturnValue({
name: 'Test Tool',
parameters: { workflowInputs: { schema: [] } },
retryOnFail: false,
}),
getNodeParameter: jest.fn().mockImplementation((name) => {
if (name === 'source') return 'database';
if (name === 'workflowId') return { value: 'test-workflow-id' };
if (name === 'fields.values') return [];
return {};
}),
executeWorkflow: executeWorkflowMock,
addOutputData: jest.fn(),
});
contextWithNonRetryNode.cloneWith = jest.fn().mockImplementation((cloneOverrides) => ({
...createMockClonedContext(contextWithNonRetryNode, executeWorkflowMock),
getWorkflowDataProxy: jest.fn().mockReturnValue({
$execution: { id: 'exec-id' },
$workflow: { id: 'workflow-id' },
}),
getNodeParameter: contextWithNonRetryNode.getNodeParameter,
...cloneOverrides,
}));
service = new WorkflowToolService(contextWithNonRetryNode);
const tool = await service.createTool({
ctx: contextWithNonRetryNode,
name: 'Test Tool',
description: 'Test Description',
itemIndex: 0,
});
const result = await tool.func('test query');
expect(executeWorkflowMock).toHaveBeenCalledTimes(1);
expect(result).toContain('There was an error');
});
it('should retry up to maxTries when retryOnFail is true', async () => {
const executeWorkflowMock = jest.fn().mockRejectedValue(new Error('Test error'));
const contextWithRetryNode = createMockContext({
getNode: jest.fn().mockReturnValue({
name: 'Test Tool',
parameters: { workflowInputs: { schema: [] } },
retryOnFail: true,
maxTries: 3,
waitBetweenTries: 0,
}),
getNodeParameter: jest.fn().mockImplementation((name) => {
if (name === 'source') return 'database';
if (name === 'workflowId') return { value: 'test-workflow-id' };
if (name === 'fields.values') return [];
return {};
}),
executeWorkflow: executeWorkflowMock,
addOutputData: jest.fn(),
});
contextWithRetryNode.cloneWith = jest.fn().mockImplementation((cloneOverrides) => ({
...createMockClonedContext(contextWithRetryNode, executeWorkflowMock),
getWorkflowDataProxy: jest.fn().mockReturnValue({
$execution: { id: 'exec-id' },
$workflow: { id: 'workflow-id' },
}),
getNodeParameter: contextWithRetryNode.getNodeParameter,
...cloneOverrides,
}));
service = new WorkflowToolService(contextWithRetryNode);
const tool = await service.createTool({
ctx: contextWithRetryNode,
name: 'Test Tool',
description: 'Test Description',
itemIndex: 0,
});
const result = await tool.func('test query');
expect(executeWorkflowMock).toHaveBeenCalledTimes(3);
expect(result).toContain('There was an error');
});
it('should succeed on retry after initial failure', async () => {
const mockSuccessResponse = {
data: [[{ json: { result: 'success' } }]],
executionId: 'success-exec-id',
};
const executeWorkflowMock = jest
.fn()
.mockRejectedValueOnce(new Error('First attempt fails'))
.mockResolvedValueOnce(mockSuccessResponse);
const contextWithRetryNode = createMockContext({
getNode: jest.fn().mockReturnValue({
name: 'Test Tool',
parameters: { workflowInputs: { schema: [] } },
retryOnFail: true,
maxTries: 3,
waitBetweenTries: 0,
}),
getNodeParameter: jest.fn().mockImplementation((name) => {
if (name === 'source') return 'database';
if (name === 'workflowId') return { value: 'test-workflow-id' };
if (name === 'fields.values') return [];
return {};
}),
executeWorkflow: executeWorkflowMock,
addOutputData: jest.fn(),
});
contextWithRetryNode.cloneWith = jest.fn().mockImplementation((cloneOverrides) => ({
...createMockClonedContext(contextWithRetryNode, executeWorkflowMock),
getWorkflowDataProxy: jest.fn().mockReturnValue({
$execution: { id: 'exec-id' },
$workflow: { id: 'workflow-id' },
}),
getNodeParameter: contextWithRetryNode.getNodeParameter,
...cloneOverrides,
}));
service = new WorkflowToolService(contextWithRetryNode);
const tool = await service.createTool({
ctx: contextWithRetryNode,
name: 'Test Tool',
description: 'Test Description',
itemIndex: 0,
});
const result = await tool.func('test query');
expect(executeWorkflowMock).toHaveBeenCalledTimes(2);
expect(result).toBe(JSON.stringify({ result: 'success' }, null, 2));
});
it.each([
{ maxTries: 1, expected: 2 }, // Should be clamped to minimum 2
{ maxTries: 3, expected: 3 },
{ maxTries: 6, expected: 5 }, // Should be clamped to maximum 5
])('should respect maxTries limits (2-5)', async ({ maxTries, expected }) => {
const executeWorkflowMock = jest.fn().mockRejectedValue(new Error('Test error'));
const contextWithRetryNode = createMockContext({
getNode: jest.fn().mockReturnValue({
name: 'Test Tool',
parameters: { workflowInputs: { schema: [] } },
retryOnFail: true,
maxTries,
waitBetweenTries: 0,
}),
getNodeParameter: jest.fn().mockImplementation((name) => {
if (name === 'source') return 'database';
if (name === 'workflowId') return { value: 'test-workflow-id' };
if (name === 'fields.values') return [];
return {};
}),
executeWorkflow: executeWorkflowMock,
});
contextWithRetryNode.cloneWith = jest.fn().mockImplementation((cloneOverrides) => ({
...createMockClonedContext(contextWithRetryNode, executeWorkflowMock),
getWorkflowDataProxy: jest.fn().mockReturnValue({
$execution: { id: 'exec-id' },
$workflow: { id: 'workflow-id' },
}),
getNodeParameter: contextWithRetryNode.getNodeParameter,
...cloneOverrides,
}));
service = new WorkflowToolService(contextWithRetryNode);
const tool = await service.createTool({
ctx: contextWithRetryNode,
name: 'Test Tool',
description: 'Test Description',
itemIndex: 0,
});
await tool.func('test query');
expect(executeWorkflowMock).toHaveBeenCalledTimes(expected);
});
it('should respect waitBetweenTries with sleepWithAbort', async () => {
const { sleepWithAbort } = jest.requireMock('n8n-workflow');
sleepWithAbort.mockClear();
const executeWorkflowMock = jest.fn().mockRejectedValue(new Error('Test error'));
const contextWithRetryNode = createMockContext({
getNode: jest.fn().mockReturnValue({
name: 'Test Tool',
parameters: { workflowInputs: { schema: [] } },
retryOnFail: true,
maxTries: 2,
waitBetweenTries: 1500,
}),
getNodeParameter: jest.fn().mockImplementation((name) => {
if (name === 'source') return 'database';
if (name === 'workflowId') return { value: 'test-workflow-id' };
if (name === 'fields.values') return [];
return {};
}),
executeWorkflow: executeWorkflowMock,
addOutputData: jest.fn(),
});
contextWithRetryNode.cloneWith = jest.fn().mockImplementation((cloneOverrides) => ({
...createMockClonedContext(contextWithRetryNode, executeWorkflowMock),
getWorkflowDataProxy: jest.fn().mockReturnValue({
$execution: { id: 'exec-id' },
$workflow: { id: 'workflow-id' },
}),
getNodeParameter: contextWithRetryNode.getNodeParameter,
...cloneOverrides,
}));
service = new WorkflowToolService(contextWithRetryNode);
const tool = await service.createTool({
ctx: contextWithRetryNode,
name: 'Test Tool',
description: 'Test Description',
itemIndex: 0,
});
await tool.func('test query');
expect(sleepWithAbort).toHaveBeenCalledWith(1500, undefined);
});
});
describe('abort signal functionality', () => {
let abortController: AbortController;
beforeEach(() => {
jest.clearAllMocks();
abortController = new AbortController();
});
const createAbortSignalContext = (
executeWorkflowMock: jest.MockedFunction<any>,
abortSignal?: AbortSignal,
) => {
const contextWithRetryNode = createMockContext({
getNode: jest.fn().mockReturnValue({
name: 'Test Tool',
parameters: { workflowInputs: { schema: [] } },
retryOnFail: true,
maxTries: 3,
waitBetweenTries: 100,
}),
getNodeParameter: jest.fn().mockImplementation((name) => {
if (name === 'source') return 'database';
if (name === 'workflowId') return { value: 'test-workflow-id' };
if (name === 'fields.values') return [];
return {};
}),
executeWorkflow: executeWorkflowMock,
addOutputData: jest.fn(),
});
contextWithRetryNode.cloneWith = jest.fn().mockImplementation((cloneOverrides) => ({
...createMockClonedContext(contextWithRetryNode, executeWorkflowMock),
getWorkflowDataProxy: jest.fn().mockReturnValue({
$execution: { id: 'exec-id' },
$workflow: { id: 'workflow-id' },
}),
getNodeParameter: contextWithRetryNode.getNodeParameter,
getExecutionCancelSignal: jest.fn(() => abortSignal),
...cloneOverrides,
}));
return contextWithRetryNode;
};
it('should return cancellation message if signal is already aborted', async () => {
const executeWorkflowMock = jest.fn().mockResolvedValue({
data: [[{ json: { result: 'success' } }]],
executionId: 'success-exec-id',
});
// Abort before starting
abortController.abort();
const contextWithRetryNode = createAbortSignalContext(
executeWorkflowMock,
abortController.signal,
);
service = new WorkflowToolService(contextWithRetryNode);
const tool = await service.createTool({
ctx: contextWithRetryNode,
name: 'Test Tool',
description: 'Test Description',
itemIndex: 0,
});
const result = await tool.func('test query');
expect(result).toBe('There was an error: "Execution was cancelled"');
expect(executeWorkflowMock).not.toHaveBeenCalled();
});
it('should handle abort signal during retry wait', async () => {
const { sleepWithAbort } = jest.requireMock('n8n-workflow');
sleepWithAbort.mockRejectedValue(new Error('Execution was cancelled'));
const executeWorkflowMock = jest
.fn()
.mockRejectedValueOnce(new Error('First attempt fails'))
.mockResolvedValueOnce({
data: [[{ json: { result: 'success' } }]],
executionId: 'success-exec-id',
});
const contextWithRetryNode = createAbortSignalContext(
executeWorkflowMock,
abortController.signal,
);
service = new WorkflowToolService(contextWithRetryNode);
const tool = await service.createTool({
ctx: contextWithRetryNode,
name: 'Test Tool',
description: 'Test Description',
itemIndex: 0,
});
const result = await tool.func('test query');
expect(result).toBe('There was an error: "Execution was cancelled"');
expect(sleepWithAbort).toHaveBeenCalledWith(100, abortController.signal);
expect(executeWorkflowMock).toHaveBeenCalledTimes(1); // Only first attempt
});
it('should handle abort signal during execution', async () => {
const executeWorkflowMock = jest.fn().mockImplementation(() => {
// Simulate abort during execution
abortController.abort();
throw new ApplicationError('Workflow execution failed');
});
const contextWithRetryNode = createAbortSignalContext(
executeWorkflowMock,
abortController.signal,
);
service = new WorkflowToolService(contextWithRetryNode);
const tool = await service.createTool({
ctx: contextWithRetryNode,
name: 'Test Tool',
description: 'Test Description',
itemIndex: 0,
});
const result = await tool.func('test query');
expect(result).toBe('There was an error: "Execution was cancelled"');
expect(executeWorkflowMock).toHaveBeenCalledTimes(1);
});
it('should complete successfully if not aborted', async () => {
const { sleepWithAbort } = jest.requireMock('n8n-workflow');
sleepWithAbort.mockClear().mockResolvedValue(undefined);
const executeWorkflowMock = jest
.fn()
.mockRejectedValueOnce(new Error('First attempt fails'))
.mockResolvedValueOnce({
data: [[{ json: { result: 'success' } }]],
executionId: 'success-exec-id',
});
const contextWithRetryNode = createAbortSignalContext(
executeWorkflowMock,
abortController.signal,
);
service = new WorkflowToolService(contextWithRetryNode);
const tool = await service.createTool({
ctx: contextWithRetryNode,
name: 'Test Tool',
description: 'Test Description',
itemIndex: 0,
});
const result = await tool.func('test query');
expect(result).toBe(JSON.stringify({ result: 'success' }, null, 2));
expect(executeWorkflowMock).toHaveBeenCalledTimes(2);
expect(sleepWithAbort).toHaveBeenCalledWith(100, abortController.signal);
});
it('should work when getExecutionCancelSignal is not available', async () => {
const { sleepWithAbort } = jest.requireMock('n8n-workflow');
sleepWithAbort.mockClear().mockResolvedValue(undefined);
const executeWorkflowMock = jest
.fn()
.mockRejectedValueOnce(new Error('First attempt fails'))
.mockResolvedValueOnce({
data: [[{ json: { result: 'success' } }]],
executionId: 'success-exec-id',
});
// Create context without getExecutionCancelSignal
const contextWithRetryNode = createAbortSignalContext(executeWorkflowMock, undefined);
service = new WorkflowToolService(contextWithRetryNode);
const tool = await service.createTool({
ctx: contextWithRetryNode,
name: 'Test Tool',
description: 'Test Description',
itemIndex: 0,
});
const result = await tool.func('test query');
expect(result).toBe(JSON.stringify({ result: 'success' }, null, 2));
expect(sleepWithAbort).toHaveBeenCalledWith(100, undefined);
});
});
});

View File

@@ -25,6 +25,7 @@ import {
NodeConnectionTypes,
NodeOperationError,
parseErrorMetadata,
sleepWithAbort,
traverseNodeParameters,
} from 'n8n-workflow';
import { z } from 'zod';
@@ -75,69 +76,116 @@ export class WorkflowToolService {
// This function will execute the sub-workflow and return the response
// We get the runIndex from the context to handle multiple executions
// of the same tool when the tool is used in a loop or in a parallel execution.
const node = ctx.getNode();
let runIndex: number = ctx.getNextRunIndex();
const toolHandler = async (
query: string | IDataObject,
runManager?: CallbackManagerForToolRun,
): Promise<IDataObject | IDataObject[] | string> => {
const localRunIndex = runIndex++;
// We need to clone the context here to handle runIndex correctly
// Otherwise the runIndex will be shared between different executions
// Causing incorrect data to be passed to the sub-workflow and via $fromAI
const context = this.baseContext.cloneWith({
runIndex: localRunIndex,
inputData: [[{ json: { query } }]],
});
try {
const response = await this.runFunction(context, query, itemIndex, runManager);
const processedResponse = this.handleToolResponse(response);
let responseData: INodeExecutionData[];
if (isNodeExecutionData(response)) {
responseData = response;
} else {
const reParsedData = jsonParse<IDataObject>(processedResponse, {
fallbackValue: { response: processedResponse },
});
responseData = [{ json: reParsedData }];
}
// Once the sub-workflow is executed, add the output data to the context
// This will be used to link the sub-workflow execution in the parent workflow
let metadata: ITaskMetadata | undefined;
if (this.subExecutionId && this.subWorkflowId) {
metadata = {
subExecution: {
executionId: this.subExecutionId,
workflowId: this.subWorkflowId,
},
};
}
void context.addOutputData(
NodeConnectionTypes.AiTool,
localRunIndex,
[responseData],
metadata,
);
return processedResponse;
} catch (error) {
const executionError = error as ExecutionError;
const errorResponse = `There was an error: "${executionError.message}"`;
const metadata = parseErrorMetadata(error);
void context.addOutputData(
NodeConnectionTypes.AiTool,
localRunIndex,
executionError,
metadata,
);
return errorResponse;
let maxTries = 1;
if (node.retryOnFail === true) {
maxTries = Math.min(5, Math.max(2, node.maxTries ?? 3));
}
let waitBetweenTries = 0;
if (node.retryOnFail === true) {
waitBetweenTries = Math.min(5000, Math.max(0, node.waitBetweenTries ?? 1000));
}
let lastError: ExecutionError | undefined;
for (let tryIndex = 0; tryIndex < maxTries; tryIndex++) {
const localRunIndex = runIndex++;
// We need to clone the context here to handle runIndex correctly
// Otherwise the runIndex will be shared between different executions
// Causing incorrect data to be passed to the sub-workflow and via $fromAI
const context = this.baseContext.cloneWith({
runIndex: localRunIndex,
inputData: [[{ json: { query } }]],
});
// Get abort signal from context for cancellation support
const abortSignal = context.getExecutionCancelSignal?.();
// Check if execution was cancelled before retry
if (abortSignal?.aborted) {
return 'There was an error: "Execution was cancelled"';
}
if (tryIndex !== 0) {
// Reset error from previous attempt
lastError = undefined;
if (waitBetweenTries !== 0) {
try {
await sleepWithAbort(waitBetweenTries, abortSignal);
} catch (abortError) {
return 'There was an error: "Execution was cancelled"';
}
}
}
try {
const response = await this.runFunction(context, query, itemIndex, runManager);
const processedResponse = this.handleToolResponse(response);
let responseData: INodeExecutionData[];
if (isNodeExecutionData(response)) {
responseData = response;
} else {
const reParsedData = jsonParse<IDataObject>(processedResponse, {
fallbackValue: { response: processedResponse },
});
responseData = [{ json: reParsedData }];
}
// Once the sub-workflow is executed, add the output data to the context
// This will be used to link the sub-workflow execution in the parent workflow
let metadata: ITaskMetadata | undefined;
if (this.subExecutionId && this.subWorkflowId) {
metadata = {
subExecution: {
executionId: this.subExecutionId,
workflowId: this.subWorkflowId,
},
};
}
void context.addOutputData(
NodeConnectionTypes.AiTool,
localRunIndex,
[responseData],
metadata,
);
return processedResponse;
} catch (error) {
// Check if error is due to cancellation
if (abortSignal?.aborted) {
return 'There was an error: "Execution was cancelled"';
}
const executionError = error as ExecutionError;
lastError = executionError;
const errorResponse = `There was an error: "${executionError.message}"`;
const metadata = parseErrorMetadata(error);
void context.addOutputData(
NodeConnectionTypes.AiTool,
localRunIndex,
executionError,
metadata,
);
if (tryIndex === maxTries - 1) {
return errorResponse;
}
}
}
return `There was an error: ${lastError?.message ?? 'Unknown error'}`;
};
// Create structured tool if input schema is provided