From 5a5848aa421eed9ebc5a940b1c642a42a1a94b9a Mon Sep 17 00:00:00 2001
From: Benjamin Schroth <68321970+schrothbn@users.noreply.github.com>
Date: Fri, 4 Jul 2025 09:21:48 +0200
Subject: [PATCH] feat(AI Agent Node): Implement streaming on AI agent node
 (no-changelog) (#16897)

---
 .../nodes/agents/Agent/Agent.node.ts          |   5 +-
 .../nodes/agents/Agent/V2/AgentV2.node.ts     |  22 +-
 .../Agent/agents/ToolsAgent/V2/description.ts |  29 +++
 .../Agent/agents/ToolsAgent/V2/execute.ts     | 109 +++++++++-
 .../test/ToolsAgent/ToolsAgentV2.test.ts      | 203 ++++++++++++++++++
 packages/cli/src/scaling/job-processor.ts     |   1 +
 packages/cli/src/scaling/scaling.types.ts     |   1 +
 packages/cli/src/webhooks/webhook-helpers.ts  |  18 +-
 .../src/workflow-execute-additional-data.ts   |   2 +
 packages/cli/src/workflow-runner.ts           |   2 +
 .../__tests__/execute-context.test.ts         |   2 +-
 .../node-execution-context/execute-context.ts |  19 +-
 packages/workflow/src/interfaces.ts           |   2 +
 13 files changed, 401 insertions(+), 14 deletions(-)

diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/Agent.node.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/Agent.node.ts
index f86ad3f9e1..a8ab5e514e 100644
--- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/Agent.node.ts
+++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/Agent.node.ts
@@ -27,7 +27,8 @@ export class Agent extends VersionedNodeType {
 				],
 			},
 		},
-		defaultVersion: 2,
+		// Keep defaultVersion at 2.1 so streaming is only published once everything is ready
+		defaultVersion: 2.1,
 	};

 	const nodeVersions: IVersionedNodeType['nodeVersions'] = {
@@ -42,6 +43,8 @@ export class Agent extends VersionedNodeType {
 		1.8: new AgentV1(baseDescription),
 		1.9: new AgentV1(baseDescription),
 		2: new AgentV2(baseDescription),
+		2.1: new AgentV2(baseDescription),
+		2.2: new AgentV2(baseDescription),
 	};

 	super(nodeVersions, baseDescription);
diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/V2/AgentV2.node.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/V2/AgentV2.node.ts
index 5c2055835c..e72349de57 100644
--- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/V2/AgentV2.node.ts
+++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/V2/AgentV2.node.ts
@@ -105,7 +105,7 @@ export class AgentV2 implements INodeType {
 	constructor(baseDescription: INodeTypeBaseDescription) {
 		this.description = {
 			...baseDescription,
-			version: 2,
+			version: [2, 2.1, 2.2],
 			defaults: {
 				name: 'AI Agent',
 				color: '#404040',
@@ -148,6 +148,11 @@ export class AgentV2 implements INodeType {
 					type: 'boolean',
 					default: false,
 					noDataExpression: true,
+					displayOptions: {
+						show: {
+							'@version': [{ _cnd: { gte: 2.1 } }],
+						},
+					},
 				},
 				{
 					displayName: `Connect an output parser on the canvas to specify the output format you require`,
@@ -166,6 +171,11 @@ export class AgentV2 implements INodeType {
 					type: 'boolean',
 					default: false,
 					noDataExpression: true,
+					displayOptions: {
+						show: {
+							'@version': [{ _cnd: { gte: 2.1 } }],
+						},
+					},
 				},
 				{
 					displayName:
@@ -181,6 +191,16 @@ export class AgentV2 implements INodeType {
 				},
 				...toolsAgentProperties,
 			],
+			hints: [
+				{
+					message:
+						'You are using streaming responses. Make sure to set the response mode to "Streaming Response" on the connected trigger node.',
+					type: 'warning',
+					location: 'outputPane',
+					whenToDisplay: 'afterExecution',
+					displayCondition: '={{ $parameter["enableStreaming"] === true }}',
+				},
+			],
 		};
 	}
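The `_cnd` conditions above are n8n's version-gating convention for `displayOptions`: a property (or hint) only renders when the node instance's `typeVersion` satisfies the condition. A minimal sketch of the same pattern, with a hypothetical parameter, assuming the standard `INodeProperties` shape from `n8n-workflow`:

```ts
import type { INodeProperties } from 'n8n-workflow';

// Hypothetical toggle that only appears on node versions >= 2.1.
// '@version' is compared against the node instance's typeVersion.
const streamingToggle: INodeProperties = {
	displayName: 'Enable Streaming',
	name: 'enableStreaming',
	type: 'boolean',
	default: true,
	displayOptions: {
		show: {
			'@version': [{ _cnd: { gte: 2.1 } }],
		},
	},
};
```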
diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/V2/description.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/V2/description.ts
index d023ce7467..05612cf138 100644
--- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/V2/description.ts
+++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/V2/description.ts
@@ -5,6 +5,30 @@ import { getBatchingOptionFields } from '@utils/sharedFields';
 import { commonOptions } from '../options';

 export const toolsAgentProperties: INodeProperties[] = [
+	{
+		displayName: 'Options',
+		name: 'options',
+		type: 'collection',
+		default: {},
+		placeholder: 'Add Option',
+		options: [
+			...commonOptions,
+			getBatchingOptionFields(undefined, 1),
+			{
+				displayName: 'Enable Streaming',
+				name: 'enableStreaming',
+				type: 'boolean',
+				default: true,
+				description:
+					'Whether this agent will stream the response in real-time as it generates text',
+			},
+		],
+		displayOptions: {
+			hide: {
+				'@version': [{ _cnd: { lt: 2.2 } }],
+			},
+		},
+	},
 	{
 		displayName: 'Options',
 		name: 'options',
@@ -12,5 +36,10 @@ export const toolsAgentProperties: INodeProperties[] = [
 		default: {},
 		placeholder: 'Add Option',
 		options: [...commonOptions, getBatchingOptionFields(undefined, 1)],
+		displayOptions: {
+			show: {
+				'@version': [{ _cnd: { lt: 2.2 } }],
+			},
+		},
 	},
 ];
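The two collections share the name `options` but carry complementary gates (`hide` below 2.2 versus `show` below 2.2), so exactly one variant renders for any node version. Since `enableStreaming` only exists on the 2.2 variant and defaults to `true`, an execution-time read needs a fallback for older versions. A hedged sketch of how such a nested option is typically read — the exact parameter path is an assumption, as this hunk only defines the property:

```ts
import type { IExecuteFunctions } from 'n8n-workflow';

function readStreamingFlag(ctx: IExecuteFunctions, itemIndex: number): boolean {
	// The third argument is the default returned when the option is absent,
	// e.g. on node versions < 2.2 where the property does not exist.
	return ctx.getNodeParameter('options.enableStreaming', itemIndex, true) as boolean;
}
```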
diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/V2/execute.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/V2/execute.ts
index bb2be6e04b..8899f818d7 100644
--- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/V2/execute.ts
+++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/V2/execute.ts
@@ -1,4 +1,7 @@
+import type { StreamEvent } from '@langchain/core/dist/tracers/event_stream';
+import type { IterableReadableStream } from '@langchain/core/dist/utils/stream';
 import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
+import type { AIMessageChunk, MessageContentText } from '@langchain/core/messages';
 import type { ChatPromptTemplate } from '@langchain/core/prompts';
 import { RunnableSequence } from '@langchain/core/runnables';
 import {
@@ -73,6 +76,87 @@ function createAgentExecutor(
 	});
 }

+async function processEventStream(
+	ctx: IExecuteFunctions,
+	eventStream: IterableReadableStream<StreamEvent>,
+	returnIntermediateSteps: boolean = false,
+): Promise<{ output: string; intermediateSteps?: any[] }> {
+	const agentResult: { output: string; intermediateSteps?: any[] } = {
+		output: '',
+	};
+
+	if (returnIntermediateSteps) {
+		agentResult.intermediateSteps = [];
+	}
+
+	ctx.sendChunk('begin');
+	for await (const event of eventStream) {
+		// Stream chat model tokens as they come in
+		switch (event.event) {
+			case 'on_chat_model_stream':
+				const chunk = event.data?.chunk as AIMessageChunk;
+				if (chunk?.content) {
+					const chunkContent = chunk.content;
+					let chunkText = '';
+					if (Array.isArray(chunkContent)) {
+						for (const message of chunkContent) {
+							chunkText += (message as MessageContentText)?.text;
+						}
+					} else if (typeof chunkContent === 'string') {
+						chunkText = chunkContent;
+					}
+					ctx.sendChunk('item', chunkText);
+
+					agentResult.output += chunkText;
+				}
+				break;
+			case 'on_chat_model_end':
+				// Capture full LLM response with tool calls for intermediate steps
+				if (returnIntermediateSteps && event.data) {
+					const chatModelData = event.data as any;
+					const output = chatModelData.output;
+
+					// Check if this LLM response contains tool calls
+					if (output?.tool_calls && output.tool_calls.length > 0) {
+						for (const toolCall of output.tool_calls) {
+							agentResult.intermediateSteps!.push({
+								action: {
+									tool: toolCall.name,
+									toolInput: toolCall.args,
+									log:
+										output.content ||
+										`Calling ${toolCall.name} with input: ${JSON.stringify(toolCall.args)}`,
+									messageLog: [output], // Include the full LLM response
+									toolCallId: toolCall.id,
+									type: toolCall.type,
+								},
+							});
+						}
+					}
+				}
+				break;
+			case 'on_tool_end':
+				// Capture tool execution results and match with action
+				if (returnIntermediateSteps && event.data && agentResult.intermediateSteps!.length > 0) {
+					const toolData = event.data as any;
+					// Find the matching intermediate step for this tool call
+					const matchingStep = agentResult.intermediateSteps!.find(
+						(step) => !step.observation && step.action.tool === event.name,
+					);
+					if (matchingStep) {
+						matchingStep.observation = toolData.output;
+					}
+				}
+				break;
+			default:
+				break;
+		}
+	}
+	ctx.sendChunk('end');
+
+	return agentResult;
+}
+
 /* -----------------------------------------------------------
    Main Executor Function
 ----------------------------------------------------------- */
@@ -109,6 +193,9 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
@@ -159,7 +246,27 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
-			return await executor.invoke(invokeParams, executeOptions);
+			if (this.isStreaming() && enableStreaming && this.getNode().typeVersion >= 2.1) {
+				const chatHistory = await memory?.chatHistory.getMessages();
+				const eventStream = executor.streamEvents(
+					{
+						...invokeParams,
+						chat_history: chatHistory ?? undefined,
+					},
+					{
+						version: 'v2',
+						...executeOptions,
+					},
+				);
+
+				return await processEventStream(this, eventStream, options.returnIntermediateSteps);
+			} else {
+				// Handle regular execution
+				return await executor.invoke(invokeParams, executeOptions);
+			}
 		});

 		const batchResults = await Promise.allSettled(batchPromises);
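`processEventStream` is built on LangChain's `streamEvents` v2 API, which any `Runnable` (including chat models and agent executors) exposes. A standalone sketch of the same consumption pattern against a bare chat model — model construction elided, event names as documented for v2:

```ts
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import type { AIMessageChunk } from '@langchain/core/messages';

// Accumulate and print tokens as they arrive, mirroring the
// 'on_chat_model_stream' branch in processEventStream above.
async function streamTokens(model: BaseChatModel, prompt: string): Promise<string> {
	let output = '';
	for await (const event of model.streamEvents(prompt, { version: 'v2' })) {
		if (event.event === 'on_chat_model_stream') {
			const chunk = event.data?.chunk as AIMessageChunk | undefined;
			if (typeof chunk?.content === 'string') {
				output += chunk.content;
				process.stdout.write(chunk.content);
			}
		}
	}
	return output;
}
```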
diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/test/ToolsAgent/ToolsAgentV2.test.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/test/ToolsAgent/ToolsAgentV2.test.ts
index 31532ec24f..efffac9584 100644
--- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/test/ToolsAgent/ToolsAgentV2.test.ts
+++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/test/ToolsAgent/ToolsAgentV2.test.ts
@@ -417,4 +417,207 @@ describe('toolsAgentExecute', () => {
 		expect(getOptionalOutputParserSpy).toHaveBeenNthCalledWith(5, mockContext, 3);
 		expect(getOptionalOutputParserSpy).toHaveBeenNthCalledWith(6, mockContext, 0);
 	});
+
+	describe('streaming', () => {
+		let mockNode: INode;
+		let mockModel: BaseChatModel;
+
+		beforeEach(() => {
+			jest.clearAllMocks();
+			mockNode = mock<INode>();
+			mockNode.typeVersion = 2.2;
+			mockContext.getNode.mockReturnValue(mockNode);
+			mockContext.getInputData.mockReturnValue([{ json: { text: 'test input' } }]);
+
+			mockModel = mock<BaseChatModel>();
+			mockModel.bindTools = jest.fn();
+			mockModel.lc_namespace = ['chat_models'];
+			mockContext.getInputConnectionData.mockImplementation(async (type, _index) => {
+				if (type === 'ai_languageModel') return mockModel;
+				if (type === 'ai_memory') return undefined;
+				return undefined;
+			});
+
+			mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
+				if (param === 'enableStreaming') return true;
+				if (param === 'text') return 'test input';
+				if (param === 'options.batching.batchSize') return defaultValue;
+				if (param === 'options.batching.delayBetweenBatches') return defaultValue;
+				if (param === 'options')
+					return {
+						systemMessage: 'You are a helpful assistant',
+						maxIterations: 10,
+						returnIntermediateSteps: false,
+						passthroughBinaryImages: true,
+					};
+				return defaultValue;
+			});
+		});
+
+		it('should handle streaming when enableStreaming is true', async () => {
+			jest.spyOn(helpers, 'getConnectedTools').mockResolvedValue([mock()]);
+			jest.spyOn(outputParserModule, 'getOptionalOutputParser').mockResolvedValue(undefined);
+			mockContext.isStreaming.mockReturnValue(true);
+
+			// Mock async generator for streamEvents
+			const mockStreamEvents = async function* () {
+				yield {
+					event: 'on_chat_model_stream',
+					data: {
+						chunk: {
+							content: 'Hello ',
+						},
+					},
+				};
+				yield {
+					event: 'on_chat_model_stream',
+					data: {
+						chunk: {
+							content: 'world!',
+						},
+					},
+				};
+			};
+
+			const mockExecutor = {
+				streamEvents: jest.fn().mockReturnValue(mockStreamEvents()),
+			};
+
+			jest.spyOn(AgentExecutor, 'fromAgentAndTools').mockReturnValue(mockExecutor as any);
+
+			const result = await toolsAgentExecute.call(mockContext);
+
+			expect(mockContext.sendChunk).toHaveBeenCalledWith('begin');
+			expect(mockContext.sendChunk).toHaveBeenCalledWith('item', 'Hello ');
+			expect(mockContext.sendChunk).toHaveBeenCalledWith('item', 'world!');
+			expect(mockContext.sendChunk).toHaveBeenCalledWith('end');
+			expect(mockExecutor.streamEvents).toHaveBeenCalledTimes(1);
+			expect(result[0]).toHaveLength(1);
+			expect(result[0][0].json.output).toBe('Hello world!');
+		});
+
+		it('should capture intermediate steps during streaming when returnIntermediateSteps is true', async () => {
+			jest.spyOn(helpers, 'getConnectedTools').mockResolvedValue([mock()]);
+			jest.spyOn(outputParserModule, 'getOptionalOutputParser').mockResolvedValue(undefined);
+
+			mockContext.isStreaming.mockReturnValue(true);
+
+			mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
+				if (param === 'enableStreaming') return true;
+				if (param === 'text') return 'test input';
+				if (param === 'options.batching.batchSize') return defaultValue;
+				if (param === 'options.batching.delayBetweenBatches') return defaultValue;
+				if (param === 'options')
+					return {
+						systemMessage: 'You are a helpful assistant',
+						maxIterations: 10,
+						returnIntermediateSteps: true, // Enable intermediate steps
+						passthroughBinaryImages: true,
+					};
+				return defaultValue;
+			});
+
+			// Mock async generator for streamEvents with tool calls
+			const mockStreamEvents = async function* () {
+				// LLM response with tool call
+				yield {
+					event: 'on_chat_model_end',
+					data: {
+						output: {
+							content: 'I need to call a tool',
+							tool_calls: [
+								{
+									id: 'call_123',
+									name: 'TestTool',
+									args: { input: 'test data' },
+									type: 'function',
+								},
+							],
+						},
+					},
+				};
+				// Tool execution result
+				yield {
+					event: 'on_tool_end',
+					name: 'TestTool',
+					data: {
+						output: 'Tool execution result',
+					},
+				};
+				// Final LLM response
+				yield {
+					event: 'on_chat_model_stream',
+					data: {
+						chunk: {
+							content: 'Final response',
+						},
+					},
+				};
+			};
+
+			const mockExecutor = {
+				streamEvents: jest.fn().mockReturnValue(mockStreamEvents()),
+			};
+
+			jest.spyOn(AgentExecutor, 'fromAgentAndTools').mockReturnValue(mockExecutor as any);
+
+			const result = await toolsAgentExecute.call(mockContext);
+
+			expect(result[0]).toHaveLength(1);
+			expect(result[0][0].json.output).toBe('Final response');
+
+			// Check intermediate steps
+			expect(result[0][0].json.intermediateSteps).toBeDefined();
+			expect(result[0][0].json.intermediateSteps).toHaveLength(1);
+
+			const step = (result[0][0].json.intermediateSteps as any[])[0];
+			expect(step.action).toBeDefined();
+			expect(step.action.tool).toBe('TestTool');
+			expect(step.action.toolInput).toEqual({ input: 'test data' });
+			expect(step.action.toolCallId).toBe('call_123');
+			expect(step.action.type).toBe('function');
+			expect(step.action.messageLog).toBeDefined();
+			expect(step.observation).toBe('Tool execution result');
+		});
+
+		it('should use regular execution on version 2.2 when enableStreaming is false', async () => {
+			jest.spyOn(helpers, 'getConnectedTools').mockResolvedValue([mock()]);
+			jest.spyOn(outputParserModule, 'getOptionalOutputParser').mockResolvedValue(undefined);
+
+			const mockExecutor = {
+				invoke: jest.fn().mockResolvedValue({ output: 'Regular response' }),
+				streamEvents: jest.fn(),
+			};
+
+			jest.spyOn(AgentExecutor, 'fromAgentAndTools').mockReturnValue(mockExecutor as any);
+
+			const result = await toolsAgentExecute.call(mockContext);
+
+			expect(mockContext.sendChunk).not.toHaveBeenCalled();
+			expect(mockExecutor.invoke).toHaveBeenCalledTimes(1);
+			expect(mockExecutor.streamEvents).not.toHaveBeenCalled();
+			expect(result[0][0].json.output).toBe('Regular response');
+		});
+
+		it('should use regular execution on version 2.2 when streaming is not available', async () => {
+			mockContext.isStreaming.mockReturnValue(false);
+
+			jest.spyOn(helpers, 'getConnectedTools').mockResolvedValue([mock()]);
+			jest.spyOn(outputParserModule, 'getOptionalOutputParser').mockResolvedValue(undefined);
+
+			const mockExecutor = {
+				invoke: jest.fn().mockResolvedValue({ output: 'Regular response' }),
+				streamEvents: jest.fn(),
+			};
+
+			jest.spyOn(AgentExecutor, 'fromAgentAndTools').mockReturnValue(mockExecutor as any);
+
+			const result = await toolsAgentExecute.call(mockContext);
+
+			expect(mockContext.sendChunk).not.toHaveBeenCalled();
+			expect(mockExecutor.invoke).toHaveBeenCalledTimes(1);
+			expect(mockExecutor.streamEvents).not.toHaveBeenCalled();
+			expect(result[0][0].json.output).toBe('Regular response');
+		});
+	});
 });
diff --git a/packages/cli/src/scaling/job-processor.ts b/packages/cli/src/scaling/job-processor.ts
index f5c04b5d29..1e48b990d4 100644
--- a/packages/cli/src/scaling/job-processor.ts
+++ b/packages/cli/src/scaling/job-processor.ts
@@ -122,6 +122,7 @@ export class JobProcessor {
 			undefined,
 			executionTimeoutTimestamp,
 		);
+		additionalData.streamingEnabled = job.data.streamingEnabled;

 		const { pushRef } = job.data;

diff --git a/packages/cli/src/scaling/scaling.types.ts b/packages/cli/src/scaling/scaling.types.ts
index 5381fbd24a..33576a6eff 100644
--- a/packages/cli/src/scaling/scaling.types.ts
+++ b/packages/cli/src/scaling/scaling.types.ts
@@ -19,6 +19,7 @@ export type JobData = {
 	executionId: string;
 	loadStaticData: boolean;
 	pushRef?: string;
+	streamingEnabled?: boolean;
 };

 export type JobResult = {
diff --git a/packages/cli/src/webhooks/webhook-helpers.ts b/packages/cli/src/webhooks/webhook-helpers.ts
index ac4aad286d..50447f8225 100644
--- a/packages/cli/src/webhooks/webhook-helpers.ts
+++ b/packages/cli/src/webhooks/webhook-helpers.ts
@@ -656,15 +656,6 @@ export async function executeWebhook(
 		);
 	}
-	// Start now to run the workflow
-	executionId = await Container.get(WorkflowRunner).run(
-		runData,
-		true,
-		!didSendResponse,
-		executionId,
-		responsePromise,
-	);
-
 	if (responseMode === 'streaming') {
 		Container.get(Logger).debug(
 			`Execution of workflow "${workflow.name}" from with ID ${executionId} is set to streaming`,
 		);
@@ -676,6 +667,15 @@
 		didSendResponse = true;
 	}

+	// Start now to run the workflow
+	executionId = await Container.get(WorkflowRunner).run(
+		runData,
+		true,
+		!didSendResponse,
+		executionId,
+		responsePromise,
+	);
+
 	if (responseMode === 'formPage' && !didSendResponse) {
 		res.send({ formWaitingUrl: `${additionalData.formWaitingBaseUrl}/${executionId}` });
 		process.nextTick(() => res.end());
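The reordering above is the substance of the webhook change: for `responseMode === 'streaming'`, the HTTP response has to be switched into chunked mode and flushed before the workflow starts, so that `sendChunk` writes can reach the client mid-execution. A simplified sketch of the ordering constraint — `runWorkflow` is a stand-in for `WorkflowRunner.run`, not the real API:

```ts
import type { Response } from 'express';

declare function runWorkflow(onChunk: (chunk: string) => void): Promise<void>;

async function handleStreamingWebhook(res: Response): Promise<void> {
	// 1. Flush headers first, so the connection is already streaming...
	res.writeHead(200, {
		'Content-Type': 'application/json; charset=utf-8',
		'Transfer-Encoding': 'chunked',
	});
	res.flushHeaders();

	// 2. ...then start the execution; chunk handlers write as tokens arrive.
	await runWorkflow((chunk) => res.write(chunk));
	res.end();
}
```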
diff --git a/packages/cli/src/workflow-execute-additional-data.ts b/packages/cli/src/workflow-execute-additional-data.ts
index d905aa2f28..c8c34db6f8 100644
--- a/packages/cli/src/workflow-execute-additional-data.ts
+++ b/packages/cli/src/workflow-execute-additional-data.ts
@@ -237,6 +237,8 @@ async function startExecution(
 	if (additionalData.httpResponse) {
 		additionalDataIntegrated.httpResponse = additionalData.httpResponse;
 	}
+	// Propagate streaming state to subworkflows
+	additionalDataIntegrated.streamingEnabled = additionalData.streamingEnabled;

 	let subworkflowTimeout = additionalData.executionTimeoutTimestamp;
 	const workflowSettings = workflowData.settings;
diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts
index 9aaf49cdd0..343a8dd873 100644
--- a/packages/cli/src/workflow-runner.ts
+++ b/packages/cli/src/workflow-runner.ts
@@ -248,6 +248,7 @@ export class WorkflowRunner {
 		);
 		// TODO: set this in queue mode as well
 		additionalData.restartExecutionId = restartExecutionId;
+		additionalData.streamingEnabled = data.streamingEnabled;

 		additionalData.executionId = executionId;

@@ -357,6 +358,7 @@ export class WorkflowRunner {
 			executionId,
 			loadStaticData: !!loadStaticData,
 			pushRef: data.pushRef,
+			streamingEnabled: data.streamingEnabled,
 		};

 		if (!this.scalingService) {
diff --git a/packages/core/src/execution-engine/node-execution-context/__tests__/execute-context.test.ts b/packages/core/src/execution-engine/node-execution-context/__tests__/execute-context.test.ts
index 6322a364c2..ec167bb031 100644
--- a/packages/core/src/execution-engine/node-execution-context/__tests__/execute-context.test.ts
+++ b/packages/core/src/execution-engine/node-execution-context/__tests__/execute-context.test.ts
@@ -297,7 +297,7 @@ describe('ExecuteContext', () => {
 		expect(hooksMock.runHook).toHaveBeenCalledWith('sendChunk', [
 			expect.objectContaining({
 				type: 'item',
-				content: '"test"',
+				content: 'test',
 				metadata: expect.objectContaining({
 					nodeName: 'Test Node',
 					nodeId: 'test-node-id',
diff --git a/packages/core/src/execution-engine/node-execution-context/execute-context.ts b/packages/core/src/execution-engine/node-execution-context/execute-context.ts
index bf40232491..d651687828 100644
--- a/packages/core/src/execution-engine/node-execution-context/execute-context.ts
+++ b/packages/core/src/execution-engine/node-execution-context/execute-context.ts
@@ -131,6 +131,21 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti
 		)) as IExecuteFunctions['getNodeParameter'];
 	}

+	isStreaming(): boolean {
+		// Check if we have sendChunk handlers
+		const handlers = this.additionalData.hooks?.handlers?.sendChunk?.length;
+		const hasHandlers = handlers !== undefined && handlers > 0;
+
+		// Check if streaming was enabled for this execution
+		const streamingEnabled = this.additionalData.streamingEnabled === true;
+
+		// Check current execution mode supports streaming
+		const executionModeSupportsStreaming = ['manual', 'webhook', 'integrated'];
+		const isStreamingMode = executionModeSupportsStreaming.includes(this.mode);
+
+		return hasHandlers && isStreamingMode && streamingEnabled;
+	}
+
 	async sendChunk(type: ChunkType, content?: IDataObject | string): Promise<void> {
 		const node = this.getNode();
 		const metadata = {
@@ -139,9 +154,11 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti
 			timestamp: Date.now(),
 		};

+		const parsedContent = typeof content === 'string' ? content : JSON.stringify(content);
+
 		const message: StructuredChunk = {
 			type,
-			content: content ? JSON.stringify(content) : undefined,
+			content: parsedContent,
 			metadata,
 		};
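Together, `isStreaming()` and `sendChunk()` give node authors a complete streaming contract: probe capability first, then emit a `begin`/`item`/`end` sequence. A sketch of a consuming node, with `produceTokens` as a hypothetical token source — the chunk lifecycle matches what the agent node and the test above expect:

```ts
import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';

declare function produceTokens(): AsyncIterable<string>;

async function execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
	let output = '';
	if (this.isStreaming()) {
		// Emit the begin/item/end lifecycle; each 'item' carries one token.
		this.sendChunk('begin');
		for await (const token of produceTokens()) {
			this.sendChunk('item', token);
			output += token;
		}
		this.sendChunk('end');
	} else {
		// No handlers, unsupported mode, or streaming disabled: buffer instead.
		for await (const token of produceTokens()) output += token;
	}
	return [[{ json: { output } }]];
}
```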
diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts
index 194345a8c9..abd9ff15ac 100644
--- a/packages/workflow/src/interfaces.ts
+++ b/packages/workflow/src/interfaces.ts
@@ -920,6 +920,7 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn &
 	sendMessageToUI(message: any): void;
 	sendResponse(response: IExecuteResponsePromiseData): void;
 	sendChunk(type: ChunkType, content?: IDataObject | string): void;
+	isStreaming(): boolean;

 	// TODO: Make this one then only available in the new config one
 	addInputData(
@@ -2394,6 +2395,7 @@ export interface IWorkflowExecuteAdditionalData {
 	currentNodeExecutionIndex: number;
 	httpResponse?: express.Response;
 	httpRequest?: express.Request;
+	streamingEnabled?: boolean;
 	restApiUrl: string;
 	instanceBaseUrl: string;
 	setExecutionStatus?: (status: ExecutionStatus) => void;