import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import { mock } from 'jest-mock-extended';
import { AgentExecutor } from 'langchain/agents';
import type { Tool } from 'langchain/tools';
import type { ISupplyDataFunctions, IExecuteFunctions, INode } from 'n8n-workflow';

import * as helpers from '../../../../../utils/helpers';
import * as outputParserModule from '../../../../../utils/output_parsers/N8nOutputParser';
import type { N8nStructuredOutputParser } from '../../../../../utils/output_parsers/N8nOutputParser';
import { toolsAgentExecute } from '../../agents/ToolsAgent/V2/execute';

jest.mock('../../../../../utils/output_parsers/N8nOutputParser', () => ({
	getOptionalOutputParser: jest.fn(),
	N8nStructuredOutputParser: jest.fn(),
}));

const mockHelpers = mock<IExecuteFunctions['helpers']>();
const mockContext = mock<IExecuteFunctions>({ helpers: mockHelpers });

beforeEach(() => {
	jest.clearAllMocks();
	jest.resetAllMocks();
});

describe('toolsAgentExecute', () => {
	beforeEach(() => {
		jest.clearAllMocks();
		mockContext.logger = {
			debug: jest.fn(),
			info: jest.fn(),
			warn: jest.fn(),
			error: jest.fn(),
		};
	});

	it('should process items sequentially when batchSize is not set', async () => {
		const mockNode = mock<INode>();
		mockNode.typeVersion = 2;
		mockContext.getNode.mockReturnValue(mockNode);
		mockContext.getInputData.mockReturnValue([
			{ json: { text: 'test input 1' } },
			{ json: { text: 'test input 2' } },
		]);

		const mockModel = mock<BaseChatModel>();
		mockModel.bindTools = jest.fn();
		mockModel.lc_namespace = ['chat_models'];
		mockContext.getInputConnectionData.mockResolvedValue(mockModel);

		const mockTools = [mock<Tool>()];
		jest.spyOn(helpers, 'getConnectedTools').mockResolvedValue(mockTools);

		// Mock getNodeParameter to return default values
		mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
			if (param === 'text') return 'test input';
			if (param === 'needsFallback') return false;
			if (param === 'options.batching.batchSize') return defaultValue;
			if (param === 'options.batching.delayBetweenBatches') return defaultValue;
			if (param === 'options')
				return {
					systemMessage: 'You are a helpful assistant',
					maxIterations: 10,
					returnIntermediateSteps: false,
					passthroughBinaryImages: true,
				};
			return defaultValue;
		});

		const mockExecutor = {
			invoke: jest
				.fn()
				.mockResolvedValueOnce({ output: { text: 'success 1' } })
				.mockResolvedValueOnce({ output: { text: 'success 2' } }),
		};
		jest.spyOn(AgentExecutor, 'fromAgentAndTools').mockReturnValue(mockExecutor as any);

		const result = await toolsAgentExecute.call(mockContext);

		expect(mockExecutor.invoke).toHaveBeenCalledTimes(2);
		expect(result[0]).toHaveLength(2);
		expect(result[0][0].json).toEqual({ output: { text: 'success 1' } });
		expect(result[0][1].json).toEqual({ output: { text: 'success 2' } });
	});

	it('should process items in parallel within batches when batchSize > 1', async () => {
		const mockNode = mock<INode>();
		mockNode.typeVersion = 2;
		mockContext.getNode.mockReturnValue(mockNode);
		mockContext.getInputData.mockReturnValue([
			{ json: { text: 'test input 1' } },
			{ json: { text: 'test input 2' } },
			{ json: { text: 'test input 3' } },
			{ json: { text: 'test input 4' } },
		]);

		const mockModel = mock<BaseChatModel>();
		mockModel.bindTools = jest.fn();
		mockModel.lc_namespace = ['chat_models'];
		mockContext.getInputConnectionData.mockResolvedValue(mockModel);

		const mockTools = [mock<Tool>()];
		jest.spyOn(helpers, 'getConnectedTools').mockResolvedValue(mockTools);

		mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
			if (param === 'options.batching.batchSize') return 2;
			if (param === 'options.batching.delayBetweenBatches') return 100;
			if (param === 'text') return 'test input';
			if (param === 'needsFallback') return false;
			if (param === 'options')
				return {
					systemMessage: 'You are a helpful assistant',
					maxIterations: 10,
					returnIntermediateSteps: false,
					passthroughBinaryImages: true,
				};
			return defaultValue;
		});

		const mockExecutor = {
			invoke: jest
				.fn()
				.mockResolvedValueOnce({ output: { text: 'success 1' } })
				.mockResolvedValueOnce({ output: { text: 'success 2' } })
				.mockResolvedValueOnce({ output: { text: 'success 3' } })
				.mockResolvedValueOnce({ output: { text: 'success 4' } }),
		};
		jest.spyOn(AgentExecutor, 'fromAgentAndTools').mockReturnValue(mockExecutor as any);

		const result = await toolsAgentExecute.call(mockContext);

		expect(mockExecutor.invoke).toHaveBeenCalledTimes(4); // Each item is processed individually
		expect(result[0]).toHaveLength(4);
		expect(result[0][0].json).toEqual({ output: { text: 'success 1' } });
		expect(result[0][1].json).toEqual({ output: { text: 'success 2' } });
		expect(result[0][2].json).toEqual({ output: { text: 'success 3' } });
		expect(result[0][3].json).toEqual({ output: { text: 'success 4' } });
	});

	it('should handle errors in batch processing when continueOnFail is true', async () => {
		const mockNode = mock<INode>();
		mockNode.typeVersion = 2;
		mockContext.getNode.mockReturnValue(mockNode);
		mockContext.getInputData.mockReturnValue([
			{ json: { text: 'test input 1' } },
			{ json: { text: 'test input 2' } },
		]);

		const mockModel = mock<BaseChatModel>();
		mockModel.bindTools = jest.fn();
		mockModel.lc_namespace = ['chat_models'];
		mockContext.getInputConnectionData.mockResolvedValue(mockModel);

		const mockTools = [mock<Tool>()];
		jest.spyOn(helpers, 'getConnectedTools').mockResolvedValue(mockTools);

		mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
			if (param === 'options.batching.batchSize') return 2;
			if (param === 'options.batching.delayBetweenBatches') return 0;
			if (param === 'text') return 'test input';
			if (param === 'needsFallback') return false;
			if (param === 'options')
				return {
					systemMessage: 'You are a helpful assistant',
					maxIterations: 10,
					returnIntermediateSteps: false,
					passthroughBinaryImages: true,
				};
			return defaultValue;
		});

		mockContext.continueOnFail.mockReturnValue(true);

		const mockExecutor = {
			invoke: jest
				.fn()
				.mockResolvedValueOnce({ output: { text: 'success' } })
				.mockRejectedValueOnce(new Error('Test error')),
		};
		jest.spyOn(AgentExecutor, 'fromAgentAndTools').mockReturnValue(mockExecutor as any);

		const result = await toolsAgentExecute.call(mockContext);

		expect(result[0]).toHaveLength(2);
		expect(result[0][0].json).toEqual({ output: { text: 'success' } });
		expect(result[0][1].json).toEqual({ error: 'Test error' });
	});

	it('should throw error in batch processing when continueOnFail is false', async () => {
		const mockNode = mock<INode>();
		mockNode.typeVersion = 2;
		mockContext.getNode.mockReturnValue(mockNode);
		mockContext.getInputData.mockReturnValue([
			{ json: { text: 'test input 1' } },
			{ json: { text: 'test input 2' } },
		]);

		const mockModel = mock<BaseChatModel>();
		mockModel.bindTools = jest.fn();
		mockModel.lc_namespace = ['chat_models'];
		mockContext.getInputConnectionData.mockResolvedValue(mockModel);

		const mockTools = [mock<Tool>()];
		jest.spyOn(helpers, 'getConnectedTools').mockResolvedValue(mockTools);

		mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
			if (param === 'options.batching.batchSize') return 2;
			if (param === 'options.batching.delayBetweenBatches') return 0;
			if (param === 'text') return 'test input';
			if (param === 'needsFallback') return false;
			if (param === 'options')
				return {
					systemMessage: 'You are a helpful assistant',
					maxIterations: 10,
					returnIntermediateSteps: false,
					passthroughBinaryImages: true,
				};
			return defaultValue;
		});

		mockContext.continueOnFail.mockReturnValue(false);

		const mockExecutor = {
			invoke: jest
				.fn()
				.mockResolvedValueOnce({ output: JSON.stringify({ text: 'success' }) })
				.mockRejectedValueOnce(new Error('Test error')),
		};
		jest.spyOn(AgentExecutor, 'fromAgentAndTools').mockReturnValue(mockExecutor as any);

		await expect(toolsAgentExecute.call(mockContext)).rejects.toThrow('Test error');
	});

	it('should fetch output parser with correct item index', async () => {
		const mockNode = mock<INode>();
		mockNode.typeVersion = 2;
		mockContext.getNode.mockReturnValue(mockNode);
		mockContext.getInputData.mockReturnValue([
			{ json: { text: 'test input 1' } },
			{ json: { text: 'test input 2' } },
			{ json: { text: 'test input 3' } },
		]);

		const mockModel = mock<BaseChatModel>();
		mockModel.bindTools = jest.fn();
		mockModel.lc_namespace = ['chat_models'];
		mockContext.getInputConnectionData.mockResolvedValue(mockModel);

		const mockTools = [mock<Tool>()];
		jest.spyOn(helpers, 'getConnectedTools').mockResolvedValue(mockTools);

		const mockParser1 = mock<N8nStructuredOutputParser>();
		const mockParser2 = mock<N8nStructuredOutputParser>();
		const mockParser3 = mock<N8nStructuredOutputParser>();
		const getOptionalOutputParserSpy = jest
			.spyOn(outputParserModule, 'getOptionalOutputParser')
			.mockResolvedValueOnce(mockParser1)
			.mockResolvedValueOnce(mockParser2)
			.mockResolvedValueOnce(mockParser3)
			.mockResolvedValueOnce(undefined); // For the check call

		mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
			if (param === 'text') return 'test input';
			if (param === 'options.batching.batchSize') return defaultValue;
			if (param === 'options.batching.delayBetweenBatches') return defaultValue;
			if (param === 'options')
				return {
					systemMessage: 'You are a helpful assistant',
					maxIterations: 10,
					returnIntermediateSteps: false,
					passthroughBinaryImages: true,
				};
			return defaultValue;
		});

		const mockExecutor = {
			invoke: jest
				.fn()
				.mockResolvedValueOnce({ output: JSON.stringify({ text: 'success 1' }) })
				.mockResolvedValueOnce({ output: JSON.stringify({ text: 'success 2' }) })
				.mockResolvedValueOnce({ output: JSON.stringify({ text: 'success 3' }) }),
		};
		jest.spyOn(AgentExecutor, 'fromAgentAndTools').mockReturnValue(mockExecutor as any);

		await toolsAgentExecute.call(mockContext);

		// Verify getOptionalOutputParser was called with correct indices
		expect(getOptionalOutputParserSpy).toHaveBeenCalledTimes(6);
		expect(getOptionalOutputParserSpy).toHaveBeenNthCalledWith(1, mockContext, 0);
		expect(getOptionalOutputParserSpy).toHaveBeenNthCalledWith(2, mockContext, 0);
		expect(getOptionalOutputParserSpy).toHaveBeenNthCalledWith(3, mockContext, 1);
		expect(getOptionalOutputParserSpy).toHaveBeenNthCalledWith(4, mockContext, 0);
		expect(getOptionalOutputParserSpy).toHaveBeenNthCalledWith(5, mockContext, 2);
	});

	it('should pass different output parsers to getTools for each item', async () => {
		const mockNode = mock<INode>();
		mockNode.typeVersion = 2;
		mockContext.getNode.mockReturnValue(mockNode);
		mockContext.getInputData.mockReturnValue([
			{ json: { text: 'test input 1' } },
			{ json: { text: 'test input 2' } },
		]);

		const mockModel = mock<BaseChatModel>();
		mockModel.bindTools = jest.fn();
		mockModel.lc_namespace = ['chat_models'];
		mockContext.getInputConnectionData.mockResolvedValue(mockModel);

		const mockParser1 = mock<N8nStructuredOutputParser>();
		const mockParser2 = mock<N8nStructuredOutputParser>();
		jest
			.spyOn(outputParserModule, 'getOptionalOutputParser')
			.mockResolvedValueOnce(mockParser1)
			.mockResolvedValueOnce(mockParser2);

		const getToolsSpy = jest.spyOn(helpers, 'getConnectedTools').mockResolvedValue([mock<Tool>()]);
		mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
			if (param === 'text') return 'test input';
			if (param === 'options')
				return {
					systemMessage: 'You are a helpful assistant',
					maxIterations: 10,
					returnIntermediateSteps: false,
					passthroughBinaryImages: true,
				};
			return defaultValue;
		});

		const mockExecutor = {
			invoke: jest
				.fn()
				.mockResolvedValueOnce({ output: JSON.stringify({ text: 'success 1' }) })
				.mockResolvedValueOnce({ output: JSON.stringify({ text: 'success 2' }) }),
		};
		jest.spyOn(AgentExecutor, 'fromAgentAndTools').mockReturnValue(mockExecutor as any);

		await toolsAgentExecute.call(mockContext);

		// Verify getTools was called with different parsers
		expect(getToolsSpy).toHaveBeenCalledTimes(2);
		expect(getToolsSpy).toHaveBeenNthCalledWith(1, mockContext, true, false);
		expect(getToolsSpy).toHaveBeenNthCalledWith(2, mockContext, true, false);
	});

	it('should maintain correct parser-item mapping in batch processing', async () => {
		const mockNode = mock<INode>();
		mockNode.typeVersion = 2;
		mockContext.getNode.mockReturnValue(mockNode);
		mockContext.getInputData.mockReturnValue([
			{ json: { text: 'test input 1' } },
			{ json: { text: 'test input 2' } },
			{ json: { text: 'test input 3' } },
			{ json: { text: 'test input 4' } },
		]);

		const mockModel = mock<BaseChatModel>();
		mockModel.bindTools = jest.fn();
		mockModel.lc_namespace = ['chat_models'];
		mockContext.getInputConnectionData.mockResolvedValue(mockModel);

		const mockParsers = [
			mock<N8nStructuredOutputParser>(),
			mock<N8nStructuredOutputParser>(),
			mock<N8nStructuredOutputParser>(),
			mock<N8nStructuredOutputParser>(),
		];
		const getOptionalOutputParserSpy = jest
			.spyOn(outputParserModule, 'getOptionalOutputParser')
			.mockImplementation(async (_ctx, index) => mockParsers[index || 0]);

		jest.spyOn(helpers, 'getConnectedTools').mockResolvedValue([mock<Tool>()]);

		mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
			if (param === 'options.batching.batchSize') return 2;
			if (param === 'options.batching.delayBetweenBatches') return 0;
			if (param === 'text') return 'test input';
			if (param === 'options')
				return {
					systemMessage: 'You are a helpful assistant',
					maxIterations: 10,
					returnIntermediateSteps: false,
					passthroughBinaryImages: true,
				};
			return defaultValue;
		});

		const mockExecutor = {
			invoke: jest
				.fn()
				.mockResolvedValueOnce({ output: JSON.stringify({ text: 'success 1' }) })
				.mockResolvedValueOnce({ output: JSON.stringify({ text: 'success 2' }) })
				.mockResolvedValueOnce({ output: JSON.stringify({ text: 'success 3' }) })
				.mockResolvedValueOnce({ output: JSON.stringify({ text: 'success 4' }) }),
		};
		jest.spyOn(AgentExecutor, 'fromAgentAndTools').mockReturnValue(mockExecutor as any);

		await toolsAgentExecute.call(mockContext);

		// Verify each item got its corresponding parser based on index
		// It's called once per item + once to check if output parser is connected
		expect(getOptionalOutputParserSpy).toHaveBeenCalledTimes(6);
		expect(getOptionalOutputParserSpy).toHaveBeenNthCalledWith(1, mockContext, 0);
		expect(getOptionalOutputParserSpy).toHaveBeenNthCalledWith(2, mockContext, 1);
		expect(getOptionalOutputParserSpy).toHaveBeenNthCalledWith(3, mockContext, 0);
		expect(getOptionalOutputParserSpy).toHaveBeenNthCalledWith(4, mockContext, 2);
		expect(getOptionalOutputParserSpy).toHaveBeenNthCalledWith(5, mockContext, 3);
		expect(getOptionalOutputParserSpy).toHaveBeenNthCalledWith(6, mockContext, 0);
	});

	describe('streaming', () => {
		let mockNode: INode;
		let mockModel: BaseChatModel;

		beforeEach(() => {
			jest.clearAllMocks();
			mockNode = mock<INode>();
			mockNode.typeVersion = 2.2;
			mockContext.getNode.mockReturnValue(mockNode);
			mockContext.getInputData.mockReturnValue([{ json: { text: 'test input' } }]);

			mockModel = mock<BaseChatModel>();
			mockModel.bindTools = jest.fn();
			mockModel.lc_namespace = ['chat_models'];
			mockContext.getInputConnectionData.mockImplementation(async (type, _index) => {
				if (type === 'ai_languageModel') return mockModel;
				if (type === 'ai_memory') return undefined;
				return undefined;
			});

			mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
				if (param === 'enableStreaming') return true;
				if (param === 'text') return 'test input';
				if (param === 'options.batching.batchSize') return defaultValue;
				if (param === 'options.batching.delayBetweenBatches') return defaultValue;
				if (param === 'options')
					return {
						systemMessage: 'You are a helpful assistant',
						maxIterations: 10,
						returnIntermediateSteps: false,
						passthroughBinaryImages: true,
					};
				return defaultValue;
			});
		});

		it('should handle streaming when enableStreaming is true', async () => {
			jest.spyOn(helpers, 'getConnectedTools').mockResolvedValue([mock<Tool>()]);
			jest.spyOn(outputParserModule, 'getOptionalOutputParser').mockResolvedValue(undefined);
			mockContext.isStreaming.mockReturnValue(true);

			// Mock async generator for streamEvents
			const mockStreamEvents = async function* () {
				yield {
					event: 'on_chat_model_stream',
					data: {
						chunk: {
							content: 'Hello ',
						},
					},
				};
				yield {
					event: 'on_chat_model_stream',
					data: {
						chunk: {
							content: 'world!',
						},
					},
				};
			};
			const mockExecutor = {
				streamEvents: jest.fn().mockReturnValue(mockStreamEvents()),
			};
			jest.spyOn(AgentExecutor, 'fromAgentAndTools').mockReturnValue(mockExecutor as any);

			const result = await toolsAgentExecute.call(mockContext);

			expect(mockContext.sendChunk).toHaveBeenCalledWith('begin');
			expect(mockContext.sendChunk).toHaveBeenCalledWith('item', 'Hello ');
			expect(mockContext.sendChunk).toHaveBeenCalledWith('item', 'world!');
			expect(mockContext.sendChunk).toHaveBeenCalledWith('end');
			expect(mockExecutor.streamEvents).toHaveBeenCalledTimes(1);
			expect(result[0]).toHaveLength(1);
			expect(result[0][0].json.output).toBe('Hello world!');
		});

		it('should capture intermediate steps during streaming when returnIntermediateSteps is true', async () => {
			jest.spyOn(helpers, 'getConnectedTools').mockResolvedValue([mock<Tool>()]);
			jest.spyOn(outputParserModule, 'getOptionalOutputParser').mockResolvedValue(undefined);
			mockContext.isStreaming.mockReturnValue(true);

			mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
				if (param === 'enableStreaming') return true;
				if (param === 'text') return 'test input';
				if (param === 'options.batching.batchSize') return defaultValue;
				if (param === 'options.batching.delayBetweenBatches') return defaultValue;
				if (param === 'options')
					return {
						systemMessage: 'You are a helpful assistant',
						maxIterations: 10,
						returnIntermediateSteps: true, // Enable intermediate steps
						passthroughBinaryImages: true,
					};
				return defaultValue;
			});

			// Mock async generator for streamEvents with tool calls
			const mockStreamEvents = async function* () {
				// LLM response with tool call
				yield {
					event: 'on_chat_model_end',
					data: {
						output: {
							content: 'I need to call a tool',
							tool_calls: [
								{
									id: 'call_123',
									name: 'TestTool',
									args: { input: 'test data' },
									type: 'function',
								},
							],
						},
					},
				};
				// Tool execution result
				yield {
					event: 'on_tool_end',
					name: 'TestTool',
					data: {
						output: 'Tool execution result',
					},
				};
				// Final LLM response
				yield {
					event: 'on_chat_model_stream',
					data: {
						chunk: {
							content: 'Final response',
						},
					},
				};
			};
			const mockExecutor = {
				streamEvents: jest.fn().mockReturnValue(mockStreamEvents()),
			};
			jest.spyOn(AgentExecutor, 'fromAgentAndTools').mockReturnValue(mockExecutor as any);

			const result = await toolsAgentExecute.call(mockContext);

			expect(result[0]).toHaveLength(1);
			expect(result[0][0].json.output).toBe('Final response');

			// Check intermediate steps
			expect(result[0][0].json.intermediateSteps).toBeDefined();
			expect(result[0][0].json.intermediateSteps).toHaveLength(1);

			const step = (result[0][0].json.intermediateSteps as any[])[0];
			expect(step.action).toBeDefined();
			expect(step.action.tool).toBe('TestTool');
			expect(step.action.toolInput).toEqual({ input: 'test data' });
			expect(step.action.toolCallId).toBe('call_123');
			expect(step.action.type).toBe('function');
			expect(step.action.messageLog).toBeDefined();
			expect(step.observation).toBe('Tool execution result');
		});

		it('should use regular execution on version 2.2 when enableStreaming is false', async () => {
			jest.spyOn(helpers, 'getConnectedTools').mockResolvedValue([mock<Tool>()]);
			jest.spyOn(outputParserModule, 'getOptionalOutputParser').mockResolvedValue(undefined);

			const mockExecutor = {
				invoke: jest.fn().mockResolvedValue({ output: 'Regular response' }),
				streamEvents: jest.fn(),
			};
			jest.spyOn(AgentExecutor, 'fromAgentAndTools').mockReturnValue(mockExecutor as any);

			const result = await toolsAgentExecute.call(mockContext);

			expect(mockContext.sendChunk).not.toHaveBeenCalled();
			expect(mockExecutor.invoke).toHaveBeenCalledTimes(1);
			expect(mockExecutor.streamEvents).not.toHaveBeenCalled();
			expect(result[0][0].json.output).toBe('Regular response');
		});

		it('should use regular execution on version 2.2 when streaming is not available', async () => {
			mockContext.isStreaming.mockReturnValue(false);
			jest.spyOn(helpers, 'getConnectedTools').mockResolvedValue([mock<Tool>()]);
			jest.spyOn(outputParserModule, 'getOptionalOutputParser').mockResolvedValue(undefined);

			const mockExecutor = {
				invoke: jest.fn().mockResolvedValue({ output: 'Regular response' }),
				streamEvents: jest.fn(),
			};
			jest.spyOn(AgentExecutor, 'fromAgentAndTools').mockReturnValue(mockExecutor as any);

			const result = await toolsAgentExecute.call(mockContext);

			expect(mockContext.sendChunk).not.toHaveBeenCalled();
			expect(mockExecutor.invoke).toHaveBeenCalledTimes(1);
			expect(mockExecutor.streamEvents).not.toHaveBeenCalled();
			expect(result[0][0].json.output).toBe('Regular response');
		});
	});

	it('should process items if SupplyDataContext is passed and isStreaming is not set', async () => {
		const mockSupplyDataContext = mock<ISupplyDataFunctions>();
		// @ts-expect-error isStreaming is not supported by SupplyDataFunctions, but mock object still resolves it
		mockSupplyDataContext.isStreaming = undefined;
		mockSupplyDataContext.logger = {
			debug: jest.fn(),
			info: jest.fn(),
			warn: jest.fn(),
			error: jest.fn(),
		};

		const mockNode = mock<INode>();
		mockNode.typeVersion = 2.2; // version where streaming is supported
		mockSupplyDataContext.getNode.mockReturnValue(mockNode);
		mockSupplyDataContext.getInputData.mockReturnValue([{ json: { text: 'test input 1' } }]);

		const mockModel = mock<BaseChatModel>();
		mockModel.bindTools = jest.fn();
		mockModel.lc_namespace = ['chat_models'];
		mockSupplyDataContext.getInputConnectionData.mockResolvedValue(mockModel);

		const mockTools = [mock<Tool>()];
		jest.spyOn(helpers, 'getConnectedTools').mockResolvedValue(mockTools);

		// Mock getNodeParameter to return default values
		mockSupplyDataContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
			if (param === 'enableStreaming') return true;
			if (param === 'text') return 'test input';
			if (param === 'needsFallback') return false;
			if (param === 'options.batching.batchSize') return defaultValue;
			if (param === 'options.batching.delayBetweenBatches') return defaultValue;
			if (param === 'options')
				return {
					systemMessage: 'You are a helpful assistant',
					maxIterations: 10,
					returnIntermediateSteps: false,
					passthroughBinaryImages: true,
				};
			return defaultValue;
		});

		const mockExecutor = {
			invoke: jest.fn().mockResolvedValueOnce({ output: { text: 'success 1' } }),
		};
		jest.spyOn(AgentExecutor, 'fromAgentAndTools').mockReturnValue(mockExecutor as any);

		const result = await toolsAgentExecute.call(mockSupplyDataContext);

		expect(mockExecutor.invoke).toHaveBeenCalledTimes(1);
		expect(result[0]).toHaveLength(1);
		expect(result[0][0].json).toEqual({ output: { text: 'success 1' } });
	});
});