feat(AI Agent Node): Implement streaming on AI agent node (no-changelog) (#16897)

Benjamin Schroth
2025-07-04 09:21:48 +02:00
committed by GitHub
parent bd69907477
commit 5a5848aa42
13 changed files with 401 additions and 14 deletions

View File

@@ -27,7 +27,8 @@ export class Agent extends VersionedNodeType {
],
},
},
defaultVersion: 2,
// We keep defaultVersion at 2.1 so that streaming is only published when everything is ready
defaultVersion: 2.1,
};
const nodeVersions: IVersionedNodeType['nodeVersions'] = {
@@ -42,6 +43,8 @@ export class Agent extends VersionedNodeType {
1.8: new AgentV1(baseDescription),
1.9: new AgentV1(baseDescription),
2: new AgentV2(baseDescription),
2.1: new AgentV2(baseDescription),
2.2: new AgentV2(baseDescription),
};
super(nodeVersions, baseDescription);
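
The staged mapping above is what enables a soft rollout: 2.1 and 2.2 both resolve to `AgentV2`, while `defaultVersion` stays at 2.1 so freshly added nodes do not yet pick up the 2.2 behaviour. A minimal sketch of how such a versioned registry resolves an implementation (hypothetical `SimpleVersionedType`, not the actual n8n `VersionedNodeType` source):

```ts
// Hypothetical sketch of version resolution in a versioned node registry.
type NodeImpl = { run: (input: string) => string };

class SimpleVersionedType {
  constructor(
    private versions: Record<number, NodeImpl>,
    private defaultVersion: number,
  ) {}

  // Existing workflows pin a typeVersion; newly created nodes get defaultVersion.
  getNodeType(version?: number): NodeImpl {
    return this.versions[version ?? this.defaultVersion];
  }
}

const agent = new SimpleVersionedType(
  {
    2: { run: (s) => `v2: ${s}` },
    2.1: { run: (s) => `v2.1: ${s}` },
    2.2: { run: (s) => `v2.2: ${s}` },
  },
  2.1, // held below 2.2 until streaming is ready to publish
);

console.log(agent.getNodeType().run('hi')); // "v2.1: hi"
console.log(agent.getNodeType(2.2).run('hi')); // "v2.2: hi"
```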

View File

@@ -105,7 +105,7 @@ export class AgentV2 implements INodeType {
constructor(baseDescription: INodeTypeBaseDescription) {
this.description = {
...baseDescription,
version: 2,
version: [2, 2.1, 2.2],
defaults: {
name: 'AI Agent',
color: '#404040',
@@ -148,6 +148,11 @@ export class AgentV2 implements INodeType {
type: 'boolean',
default: false,
noDataExpression: true,
displayOptions: {
show: {
'@version': [{ _cnd: { gte: 2.1 } }],
},
},
},
{
displayName: `Connect an <a data-action='openSelectiveNodeCreator' data-action-parameter-connectiontype='${NodeConnectionTypes.AiOutputParser}'>output parser</a> on the canvas to specify the output format you require`,
@@ -166,6 +171,11 @@ export class AgentV2 implements INodeType {
type: 'boolean',
default: false,
noDataExpression: true,
displayOptions: {
show: {
'@version': [{ _cnd: { gte: 2.1 } }],
},
},
},
{
displayName:
@@ -181,6 +191,16 @@ export class AgentV2 implements INodeType {
},
...toolsAgentProperties,
],
hints: [
{
message:
'You are using streaming responses. Make sure to set the response mode to "Streaming Response" on the connected trigger node.',
type: 'warning',
location: 'outputPane',
whenToDisplay: 'afterExecution',
displayCondition: '={{ $parameter["enableStreaming"] === true }}',
},
],
};
}
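
The `_cnd` entries added above are version gates: the new toggles only render when the node instance's `typeVersion` is at least 2.1, so existing version-2 nodes keep their old UI. A rough sketch of how such conditions can be evaluated (hypothetical evaluator, not n8n's actual displayOptions engine):

```ts
// Hypothetical evaluator for the @version conditions used in displayOptions.
type VersionCondition = { _cnd: { gte?: number; lt?: number } };

function matchesVersion(typeVersion: number, conds: VersionCondition[]): boolean {
  // A field is shown if any listed condition matches the node's version.
  return conds.some(({ _cnd }) => {
    if (_cnd.gte !== undefined && typeVersion < _cnd.gte) return false;
    if (_cnd.lt !== undefined && typeVersion >= _cnd.lt) return false;
    return true;
  });
}

const showOn21AndUp: VersionCondition[] = [{ _cnd: { gte: 2.1 } }];
console.log(matchesVersion(2, showOn21AndUp)); // false – hidden on v2 nodes
console.log(matchesVersion(2.2, showOn21AndUp)); // true – shown on v2.1+
```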

View File

@@ -5,6 +5,30 @@ import { getBatchingOptionFields } from '@utils/sharedFields';
import { commonOptions } from '../options';
export const toolsAgentProperties: INodeProperties[] = [
{
displayName: 'Options',
name: 'options',
type: 'collection',
default: {},
placeholder: 'Add Option',
options: [
...commonOptions,
getBatchingOptionFields(undefined, 1),
{
displayName: 'Enable Streaming',
name: 'enableStreaming',
type: 'boolean',
default: true,
description:
'Whether this agent will stream the response in real time as it generates text',
},
],
displayOptions: {
hide: {
'@version': [{ _cnd: { lt: 2.2 } }],
},
},
},
{
displayName: 'Options',
name: 'options',
@@ -12,5 +36,10 @@ export const toolsAgentProperties: INodeProperties[] = [
default: {},
placeholder: 'Add Option',
options: [...commonOptions, getBatchingOptionFields(undefined, 1)],
displayOptions: {
show: {
'@version': [{ _cnd: { lt: 2.2 } }],
},
},
},
];
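
Declaring two `Options` collections with complementary `lt`/`gte 2.2` gates is the idiomatic way to change a collection's contents between versions: 2.2 nodes get the `Enable Streaming` field, older nodes keep the original set. At runtime this pairs with a defaulted parameter read, so versions that lack the field behave as if streaming is on. A hedged sketch of that read pattern (hypothetical `getParam` helper standing in for `getNodeParameter`):

```ts
// Hypothetical stand-in for reading a node parameter with a fallback default.
function getParam<T>(params: Record<string, unknown>, path: string, fallback: T): T {
  const value = path
    .split('.')
    .reduce<unknown>((obj, key) => (obj as Record<string, unknown> | undefined)?.[key], params);
  return value === undefined ? fallback : (value as T);
}

// v2.2 node where the user switched the toggle off:
console.log(getParam({ options: { enableStreaming: false } }, 'options.enableStreaming', true)); // false

// v2.1 node where the field does not exist in the collection – falls back to true:
console.log(getParam({ options: {} }, 'options.enableStreaming', true)); // true
```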

View File

@@ -1,4 +1,7 @@
import type { StreamEvent } from '@langchain/core/dist/tracers/event_stream';
import type { IterableReadableStream } from '@langchain/core/dist/utils/stream';
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import type { AIMessageChunk, MessageContentText } from '@langchain/core/messages';
import type { ChatPromptTemplate } from '@langchain/core/prompts';
import { RunnableSequence } from '@langchain/core/runnables';
import {
@@ -73,6 +76,87 @@ function createAgentExecutor(
});
}
async function processEventStream(
ctx: IExecuteFunctions,
eventStream: IterableReadableStream<StreamEvent>,
returnIntermediateSteps: boolean = false,
): Promise<{ output: string; intermediateSteps?: any[] }> {
const agentResult: { output: string; intermediateSteps?: any[] } = {
output: '',
};
if (returnIntermediateSteps) {
agentResult.intermediateSteps = [];
}
ctx.sendChunk('begin');
for await (const event of eventStream) {
// Stream chat model tokens as they come in
switch (event.event) {
case 'on_chat_model_stream':
const chunk = event.data?.chunk as AIMessageChunk;
if (chunk?.content) {
const chunkContent = chunk.content;
let chunkText = '';
if (Array.isArray(chunkContent)) {
for (const message of chunkContent) {
chunkText += (message as MessageContentText)?.text;
}
} else if (typeof chunkContent === 'string') {
chunkText = chunkContent;
}
ctx.sendChunk('item', chunkText);
agentResult.output += chunkText;
}
break;
case 'on_chat_model_end':
// Capture full LLM response with tool calls for intermediate steps
if (returnIntermediateSteps && event.data) {
const chatModelData = event.data as any;
const output = chatModelData.output;
// Check if this LLM response contains tool calls
if (output?.tool_calls && output.tool_calls.length > 0) {
for (const toolCall of output.tool_calls) {
agentResult.intermediateSteps!.push({
action: {
tool: toolCall.name,
toolInput: toolCall.args,
log:
output.content ||
`Calling ${toolCall.name} with input: ${JSON.stringify(toolCall.args)}`,
messageLog: [output], // Include the full LLM response
toolCallId: toolCall.id,
type: toolCall.type,
},
});
}
}
}
break;
case 'on_tool_end':
// Capture tool execution results and match with action
if (returnIntermediateSteps && event.data && agentResult.intermediateSteps!.length > 0) {
const toolData = event.data as any;
// Find the matching intermediate step for this tool call
const matchingStep = agentResult.intermediateSteps!.find(
(step) => !step.observation && step.action.tool === event.name,
);
if (matchingStep) {
matchingStep.observation = toolData.output;
}
}
break;
default:
break;
}
}
ctx.sendChunk('end');
return agentResult;
}
/* -----------------------------------------------------------
Main Executor Function
----------------------------------------------------------- */
@@ -109,6 +193,9 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise<INodeE
);
}
// Check if streaming is enabled
const enableStreaming = this.getNodeParameter('options.enableStreaming', 0, true) as boolean;
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
const batchPromises = batch.map(async (_item, batchItemIndex) => {
@@ -159,7 +246,27 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise<INodeE
};
const executeOptions = { signal: this.getExecutionCancelSignal() };
// Check if streaming is actually available
const isStreamingAvailable = this.isStreaming();
if (enableStreaming && isStreamingAvailable && this.getNode().typeVersion >= 2.1) {
const chatHistory = await memory?.chatHistory.getMessages();
const eventStream = executor.streamEvents(
{
...invokeParams,
chat_history: chatHistory ?? undefined,
},
{
version: 'v2',
...executeOptions,
},
);
return await processEventStream(this, eventStream, options.returnIntermediateSteps);
} else {
// Handle regular execution
return await executor.invoke(invokeParams, executeOptions);
}
});
const batchResults = await Promise.allSettled(batchPromises);
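
The streaming branch leans on LangChain's `streamEvents(..., { version: 'v2' })`, which yields named events rather than raw tokens; `processEventStream` forwards `on_chat_model_stream` content to `sendChunk` and rebuilds intermediate steps from `on_chat_model_end`/`on_tool_end`. One subtlety is that a chunk's `content` may be a plain string or an array of content parts. A standalone sketch of that extraction, written defensively (simplified types, not the exact LangChain definitions):

```ts
// Simplified model of a chat-model stream chunk: content is either a plain
// string or an array of typed parts (as in multi-modal responses).
type ContentPart = { type: string; text?: string };
type ChunkContent = string | ContentPart[];

function extractText(content: ChunkContent): string {
  if (typeof content === 'string') return content;
  // Keep only textual parts; skip image or tool-use parts defensively.
  return content
    .filter((part): part is ContentPart & { text: string } => typeof part.text === 'string')
    .map((part) => part.text)
    .join('');
}

console.log(extractText('Hello ')); // "Hello "
console.log(extractText([{ type: 'text', text: 'world!' }])); // "world!"
```

Each batch item is bracketed by one `begin`/`end` chunk pair with an `item` chunk per token burst; downstream consumers such as the webhook response rely on that framing.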

View File

@@ -417,4 +417,207 @@ describe('toolsAgentExecute', () => {
expect(getOptionalOutputParserSpy).toHaveBeenNthCalledWith(5, mockContext, 3);
expect(getOptionalOutputParserSpy).toHaveBeenNthCalledWith(6, mockContext, 0);
});
describe('streaming', () => {
let mockNode: INode;
let mockModel: BaseChatModel;
beforeEach(() => {
jest.clearAllMocks();
mockNode = mock<INode>();
mockNode.typeVersion = 2.2;
mockContext.getNode.mockReturnValue(mockNode);
mockContext.getInputData.mockReturnValue([{ json: { text: 'test input' } }]);
mockModel = mock<BaseChatModel>();
mockModel.bindTools = jest.fn();
mockModel.lc_namespace = ['chat_models'];
mockContext.getInputConnectionData.mockImplementation(async (type, _index) => {
if (type === 'ai_languageModel') return mockModel;
if (type === 'ai_memory') return undefined;
return undefined;
});
mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
if (param === 'enableStreaming') return true;
if (param === 'text') return 'test input';
if (param === 'options.batching.batchSize') return defaultValue;
if (param === 'options.batching.delayBetweenBatches') return defaultValue;
if (param === 'options')
return {
systemMessage: 'You are a helpful assistant',
maxIterations: 10,
returnIntermediateSteps: false,
passthroughBinaryImages: true,
};
return defaultValue;
});
});
it('should handle streaming when enableStreaming is true', async () => {
jest.spyOn(helpers, 'getConnectedTools').mockResolvedValue([mock<Tool>()]);
jest.spyOn(outputParserModule, 'getOptionalOutputParser').mockResolvedValue(undefined);
mockContext.isStreaming.mockReturnValue(true);
// Mock async generator for streamEvents
const mockStreamEvents = async function* () {
yield {
event: 'on_chat_model_stream',
data: {
chunk: {
content: 'Hello ',
},
},
};
yield {
event: 'on_chat_model_stream',
data: {
chunk: {
content: 'world!',
},
},
};
};
const mockExecutor = {
streamEvents: jest.fn().mockReturnValue(mockStreamEvents()),
};
jest.spyOn(AgentExecutor, 'fromAgentAndTools').mockReturnValue(mockExecutor as any);
const result = await toolsAgentExecute.call(mockContext);
expect(mockContext.sendChunk).toHaveBeenCalledWith('begin');
expect(mockContext.sendChunk).toHaveBeenCalledWith('item', 'Hello ');
expect(mockContext.sendChunk).toHaveBeenCalledWith('item', 'world!');
expect(mockContext.sendChunk).toHaveBeenCalledWith('end');
expect(mockExecutor.streamEvents).toHaveBeenCalledTimes(1);
expect(result[0]).toHaveLength(1);
expect(result[0][0].json.output).toBe('Hello world!');
});
it('should capture intermediate steps during streaming when returnIntermediateSteps is true', async () => {
jest.spyOn(helpers, 'getConnectedTools').mockResolvedValue([mock<Tool>()]);
jest.spyOn(outputParserModule, 'getOptionalOutputParser').mockResolvedValue(undefined);
mockContext.isStreaming.mockReturnValue(true);
mockContext.getNodeParameter.mockImplementation((param, _i, defaultValue) => {
if (param === 'enableStreaming') return true;
if (param === 'text') return 'test input';
if (param === 'options.batching.batchSize') return defaultValue;
if (param === 'options.batching.delayBetweenBatches') return defaultValue;
if (param === 'options')
return {
systemMessage: 'You are a helpful assistant',
maxIterations: 10,
returnIntermediateSteps: true, // Enable intermediate steps
passthroughBinaryImages: true,
};
return defaultValue;
});
// Mock async generator for streamEvents with tool calls
const mockStreamEvents = async function* () {
// LLM response with tool call
yield {
event: 'on_chat_model_end',
data: {
output: {
content: 'I need to call a tool',
tool_calls: [
{
id: 'call_123',
name: 'TestTool',
args: { input: 'test data' },
type: 'function',
},
],
},
},
};
// Tool execution result
yield {
event: 'on_tool_end',
name: 'TestTool',
data: {
output: 'Tool execution result',
},
};
// Final LLM response
yield {
event: 'on_chat_model_stream',
data: {
chunk: {
content: 'Final response',
},
},
};
};
const mockExecutor = {
streamEvents: jest.fn().mockReturnValue(mockStreamEvents()),
};
jest.spyOn(AgentExecutor, 'fromAgentAndTools').mockReturnValue(mockExecutor as any);
const result = await toolsAgentExecute.call(mockContext);
expect(result[0]).toHaveLength(1);
expect(result[0][0].json.output).toBe('Final response');
// Check intermediate steps
expect(result[0][0].json.intermediateSteps).toBeDefined();
expect(result[0][0].json.intermediateSteps).toHaveLength(1);
const step = (result[0][0].json.intermediateSteps as any[])[0];
expect(step.action).toBeDefined();
expect(step.action.tool).toBe('TestTool');
expect(step.action.toolInput).toEqual({ input: 'test data' });
expect(step.action.toolCallId).toBe('call_123');
expect(step.action.type).toBe('function');
expect(step.action.messageLog).toBeDefined();
expect(step.observation).toBe('Tool execution result');
});
it('should use regular execution on version 2.2 when enableStreaming is false', async () => {
jest.spyOn(helpers, 'getConnectedTools').mockResolvedValue([mock<Tool>()]);
jest.spyOn(outputParserModule, 'getOptionalOutputParser').mockResolvedValue(undefined);
const mockExecutor = {
invoke: jest.fn().mockResolvedValue({ output: 'Regular response' }),
streamEvents: jest.fn(),
};
jest.spyOn(AgentExecutor, 'fromAgentAndTools').mockReturnValue(mockExecutor as any);
const result = await toolsAgentExecute.call(mockContext);
expect(mockContext.sendChunk).not.toHaveBeenCalled();
expect(mockExecutor.invoke).toHaveBeenCalledTimes(1);
expect(mockExecutor.streamEvents).not.toHaveBeenCalled();
expect(result[0][0].json.output).toBe('Regular response');
});
it('should use regular execution on version 2.2 when streaming is not available', async () => {
mockContext.isStreaming.mockReturnValue(false);
jest.spyOn(helpers, 'getConnectedTools').mockResolvedValue([mock<Tool>()]);
jest.spyOn(outputParserModule, 'getOptionalOutputParser').mockResolvedValue(undefined);
const mockExecutor = {
invoke: jest.fn().mockResolvedValue({ output: 'Regular response' }),
streamEvents: jest.fn(),
};
jest.spyOn(AgentExecutor, 'fromAgentAndTools').mockReturnValue(mockExecutor as any);
const result = await toolsAgentExecute.call(mockContext);
expect(mockContext.sendChunk).not.toHaveBeenCalled();
expect(mockExecutor.invoke).toHaveBeenCalledTimes(1);
expect(mockExecutor.streamEvents).not.toHaveBeenCalled();
expect(result[0][0].json.output).toBe('Regular response');
});
});
});

View File

@@ -122,6 +122,7 @@ export class JobProcessor {
undefined,
executionTimeoutTimestamp,
);
additionalData.streamingEnabled = job.data.streamingEnabled;
const { pushRef } = job.data;

View File

@@ -19,6 +19,7 @@ export type JobData = {
executionId: string;
loadStaticData: boolean;
pushRef?: string;
streamingEnabled?: boolean;
};
export type JobResult = {

View File

@@ -656,15 +656,6 @@ export async function executeWebhook(
);
}
// Start now to run the workflow
executionId = await Container.get(WorkflowRunner).run(
runData,
true,
!didSendResponse,
executionId,
responsePromise,
);
if (responseMode === 'streaming') {
Container.get(Logger).debug(
`Execution of workflow "${workflow.name}" with ID ${executionId} is set to streaming`,
@@ -676,6 +667,15 @@ export async function executeWebhook(
didSendResponse = true;
}
// Start now to run the workflow
executionId = await Container.get(WorkflowRunner).run(
runData,
true,
!didSendResponse,
executionId,
responsePromise,
);
if (responseMode === 'formPage' && !didSendResponse) {
res.send({ formWaitingUrl: `${additionalData.formWaitingBaseUrl}/${executionId}` });
process.nextTick(() => res.end());
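
The reorder above is deliberate: `WorkflowRunner.run(...)` now starts only after the streaming response mode has been detected and the response marked as sent, so the HTTP response is prepared for chunked output before the agent can emit its first token. A minimal sketch of why that ordering matters (hypothetical Express handler, not the n8n webhook code):

```ts
import type { Response } from 'express';

// Hypothetical: commit to a chunked response *before* starting the workflow,
// so the first sendChunk has somewhere to go.
async function handleStreamingWebhook(
  res: Response,
  runWorkflow: (onChunk: (text: string) => void) => Promise<void>,
): Promise<void> {
  // 1. Prepare the streaming response first.
  res.writeHead(200, { 'Content-Type': 'application/json; charset=utf-8' });

  // 2. Only then start execution; chunks are flushed as they arrive.
  await runWorkflow((text) => res.write(text));

  // 3. Close the stream once the run settles.
  res.end();
}
```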

View File

@@ -237,6 +237,8 @@ async function startExecution(
if (additionalData.httpResponse) {
additionalDataIntegrated.httpResponse = additionalData.httpResponse;
}
// Propagate streaming state to subworkflows
additionalDataIntegrated.streamingEnabled = additionalData.streamingEnabled;
let subworkflowTimeout = additionalData.executionTimeoutTimestamp;
const workflowSettings = workflowData.settings;

View File

@@ -248,6 +248,7 @@ export class WorkflowRunner {
);
// TODO: set this in queue mode as well
additionalData.restartExecutionId = restartExecutionId;
additionalData.streamingEnabled = data.streamingEnabled;
additionalData.executionId = executionId;
@@ -357,6 +358,7 @@ export class WorkflowRunner {
executionId,
loadStaticData: !!loadStaticData,
pushRef: data.pushRef,
streamingEnabled: data.streamingEnabled,
};
if (!this.scalingService) {
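
Together, the `JobProcessor`, `JobData`, subworkflow, and `WorkflowRunner` hunks thread one `streamingEnabled` flag through every execution path: the runner stamps it onto `additionalData` in regular mode and serializes it into the job payload in queue mode, the worker restores it, and subworkflows inherit it from their parent. A condensed sketch of that hand-off (hypothetical shapes mirroring the diff, not the real n8n types):

```ts
// Hypothetical condensed model of how the streaming flag travels.
interface AdditionalData { streamingEnabled?: boolean }
interface RunData { streamingEnabled?: boolean }
interface QueueJobData { executionId: string; streamingEnabled?: boolean }

// Main process: regular mode keeps the flag in-process...
function runLocally(data: RunData, additionalData: AdditionalData): void {
  additionalData.streamingEnabled = data.streamingEnabled;
}

// ...while queue mode must serialize it into the job payload,
function enqueue(data: RunData, executionId: string): QueueJobData {
  return { executionId, streamingEnabled: data.streamingEnabled };
}

// and the worker restores it before executing.
function processJob(job: QueueJobData, additionalData: AdditionalData): void {
  additionalData.streamingEnabled = job.streamingEnabled;
}

// Subworkflows inherit the parent's flag so nested agents can stream too.
function startSubworkflow(parent: AdditionalData, child: AdditionalData): void {
  child.streamingEnabled = parent.streamingEnabled;
}
```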

View File

@@ -297,7 +297,7 @@ describe('ExecuteContext', () => {
expect(hooksMock.runHook).toHaveBeenCalledWith('sendChunk', [
expect.objectContaining({
type: 'item',
content: '"test"',
content: 'test',
metadata: expect.objectContaining({
nodeName: 'Test Node',
nodeId: 'test-node-id',

View File

@@ -131,6 +131,21 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti
)) as IExecuteFunctions['getNodeParameter'];
}
isStreaming(): boolean {
// Check if we have sendChunk handlers
const handlers = this.additionalData.hooks?.handlers?.sendChunk?.length;
const hasHandlers = handlers !== undefined && handlers > 0;
// Check if streaming was enabled for this execution
const streamingEnabled = this.additionalData.streamingEnabled === true;
// Check that the current execution mode supports streaming
const executionModeSupportsStreaming = ['manual', 'webhook', 'integrated'];
const isStreamingMode = executionModeSupportsStreaming.includes(this.mode);
return hasHandlers && isStreamingMode && streamingEnabled;
}
async sendChunk(type: ChunkType, content?: IDataObject | string): Promise<void> {
const node = this.getNode();
const metadata = {
@@ -139,9 +154,11 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti
timestamp: Date.now(),
};
const parsedContent = typeof content === 'string' ? content : JSON.stringify(content);
const message: StructuredChunk = {
type,
content: content ? JSON.stringify(content) : undefined,
content: parsedContent,
metadata,
};
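
Two changes land in this file: `isStreaming()` gives node code a cheap capability check (registered `sendChunk` handlers, the per-execution flag, and a supported mode), and `sendChunk` stops double-encoding strings: string content now passes through verbatim while objects are still `JSON.stringify`-ed, which is why the test above now expects `'test'` instead of `'"test"'`. A hedged sketch of how a node implementation might use the pair:

```ts
// Hypothetical node execute() body using the new streaming capability check.
// `ctx` stands in for the IExecuteFunctions context a node receives.
async function respond(
  ctx: {
    isStreaming(): boolean;
    sendChunk(type: 'begin' | 'item' | 'end', content?: string): Promise<void>;
  },
  tokens: AsyncIterable<string>,
): Promise<string> {
  let output = '';
  const streaming = ctx.isStreaming(); // false when no consumer is listening

  if (streaming) await ctx.sendChunk('begin');
  for await (const token of tokens) {
    output += token;
    if (streaming) await ctx.sendChunk('item', token); // sent as-is, not re-quoted
  }
  if (streaming) await ctx.sendChunk('end');

  return output; // full text is still returned for the regular output pane
}
```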

View File

@@ -920,6 +920,7 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn &
sendMessageToUI(message: any): void;
sendResponse(response: IExecuteResponsePromiseData): void;
sendChunk(type: ChunkType, content?: IDataObject | string): void;
isStreaming(): boolean;
// TODO: Make this one then only available in the new config one
addInputData(
@@ -2394,6 +2395,7 @@ export interface IWorkflowExecuteAdditionalData {
currentNodeExecutionIndex: number;
httpResponse?: express.Response;
httpRequest?: express.Request;
streamingEnabled?: boolean;
restApiUrl: string;
instanceBaseUrl: string;
setExecutionStatus?: (status: ExecutionStatus) => void;
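
On the consuming side, whatever registered the `sendChunk` hook receives the `StructuredChunk` objects assembled in `ExecuteContext` and decides how to put them on the wire. A speculative sketch of such a handler emitting newline-delimited JSON over HTTP (the registration API and chunk shape here are illustrative, not the exact n8n hooks interface):

```ts
import type { Response } from 'express';

// Shape mirroring the StructuredChunk assembled in ExecuteContext.sendChunk.
interface StructuredChunk {
  type: 'begin' | 'item' | 'end';
  content?: string;
  metadata: { nodeName: string; nodeId: string; timestamp: number };
}

// Illustrative handler: forward each chunk as one NDJSON line.
// (Simplified – a real handler would track begin/end pairs across batch items.)
function makeSendChunkHandler(res: Response) {
  return (chunk: StructuredChunk) => {
    res.write(JSON.stringify(chunk) + '\n');
    if (chunk.type === 'end') res.end();
  };
}
```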