feat: Update Chat SDK to support streaming responses (#17006)

Co-authored-by: Eugene Molodkin <eugene@n8n.io>
This commit is contained in:
Benjamin Schroth
2025-07-10 12:25:29 +02:00
committed by GitHub
parent b9e7b719c0
commit 3edadb5a75
26 changed files with 1670 additions and 37 deletions

View File

@@ -82,6 +82,7 @@ function createAgentExecutor(
async function processEventStream(
ctx: IExecuteFunctions,
eventStream: IterableReadableStream<StreamEvent>,
itemIndex: number,
returnIntermediateSteps: boolean = false,
): Promise<{ output: string; intermediateSteps?: any[] }> {
const agentResult: { output: string; intermediateSteps?: any[] } = {
@@ -92,7 +93,7 @@ async function processEventStream(
agentResult.intermediateSteps = [];
}
ctx.sendChunk('begin');
ctx.sendChunk('begin', itemIndex);
for await (const event of eventStream) {
// Stream chat model tokens as they come in
switch (event.event) {
@@ -108,7 +109,7 @@ async function processEventStream(
} else if (typeof chunkContent === 'string') {
chunkText = chunkContent;
}
ctx.sendChunk('item', chunkText);
ctx.sendChunk('item', itemIndex, chunkText);
agentResult.output += chunkText;
}
@@ -155,7 +156,7 @@ async function processEventStream(
break;
}
}
ctx.sendChunk('end');
ctx.sendChunk('end', itemIndex);
return agentResult;
}
@@ -274,7 +275,12 @@ export async function toolsAgentExecute(
},
);
return await processEventStream(this, eventStream, options.returnIntermediateSteps);
return await processEventStream(
this,
eventStream,
itemIndex,
options.returnIntermediateSteps,
);
} else {
// Handle regular execution
return await executor.invoke(invokeParams, executeOptions);

View File

@@ -487,10 +487,10 @@ describe('toolsAgentExecute', () => {
const result = await toolsAgentExecute.call(mockContext);
expect(mockContext.sendChunk).toHaveBeenCalledWith('begin');
expect(mockContext.sendChunk).toHaveBeenCalledWith('item', 'Hello ');
expect(mockContext.sendChunk).toHaveBeenCalledWith('item', 'world!');
expect(mockContext.sendChunk).toHaveBeenCalledWith('end');
expect(mockContext.sendChunk).toHaveBeenCalledWith('begin', 0);
expect(mockContext.sendChunk).toHaveBeenCalledWith('item', 0, 'Hello ');
expect(mockContext.sendChunk).toHaveBeenCalledWith('item', 0, 'world!');
expect(mockContext.sendChunk).toHaveBeenCalledWith('end', 0);
expect(mockExecutor.streamEvents).toHaveBeenCalledTimes(1);
expect(result[0]).toHaveLength(1);
expect(result[0][0].json.output).toBe('Hello world!');

View File

@@ -591,6 +591,7 @@ export class ChatTrigger extends Node {
allowFileUploads: options.allowFileUploads,
allowedFilesMimeTypes: options.allowedFilesMimeTypes,
customCss: options.customCss,
enableStreaming,
});
res.status(200).send(page).end();

View File

@@ -12,6 +12,7 @@ export function createPage({
allowFileUploads,
allowedFilesMimeTypes,
customCss,
enableStreaming,
}: {
instanceId: string;
webhookUrl?: string;
@@ -26,6 +27,7 @@ export function createPage({
allowFileUploads?: boolean;
allowedFilesMimeTypes?: string;
customCss?: string;
enableStreaming?: boolean;
}) {
const validAuthenticationOptions: AuthenticationChatOption[] = [
'none',
@@ -124,6 +126,7 @@ export function createPage({
${en ? `en: ${JSON.stringify(en)},` : ''}
},
${initialMessages.length ? `initialMessages: ${JSON.stringify(initialMessages)},` : ''}
enableStreaming: ${!!enableStreaming},
});
})();
</script>

View File

@@ -225,6 +225,8 @@ describe('ActiveExecutions', () => {
metadata: {
nodeName: 'testNode',
nodeId: uuid(),
runIndex: 0,
itemIndex: 0,
timestamp: Date.now(),
},
};
@@ -242,6 +244,8 @@ describe('ActiveExecutions', () => {
metadata: {
nodeName: 'testNode',
nodeId: uuid(),
runIndex: 0,
itemIndex: 0,
timestamp: Date.now(),
},
};

View File

@@ -2238,6 +2238,12 @@ describe('WorkflowExecute', () => {
{
type: 'error',
content: 'A detailed error description',
metadata: {
nodeId: errorNode.id,
nodeName: errorNode.name,
runIndex: 0,
itemIndex: 0,
},
},
]);
});
@@ -2296,6 +2302,12 @@ describe('WorkflowExecute', () => {
{
type: 'error',
content: 'The API returned an error',
metadata: {
nodeId: errorNode.id,
nodeName: errorNode.name,
runIndex: 0,
itemIndex: 0,
},
},
]);
});
@@ -2400,6 +2412,12 @@ describe('WorkflowExecute', () => {
{
type: 'error',
content: 'Custom error description',
metadata: {
nodeId: errorNode.id,
nodeName: errorNode.name,
runIndex: 0,
itemIndex: 0,
},
},
]);
});
@@ -2457,6 +2475,12 @@ describe('WorkflowExecute', () => {
{
type: 'error',
content: undefined, // When no description is available, content should be undefined
metadata: {
nodeId: errorNode.id,
nodeName: errorNode.name,
runIndex: 0,
itemIndex: 0,
},
},
]);
});

View File

@@ -292,7 +292,7 @@ describe('ExecuteContext', () => {
abortSignal,
);
await testExecuteContext.sendChunk('item', 'test');
await testExecuteContext.sendChunk('item', 0, 'test');
expect(hooksMock.runHook).toHaveBeenCalledWith('sendChunk', [
expect.objectContaining({
@@ -301,6 +301,8 @@ describe('ExecuteContext', () => {
metadata: expect.objectContaining({
nodeName: 'Test Node',
nodeId: 'test-node-id',
runIndex: 0,
itemIndex: 0,
timestamp: expect.any(Number),
}),
}),
@@ -330,7 +332,7 @@ describe('ExecuteContext', () => {
abortSignal,
);
await testExecuteContext.sendChunk('begin');
await testExecuteContext.sendChunk('begin', 0);
expect(hooksMock.runHook).toHaveBeenCalledWith('sendChunk', [
expect.objectContaining({
@@ -339,6 +341,8 @@ describe('ExecuteContext', () => {
metadata: expect.objectContaining({
nodeName: 'Test Node',
nodeId: 'test-node-id',
runIndex: 0,
itemIndex: 0,
timestamp: expect.any(Number),
}),
}),
@@ -366,7 +370,7 @@ describe('ExecuteContext', () => {
);
// Should not throw error
await expect(testExecuteContext.sendChunk('item', 'test')).resolves.toBeUndefined();
await expect(testExecuteContext.sendChunk('item', 0, 'test')).resolves.toBeUndefined();
});
});
});

View File

@@ -146,11 +146,17 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti
return hasHandlers && isStreamingMode && streamingEnabled;
}
async sendChunk(type: ChunkType, content?: IDataObject | string): Promise<void> {
async sendChunk(
type: ChunkType,
itemIndex: number,
content?: IDataObject | string,
): Promise<void> {
const node = this.getNode();
const metadata = {
nodeId: node.id,
nodeName: node.name,
itemIndex,
runIndex: this.runIndex,
timestamp: Date.now(),
};

View File

@@ -1720,7 +1720,16 @@ export class WorkflowExecute {
// Send error to the response if necessary
await hooks?.runHook('sendChunk', [
{ type: 'error', content: executionError.description },
{
type: 'error',
content: executionError.description,
metadata: {
nodeId: executionNode.id,
nodeName: executionNode.name,
runIndex,
itemIndex: 0,
},
},
]);
if (

View File

@@ -14,6 +14,9 @@ Open the **Chat Trigger** node and add your domain to the **Allowed Origins (COR
[See example workflow](https://github.com/n8n-io/n8n/blob/master/packages/%40n8n/chat/resources/workflow.json)
To use streaming responses, you need to set the response mode to **Streaming Response** in the **Chat Trigger** node.
[See example workflow with streaming](https://github.com/n8n-io/n8n/blob/master/packages/%40n8n/chat/resources/workflow-streaming.json)
> Make sure the workflow is **Active**.
### How it works
@@ -129,6 +132,7 @@ createChat({
inputPlaceholder: 'Type your question..',
},
},
enableStreaming: false,
});
```
@@ -200,6 +204,11 @@ createChat({
- **Default**: `''`
- **Description**: A comma-separated list of allowed MIME types for file uploads. Only applicable if `allowFileUploads` is set to `true`. If left empty, all file types are allowed. For example: `'image/*,application/pdf'`.
### enableStreaming
- **Type**: `boolean`
- **Default**: `false`
- **Description**: Whether to enable streaming responses from the n8n workflow. If set to `true`, the chat will display responses as they are being generated, providing a more interactive experience. For this to work, the workflow must also be configured to return streaming responses.
## Customization
The Chat window is entirely customizable using CSS variables.

View File

@@ -0,0 +1,84 @@
{
"nodes": [
{
"parameters": {
"model": {
"__rl": true,
"mode": "list",
"value": "gpt-4.1-mini"
},
"options": {}
},
"type": "@n8n/n8n-nodes-langchain.lmChatOpenAi",
"typeVersion": 1.2,
"position": [400, 224],
"id": "9593988a-2ca5-4a82-bd3a-3d8fcb69036d",
"name": "OpenAI Chat Model",
"credentials": {
"openAiApi": {
"id": "Rr1g6PqGGpNJcaBf",
"name": "OpenAi account"
}
}
},
{
"parameters": {
"options": {
"returnIntermediateSteps": false,
"enableStreaming": false
}
},
"type": "@n8n/n8n-nodes-langchain.agent",
"typeVersion": 2.2,
"position": [400, 48],
"id": "72b37ab2-7352-476b-bca6-5b41e2290082",
"name": "AI Agent"
},
{
"parameters": {
"public": true,
"options": {
"responseMode": "streaming"
}
},
"type": "@n8n/n8n-nodes-langchain.chatTrigger",
"typeVersion": 1.2,
"position": [32, 48],
"id": "012ecda4-3ae8-4328-a371-c7ae63cabf1c",
"name": "When chat message received",
"webhookId": "022d6461-e68d-4531-b713-833953c388c2"
}
],
"connections": {
"OpenAI Chat Model": {
"ai_languageModel": [
[
{
"node": "AI Agent",
"type": "ai_languageModel",
"index": 0
}
]
]
},
"AI Agent": {
"main": [[]]
},
"When chat message received": {
"main": [
[
{
"node": "AI Agent",
"type": "main",
"index": 0
}
]
]
}
},
"pinData": {},
"meta": {
"templateCredsSetupCompleted": true,
"instanceId": "1d6725ae695aac02e442b627cd2266d305eba55a427b2dcc7702a0271b70043c"
}
}

View File

@@ -32,6 +32,7 @@ export const Fullscreen: Story = {
args: {
webhookUrl,
mode: 'fullscreen',
enableStreaming: false,
} satisfies Partial<ChatOptions>,
};
@@ -39,6 +40,7 @@ export const Windowed: Story = {
args: {
webhookUrl,
mode: 'window',
enableStreaming: false,
} satisfies Partial<ChatOptions>,
};
@@ -51,5 +53,6 @@ export const WorkflowChat: Story = {
allowFileUploads: true,
showWelcomeScreen: false,
initialMessages: [],
enableStreaming: false,
} satisfies Partial<ChatOptions>,
};

View File

@@ -0,0 +1,485 @@
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { sendMessageStreaming } from '@n8n/chat/api';
import type { ChatOptions } from '@n8n/chat/types';
describe('sendMessageStreaming', () => {
const mockOptions: ChatOptions = {
webhookUrl: 'https://test.example.com/webhook',
chatSessionKey: 'sessionId',
chatInputKey: 'chatInput',
i18n: {
en: {
title: 'Test',
subtitle: 'Test',
footer: 'Test',
getStarted: 'Test',
inputPlaceholder: 'Test',
closeButtonTooltip: 'Test',
},
},
};
beforeEach(() => {
vi.restoreAllMocks();
});
it('should call the webhook URL with correct parameters', async () => {
const chunks = [
{
type: 'begin',
metadata: {
nodeId: 'node-1',
nodeName: 'Test Node',
timestamp: Date.now(),
runIndex: 0,
itemIndex: 0,
},
},
{
type: 'item',
content: 'Hello ',
metadata: {
nodeId: 'node-1',
nodeName: 'Test Node',
timestamp: Date.now(),
runIndex: 0,
itemIndex: 0,
},
},
{
type: 'item',
content: 'World!',
metadata: {
nodeId: 'node-1',
nodeName: 'Test Node',
timestamp: Date.now(),
runIndex: 0,
itemIndex: 0,
},
},
{
type: 'end',
metadata: {
nodeId: 'node-1',
nodeName: 'Test Node',
timestamp: Date.now(),
runIndex: 0,
itemIndex: 0,
},
},
];
const encoder = new TextEncoder();
const stream = new ReadableStream({
start(controller) {
chunks.forEach((chunk) => {
const data = JSON.stringify(chunk) + '\n';
controller.enqueue(encoder.encode(data));
});
controller.close();
},
});
const mockResponse = {
ok: true,
status: 200,
body: stream,
headers: new Headers(),
} as Response;
vi.spyOn(global, 'fetch').mockResolvedValue(mockResponse);
const onChunk = vi.fn();
const onBeginMessage = vi.fn();
const onEndMessage = vi.fn();
await sendMessageStreaming('Test message', [], 'test-session-id', mockOptions, {
onChunk,
onBeginMessage,
onEndMessage,
});
expect(fetch).toHaveBeenCalledWith('https://test.example.com/webhook', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'text/plain',
},
body: JSON.stringify({
action: 'sendMessage',
sessionId: 'test-session-id',
chatInput: 'Test message',
}),
});
expect(onBeginMessage).toHaveBeenCalledTimes(1);
expect(onBeginMessage).toHaveBeenCalledWith('node-1', 0);
expect(onChunk).toHaveBeenCalledTimes(2);
expect(onChunk).toHaveBeenCalledWith('Hello ', 'node-1', 0);
expect(onChunk).toHaveBeenCalledWith('World!', 'node-1', 0);
expect(onEndMessage).toHaveBeenCalledTimes(1);
expect(onEndMessage).toHaveBeenCalledWith('node-1', 0);
});
it('should handle multiple runs and items correctly', async () => {
const chunks = [
{
type: 'begin',
metadata: {
nodeId: 'node-1',
nodeName: 'Test Node',
timestamp: Date.now(),
runIndex: 0,
itemIndex: 0,
},
},
{
type: 'item',
content: 'Run 0 Item 0 ',
metadata: {
nodeId: 'node-1',
nodeName: 'Test Node',
timestamp: Date.now(),
runIndex: 0,
itemIndex: 0,
},
},
{
type: 'end',
metadata: {
nodeId: 'node-1',
nodeName: 'Test Node',
timestamp: Date.now(),
runIndex: 0,
itemIndex: 0,
},
},
{
type: 'begin',
metadata: {
nodeId: 'node-1',
nodeName: 'Test Node',
timestamp: Date.now(),
runIndex: 1,
itemIndex: 0,
},
},
{
type: 'item',
content: 'Run 1 Item 0 ',
metadata: {
nodeId: 'node-1',
nodeName: 'Test Node',
timestamp: Date.now(),
runIndex: 1,
itemIndex: 0,
},
},
{
type: 'end',
metadata: {
nodeId: 'node-1',
nodeName: 'Test Node',
timestamp: Date.now(),
runIndex: 1,
itemIndex: 0,
},
},
];
const encoder = new TextEncoder();
const stream = new ReadableStream({
start(controller) {
chunks.forEach((chunk) => {
const data = JSON.stringify(chunk) + '\n';
controller.enqueue(encoder.encode(data));
});
controller.close();
},
});
const mockResponse = {
ok: true,
status: 200,
body: stream,
headers: new Headers(),
} as Response;
vi.spyOn(global, 'fetch').mockResolvedValue(mockResponse);
const onChunk = vi.fn();
const onBeginMessage = vi.fn();
const onEndMessage = vi.fn();
await sendMessageStreaming('Test message', [], 'test-session-id', mockOptions, {
onChunk,
onBeginMessage,
onEndMessage,
});
expect(onBeginMessage).toHaveBeenCalledTimes(2);
expect(onBeginMessage).toHaveBeenCalledWith('node-1', 0);
expect(onBeginMessage).toHaveBeenCalledWith('node-1', 1);
expect(onChunk).toHaveBeenCalledTimes(2);
expect(onChunk).toHaveBeenCalledWith('Run 0 Item 0 ', 'node-1', 0);
expect(onChunk).toHaveBeenCalledWith('Run 1 Item 0 ', 'node-1', 1);
expect(onEndMessage).toHaveBeenCalledTimes(2);
expect(onEndMessage).toHaveBeenCalledWith('node-1', 0);
expect(onEndMessage).toHaveBeenCalledWith('node-1', 1);
});
it('should support file uploads with streaming', async () => {
const testFile = new File(['test'], 'test.txt', { type: 'text/plain' });
const chunks = [
{
type: 'begin',
metadata: {
nodeId: 'node-1',
nodeName: 'Test Node',
timestamp: Date.now(),
runIndex: 0,
itemIndex: 0,
},
},
{
type: 'item',
content: 'File processed: ',
metadata: {
nodeId: 'node-1',
nodeName: 'Test Node',
timestamp: Date.now(),
runIndex: 0,
itemIndex: 0,
},
},
{
type: 'item',
content: 'test.txt',
metadata: {
nodeId: 'node-1',
nodeName: 'Test Node',
timestamp: Date.now(),
runIndex: 0,
itemIndex: 0,
},
},
{
type: 'end',
metadata: {
nodeId: 'node-1',
nodeName: 'Test Node',
timestamp: Date.now(),
runIndex: 0,
itemIndex: 0,
},
},
];
const encoder = new TextEncoder();
const stream = new ReadableStream({
start(controller) {
chunks.forEach((chunk) => {
const data = JSON.stringify(chunk) + '\n';
controller.enqueue(encoder.encode(data));
});
controller.close();
},
});
const mockResponse = {
ok: true,
status: 200,
body: stream,
headers: new Headers(),
} as Response;
vi.spyOn(global, 'fetch').mockResolvedValue(mockResponse);
const onChunk = vi.fn();
const onBeginMessage = vi.fn();
const onEndMessage = vi.fn();
await sendMessageStreaming('Test message', [testFile], 'test-session-id', mockOptions, {
onChunk,
onBeginMessage,
onEndMessage,
});
// Verify FormData was used for file upload
expect(fetch).toHaveBeenCalledWith('https://test.example.com/webhook', {
method: 'POST',
headers: {
Accept: 'text/plain',
},
body: expect.any(FormData),
});
expect(onBeginMessage).toHaveBeenCalledTimes(1);
expect(onBeginMessage).toHaveBeenCalledWith('node-1', 0);
expect(onChunk).toHaveBeenCalledTimes(2);
expect(onChunk).toHaveBeenCalledWith('File processed: ', 'node-1', 0);
expect(onChunk).toHaveBeenCalledWith('test.txt', 'node-1', 0);
expect(onEndMessage).toHaveBeenCalledTimes(1);
expect(onEndMessage).toHaveBeenCalledWith('node-1', 0);
});
it('should handle HTTP errors', async () => {
const mockResponse = {
ok: false,
status: 500,
headers: new Headers(),
text: async () => 'Internal Server Error',
} as Response;
vi.spyOn(global, 'fetch').mockResolvedValue(mockResponse);
await expect(
sendMessageStreaming('Test message', [], 'test-session-id', mockOptions, {
onChunk: vi.fn(),
onEndMessage: vi.fn(),
onBeginMessage: vi.fn(),
}),
).rejects.toThrow('Error while sending message. Error: Internal Server Error');
});
it('should handle missing response body', async () => {
const mockResponse = {
ok: true,
status: 200,
body: null,
headers: new Headers(),
} as Response;
vi.spyOn(global, 'fetch').mockResolvedValue(mockResponse);
await expect(
sendMessageStreaming('Test message', [], 'test-session-id', mockOptions, {
onChunk: vi.fn(),
onEndMessage: vi.fn(),
onBeginMessage: vi.fn(),
}),
).rejects.toThrow('Response body is not readable');
});
it('should include custom headers from webhook config', async () => {
const optionsWithHeaders: ChatOptions = {
...mockOptions,
webhookConfig: {
headers: {
Authorization: 'Bearer token',
'X-Custom-Header': 'value',
},
},
};
const chunks = [
{
type: 'begin',
metadata: { nodeId: 'node-1', nodeName: 'Test Node', timestamp: Date.now() },
},
{ type: 'end', metadata: { nodeId: 'node-1', nodeName: 'Test Node', timestamp: Date.now() } },
];
const encoder = new TextEncoder();
const stream = new ReadableStream({
start(controller) {
chunks.forEach((chunk) => {
const data = JSON.stringify(chunk) + '\n';
controller.enqueue(encoder.encode(data));
});
controller.close();
},
});
const mockResponse = {
ok: true,
status: 200,
body: stream,
headers: new Headers(),
} as Response;
vi.spyOn(global, 'fetch').mockResolvedValue(mockResponse);
await sendMessageStreaming('Test message', [], 'test-session-id', optionsWithHeaders, {
onChunk: vi.fn(),
onEndMessage: vi.fn(),
onBeginMessage: vi.fn(),
});
expect(fetch).toHaveBeenCalledWith('https://test.example.com/webhook', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'text/plain',
Authorization: 'Bearer token',
'X-Custom-Header': 'value',
},
body: JSON.stringify({
action: 'sendMessage',
sessionId: 'test-session-id',
chatInput: 'Test message',
}),
});
});
it('should include metadata when provided', async () => {
const optionsWithMetadata: ChatOptions = {
...mockOptions,
metadata: {
userId: 'user-123',
source: 'chat-widget',
},
};
const chunks = [
{
type: 'begin',
metadata: { nodeId: 'node-1', nodeName: 'Test Node', timestamp: Date.now() },
},
{ type: 'end', metadata: { nodeId: 'node-1', nodeName: 'Test Node', timestamp: Date.now() } },
];
const encoder = new TextEncoder();
const stream = new ReadableStream({
start(controller) {
chunks.forEach((chunk) => {
const data = JSON.stringify(chunk) + '\n';
controller.enqueue(encoder.encode(data));
});
controller.close();
},
});
const mockResponse = {
ok: true,
status: 200,
body: stream,
headers: new Headers(),
} as Response;
vi.spyOn(global, 'fetch').mockResolvedValue(mockResponse);
await sendMessageStreaming('Test message', [], 'test-session-id', optionsWithMetadata, {
onChunk: vi.fn(),
onEndMessage: vi.fn(),
onBeginMessage: vi.fn(),
});
expect(fetch).toHaveBeenCalledWith('https://test.example.com/webhook', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'text/plain',
},
body: JSON.stringify({
action: 'sendMessage',
sessionId: 'test-session-id',
chatInput: 'Test message',
metadata: {
userId: 'user-123',
source: 'chat-widget',
},
}),
});
});
});

View File

@@ -4,6 +4,7 @@ import {
createFetchResponse,
createGetLatestMessagesResponse,
createSendMessageResponse,
createMockStreamingFetchResponse,
getChatInputSendButton,
getChatInputTextarea,
getChatMessage,
@@ -216,4 +217,156 @@ describe('createChat()', () => {
},
);
});
describe('streaming', () => {
it('should handle streaming responses when enableStreaming is true', async () => {
const input = 'Tell me a story!';
const chunks = [
{
type: 'begin',
metadata: {
nodeId: 'node-1',
itemIndex: 0,
runIndex: 0,
nodeName: 'Test Node',
timestamp: Date.now(),
},
},
{
type: 'item',
content: 'Once upon ',
metadata: {
nodeId: 'node-1',
itemIndex: 0,
runIndex: 0,
nodeName: 'Test Node',
timestamp: Date.now(),
},
},
{
type: 'item',
content: 'a time, ',
metadata: {
nodeId: 'node-1',
itemIndex: 0,
runIndex: 0,
nodeName: 'Test Node',
timestamp: Date.now(),
},
},
{
type: 'item',
content: 'there was a test.',
metadata: {
nodeId: 'node-1',
itemIndex: 0,
runIndex: 0,
nodeName: 'Test Node',
timestamp: Date.now(),
},
},
{
type: 'end',
metadata: {
nodeId: 'node-1',
itemIndex: 0,
runIndex: 0,
nodeName: 'Test Node',
timestamp: Date.now(),
},
},
];
const fetchSpy = vi.spyOn(window, 'fetch');
fetchSpy
.mockImplementationOnce(createFetchResponse(createGetLatestMessagesResponse))
.mockImplementationOnce(createMockStreamingFetchResponse(chunks));
app = createChat({
mode: 'fullscreen',
enableStreaming: true,
});
await waitFor(() => expect(getChatInputTextarea()).toBeInTheDocument());
const textarea = getChatInputTextarea();
const sendButton = getChatInputSendButton();
await fireEvent.update(textarea as HTMLElement, input);
await fireEvent.click(sendButton as HTMLElement);
expect(getChatMessageByText(input)).toBeInTheDocument();
expect(getChatMessages().length).toBe(3);
await waitFor(() => expect(getChatMessageTyping()).not.toBeInTheDocument());
const expectedOutput = 'Once upon a time, there was a test.';
await waitFor(() => expect(getChatMessageByText(expectedOutput)).toBeInTheDocument());
expect(fetchSpy.mock.calls[1][1]).toEqual(
expect.objectContaining({
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'text/plain',
},
body: expect.stringMatching(/"action":"sendMessage"/) as unknown,
}),
);
});
it('should fall back to regular API when enableStreaming is false', async () => {
const input = 'Hello!';
const output = 'Hello Bot World!';
const fetchSpy = vi.spyOn(window, 'fetch');
fetchSpy
.mockImplementationOnce(createFetchResponse(createGetLatestMessagesResponse))
.mockImplementationOnce(createFetchResponse(createSendMessageResponse(output)));
app = createChat({
mode: 'fullscreen',
enableStreaming: false,
});
await waitFor(() => expect(getChatInputTextarea()).toBeInTheDocument());
const textarea = getChatInputTextarea();
const sendButton = getChatInputSendButton();
await fireEvent.update(textarea as HTMLElement, input);
await fireEvent.click(sendButton as HTMLElement);
expect(getChatMessageByText(input)).toBeInTheDocument();
await waitFor(() => expect(getChatMessageTyping()).not.toBeInTheDocument());
expect(getChatMessageByText(output)).toBeInTheDocument();
});
it('should handle streaming errors gracefully', async () => {
const input = 'This should fail!';
const fetchSpy = vi.spyOn(window, 'fetch');
fetchSpy
.mockImplementationOnce(createFetchResponse(createGetLatestMessagesResponse))
.mockImplementationOnce(async () => {
throw new Error('Network error');
});
app = createChat({
mode: 'fullscreen',
enableStreaming: true,
});
await waitFor(() => expect(getChatInputTextarea()).toBeInTheDocument());
const textarea = getChatInputTextarea();
const sendButton = getChatInputSendButton();
await fireEvent.update(textarea as HTMLElement, input);
await fireEvent.click(sendButton as HTMLElement);
expect(getChatMessageByText(input)).toBeInTheDocument();
await waitFor(() => expect(getChatMessageTyping()).not.toBeInTheDocument());
expect(getChatMessageByText('Error: Failed to receive response')).toBeInTheDocument();
});
});
});

View File

@@ -16,3 +16,31 @@ export const createSendMessageResponse = (
): SendMessageResponse => ({
output,
});
/**
 * Builds a `fetch`-compatible mock whose response streams the given chunks
 * as newline-delimited JSON, mirroring the shape of the n8n streaming
 * webhook output consumed by the chat widget tests.
 *
 * @param chunks - Structured stream chunks (`'begin'` / `'item'` / `'end'`)
 *   emitted in order. `metadata.runIndex` / `metadata.itemIndex` are optional
 *   so tests can exercise multi-run, multi-item streams (the streaming tests
 *   pass chunks carrying both fields).
 * @returns An async function (drop-in for `fetch`) resolving to a 200
 *   `Response` whose `body` is a `ReadableStream` of the encoded chunks.
 */
export function createMockStreamingFetchResponse(
	chunks: Array<{
		type: string;
		content?: string;
		metadata?: {
			nodeId: string;
			nodeName: string;
			timestamp: number;
			runIndex?: number;
			itemIndex?: number;
		};
	}>,
) {
	return async () => {
		const encoder = new TextEncoder();
		const stream = new ReadableStream({
			start(controller) {
				// One JSON document per line (NDJSON), matching the server's
				// chunked streaming format.
				chunks.forEach((chunk) => {
					const data = JSON.stringify(chunk) + '\n';
					controller.enqueue(encoder.encode(data));
				});
				controller.close();
			},
		});
		return {
			ok: true,
			status: 200,
			body: stream,
			headers: new Headers(),
		} as Response;
	};
}

View File

@@ -0,0 +1,181 @@
import { describe, expect, it } from 'vitest';
import type { ChatMessageText } from '@n8n/chat/types';
import {
StreamingMessageManager,
createBotMessage,
updateMessageInArray,
} from '@n8n/chat/utils/streaming';
describe('StreamingMessageManager', () => {
it('should initialize runs correctly', () => {
const manager = new StreamingMessageManager();
const message1 = manager.initializeRun('node-1', 0);
const message2 = manager.initializeRun('node-1', 1);
expect(manager.getRunCount()).toBe(2);
expect(message1.id).toBeDefined();
expect(message2.id).toBeDefined();
expect(message1.id).not.toBe(message2.id);
});
it('should create separate messages for different runs', () => {
const manager = new StreamingMessageManager();
// Initialize two different runs
const message1 = manager.addRunToActive('node-1', 0);
const message2 = manager.addRunToActive('node-1', 1);
expect(manager.getRunCount()).toBe(2);
expect(message1.id).not.toBe(message2.id);
// Add chunks to different runs
const result1 = manager.addChunkToRun('node-1', 'Run 0 content', 0);
const result2 = manager.addChunkToRun('node-1', 'Run 1 content', 1);
expect(result1?.text).toBe('Run 0 content');
expect(result2?.text).toBe('Run 1 content');
expect(result1?.id).toBe(message1.id);
expect(result2?.id).toBe(message2.id);
});
it('should accumulate chunks within the same run', () => {
const manager = new StreamingMessageManager();
const message = manager.addRunToActive('node-1', 0);
manager.addChunkToRun('node-1', 'Hello ', 0);
const result = manager.addChunkToRun('node-1', 'World!', 0);
expect(result?.text).toBe('Hello World!');
expect(result?.id).toBe(message.id);
});
it('should handle runs without runIndex (backward compatibility)', () => {
const manager = new StreamingMessageManager();
const message = manager.addRunToActive('node-1');
const result = manager.addChunkToRun('node-1', 'Single run content');
expect(result?.text).toBe('Single run content');
expect(result?.id).toBe(message.id);
expect(manager.getRunCount()).toBe(1);
});
it('should track active runs correctly', () => {
const manager = new StreamingMessageManager();
manager.addRunToActive('node-1', 0);
manager.addRunToActive('node-1', 1);
expect(manager.getRunCount()).toBe(2);
manager.removeRunFromActive('node-1', 0);
expect(manager.areAllRunsComplete()).toBe(false);
manager.removeRunFromActive('node-1', 1);
expect(manager.areAllRunsComplete()).toBe(true);
});
it('should return all messages in order', () => {
const manager = new StreamingMessageManager();
const message1 = manager.addRunToActive('node-1', 0);
const message2 = manager.addRunToActive('node-1', 1);
const message3 = manager.addRunToActive('node-2', 0);
const allMessages = manager.getAllMessages();
expect(allMessages).toHaveLength(3);
expect(allMessages[0].id).toBe(message1.id);
expect(allMessages[1].id).toBe(message2.id);
expect(allMessages[2].id).toBe(message3.id);
});
it('should reset correctly', () => {
const manager = new StreamingMessageManager();
manager.addRunToActive('node-1', 0);
manager.addRunToActive('node-1', 1);
manager.addChunkToRun('node-1', 'test', 0);
expect(manager.getRunCount()).toBe(2);
manager.reset();
expect(manager.getRunCount()).toBe(0);
expect(manager.getAllMessages()).toHaveLength(0);
});
});
describe('createBotMessage', () => {
it('should create a bot message with default values', () => {
const message = createBotMessage();
expect(message.type).toBe('text');
expect(message.text).toBe('');
expect(message.sender).toBe('bot');
expect(message.id).toBeDefined();
});
it('should create a bot message with custom id', () => {
const customId = 'custom-id-123';
const message = createBotMessage(customId);
expect(message.id).toBe(customId);
});
});
describe('updateMessageInArray', () => {
it('should update message in array', () => {
const messages: ChatMessageText[] = [
{
id: 'msg-1',
type: 'text',
text: 'Hello',
sender: 'bot',
},
{
id: 'msg-2',
type: 'text',
text: 'World',
sender: 'user',
},
];
const updatedMessage: ChatMessageText = {
id: 'msg-1',
type: 'text',
text: 'Hello Updated',
sender: 'bot',
};
updateMessageInArray(messages, 'msg-1', updatedMessage);
expect(messages[0].text).toBe('Hello Updated');
expect(messages[1].text).toBe('World'); // Should remain unchanged
});
it('should throw error on non-existent message id', () => {
const messages: ChatMessageText[] = [
{
id: 'msg-1',
type: 'text',
text: 'Hello',
sender: 'bot',
},
];
const updatedMessage: ChatMessageText = {
id: 'non-existent',
type: 'text',
text: 'Should not be added',
sender: 'bot',
};
expect(() => updateMessageInArray(messages, 'non-existent', updatedMessage)).toThrow(
"Can't update message. No message with id non-existent found",
);
});
});

View File

@@ -0,0 +1,181 @@
import { describe, expect, it, vi, beforeEach } from 'vitest';
import { ref, type Ref } from 'vue';
import type { ChatMessage, ChatMessageText } from '@n8n/chat/types';
import { StreamingMessageManager } from '@n8n/chat/utils/streaming';
import {
handleStreamingChunk,
handleNodeStart,
handleNodeComplete,
} from '@n8n/chat/utils/streamingHandlers';
// Mock the chatEventBus
vi.mock('@n8n/chat/event-buses', () => ({
chatEventBus: {
emit: vi.fn(),
},
}));
describe('streamingHandlers', () => {
let messages: Ref<ChatMessage[]>;
let receivedMessage: Ref<ChatMessageText | null>;
let streamingManager: StreamingMessageManager;
beforeEach(() => {
messages = ref<ChatMessage[]>([]);
receivedMessage = ref<ChatMessageText | null>(null);
streamingManager = new StreamingMessageManager();
vi.clearAllMocks();
});
describe('handleStreamingChunk', () => {
it('should handle single-node streaming (no nodeId)', () => {
handleStreamingChunk('Hello', undefined, streamingManager, receivedMessage, messages);
expect(receivedMessage.value).toBeDefined();
expect(receivedMessage.value?.text).toBe('Hello');
expect(messages.value).toHaveLength(1);
handleStreamingChunk(' World!', undefined, streamingManager, receivedMessage, messages);
expect(receivedMessage.value?.text).toBe('Hello World!');
expect(messages.value).toHaveLength(1);
});
it('should handle streaming with separate messages per runIndex', () => {
// Start the runs (doesn't create messages yet)
handleNodeStart('node-1', streamingManager, 0);
handleNodeStart('node-1', streamingManager, 1);
expect(messages.value).toHaveLength(0); // No messages created yet
// Now handle chunks for different runs - this will create the messages
handleStreamingChunk(
'Run 0 content',
'node-1',
streamingManager,
receivedMessage,
messages,
0,
);
handleStreamingChunk(
'Run 1 content',
'node-1',
streamingManager,
receivedMessage,
messages,
1,
);
expect(messages.value).toHaveLength(2); // Messages created on first chunk
// Check that we have two separate messages with different content
const message1 = messages.value[0] as ChatMessageText;
const message2 = messages.value[1] as ChatMessageText;
expect(message1.text).toBe('Run 0 content');
expect(message2.text).toBe('Run 1 content');
expect(message1.id).not.toBe(message2.id);
});
it('should accumulate chunks within the same run', () => {
// Start a run (doesn't create message yet)
handleNodeStart('node-1', streamingManager, 0);
expect(messages.value).toHaveLength(0);
// Add multiple chunks to the same run - message created on first chunk
handleStreamingChunk('Hello ', 'node-1', streamingManager, receivedMessage, messages, 0);
expect(messages.value).toHaveLength(1);
handleStreamingChunk('World!', 'node-1', streamingManager, receivedMessage, messages, 0);
const message = messages.value[0] as ChatMessageText;
expect(message.text).toBe('Hello World!');
expect(messages.value).toHaveLength(1);
});
it('should handle errors gracefully', () => {
// Simulate an error by passing invalid parameters
// handleStreamingChunk catches internally, so a null manager must not throw
const invalidStreamingManager = null as unknown as StreamingMessageManager;
expect(() => {
handleStreamingChunk('test', 'node-1', invalidStreamingManager, receivedMessage, messages);
}).not.toThrow();
});
});
describe('handleNodeStart', () => {
it('should register runs but not create messages yet', () => {
handleNodeStart('node-1', streamingManager, 0);
handleNodeStart('node-1', streamingManager, 1);
// No messages created yet - they'll be created on first chunk
expect(messages.value).toHaveLength(0);
// But runs should be registered as active
// We can verify this by checking that chunks will create messages
handleStreamingChunk('test', 'node-1', streamingManager, receivedMessage, messages, 0);
expect(messages.value).toHaveLength(1);
});
it('should handle runs without runIndex', () => {
// Omitting runIndex falls back to keying the run by nodeId alone
handleNodeStart('node-1', streamingManager);
expect(messages.value).toHaveLength(0);
// Verify run is registered by adding a chunk
handleStreamingChunk('test', 'node-1', streamingManager, receivedMessage, messages);
expect(messages.value).toHaveLength(1);
});
it('should handle errors gracefully', () => {
// handleNodeStart swallows internal errors, so a null manager must not throw
const invalidStreamingManager = null as unknown as StreamingMessageManager;
expect(() => {
handleNodeStart('node-1', invalidStreamingManager);
}).not.toThrow();
});
});
describe('handleNodeComplete', () => {
it('should mark run as complete', () => {
// Setup initial state
streamingManager.addRunToActive('node-1', 0);
handleNodeComplete('node-1', streamingManager, 0);
expect(streamingManager.areAllRunsComplete()).toBe(true);
});
it('should handle multiple runs completion', () => {
// Setup two runs
streamingManager.addRunToActive('node-1', 0);
streamingManager.addRunToActive('node-1', 1);
// Complete first run
handleNodeComplete('node-1', streamingManager, 0);
expect(streamingManager.areAllRunsComplete()).toBe(false);
// Complete second run
handleNodeComplete('node-1', streamingManager, 1);
expect(streamingManager.areAllRunsComplete()).toBe(true);
});
it('should handle runs without runIndex', () => {
// Without runIndex the run is keyed by nodeId alone
streamingManager.addRunToActive('node-1');
handleNodeComplete('node-1', streamingManager);
expect(streamingManager.areAllRunsComplete()).toBe(true);
});
it('should handle errors gracefully', () => {
// handleNodeComplete swallows internal errors, so a null manager must not throw
const invalidStreamingManager = null as unknown as StreamingMessageManager;
expect(() => {
handleNodeComplete('node-1', invalidStreamingManager);
}).not.toThrow();
});
});
});

View File

@@ -3,6 +3,7 @@ import type {
ChatOptions,
LoadPreviousSessionResponse,
SendMessageResponse,
StructuredChunk,
} from '@n8n/chat/types';
export async function loadPreviousSession(sessionId: string, options: ChatOptions) {
@@ -55,3 +56,163 @@ export async function sendMessage(
},
);
}
// Create a transform stream that parses newline-delimited JSON coming from
// the streaming webhook into StructuredChunk objects.
function createLineParser(): TransformStream<Uint8Array, StructuredChunk> {
	let buffer = '';
	const decoder = new TextDecoder();

	// Parse one line: valid JSON becomes a structured chunk; anything else is
	// forwarded verbatim as a plain-text 'item' chunk so malformed server
	// output never breaks the stream. Shared by transform() and flush() to
	// avoid the previous copy-pasted try/catch in both.
	const emitLine = (
		line: string,
		controller: TransformStreamDefaultController<StructuredChunk>,
	): void => {
		if (!line.trim()) {
			return;
		}
		try {
			controller.enqueue(JSON.parse(line) as StructuredChunk);
		} catch {
			// Handle non-JSON lines as plain text
			controller.enqueue({
				type: 'item',
				content: line,
			} as StructuredChunk);
		}
	};

	return new TransformStream({
		transform(chunk, controller) {
			// stream:true keeps multi-byte UTF-8 sequences split across network
			// chunks from being decoded as replacement characters
			buffer += decoder.decode(chunk, { stream: true });
			// Process all complete lines in the buffer
			const lines = buffer.split('\n');
			buffer = lines.pop() ?? ''; // Keep incomplete line in buffer
			for (const line of lines) {
				emitLine(line, controller);
			}
		},
		flush(controller) {
			// Process any remaining buffer content once the stream closes
			emitLine(buffer, controller);
		},
	});
}
/**
 * Callbacks invoked while consuming a streaming chat response.
 * nodeId/runIndex identify which node execution produced the event.
 */
export interface StreamingEventHandlers {
// Called when a node begins producing a message
onBeginMessage: (nodeId: string, runIndex?: number) => void;
// Called for each text fragment; nodeId may be absent for single-node streams
onChunk: (chunk: string, nodeId?: string, runIndex?: number) => void;
// Called when the node's message is complete
onEndMessage: (nodeId: string, runIndex?: number) => void;
}
/**
 * Sends a chat message and consumes the streaming response, dispatching each
 * structured chunk ('begin' / 'item' / 'end' / 'error') to the given handlers.
 *
 * @throws Error when the HTTP response is not ok or has no readable body.
 */
export async function sendMessageStreaming(
	message: string,
	files: File[],
	sessionId: string,
	options: ChatOptions,
	handlers: StreamingEventHandlers,
): Promise<void> {
	// Pick the multipart or JSON transport depending on attachments
	const hasAttachments = files.length > 0;
	const response = hasAttachments
		? await sendWithFiles(message, files, sessionId, options)
		: await sendTextOnly(message, sessionId, options);

	if (!response.ok) {
		const errorText = await response.text();
		console.error('HTTP error response:', response.status, errorText);
		throw new Error(`Error while sending message. Error: ${errorText}`);
	}
	if (!response.body) {
		throw new Error('Response body is not readable');
	}

	// Decode the newline-delimited JSON stream and fan events out to handlers
	const reader = response.body.pipeThrough(createLineParser()).getReader();
	try {
		for (;;) {
			const result = await reader.read();
			if (result.done) {
				break;
			}
			const chunk = result.value;
			const nodeId = chunk.metadata?.nodeId || 'unknown';
			const runIndex = chunk.metadata?.runIndex;

			if (chunk.type === 'begin') {
				handlers.onBeginMessage(nodeId, runIndex);
			} else if (chunk.type === 'item') {
				handlers.onChunk(chunk.content ?? '', nodeId, runIndex);
			} else if (chunk.type === 'end') {
				handlers.onEndMessage(nodeId, runIndex);
			} else if (chunk.type === 'error') {
				// Surface stream-level errors as a final chunk, then close the message
				handlers.onChunk(`Error: ${chunk.content ?? 'Unknown error'}`, nodeId, runIndex);
				handlers.onEndMessage(nodeId, runIndex);
			}
		}
	} finally {
		reader.releaseLock();
	}
}
// Helper function for file uploads: posts the message as multipart/form-data
// so File objects can travel alongside the session/message fields.
async function sendWithFiles(
	message: string,
	files: File[],
	sessionId: string,
	options: ChatOptions,
): Promise<Response> {
	// Assemble the multipart payload: action, session, message, metadata, files
	const formData = new FormData();
	formData.append('action', 'sendMessage');
	formData.append(options.chatSessionKey as string, sessionId);
	formData.append(options.chatInputKey as string, message);
	if (options.metadata) {
		formData.append('metadata', JSON.stringify(options.metadata));
	}
	files.forEach((file) => formData.append('files', file));

	// Custom webhook headers may override the default Accept
	const headers = {
		Accept: 'text/plain',
		...options.webhookConfig?.headers,
	};
	return await fetch(options.webhookUrl, { method: 'POST', headers, body: formData });
}
// Helper function for text-only messages: posts a JSON payload mirroring the
// multipart variant, minus file attachments.
async function sendTextOnly(
	message: string,
	sessionId: string,
	options: ChatOptions,
): Promise<Response> {
	const payload: Record<string, unknown> = {
		action: 'sendMessage',
		[options.chatSessionKey as string]: sessionId,
		[options.chatInputKey as string]: message,
	};
	// Metadata is optional and omitted entirely when absent
	if (options.metadata) {
		payload.metadata = options.metadata;
	}
	return await fetch(options.webhookUrl, {
		method: 'POST',
		headers: {
			'Content-Type': 'application/json',
			Accept: 'text/plain',
			...options.webhookConfig?.headers,
		},
		body: JSON.stringify(payload),
	});
}

View File

@@ -25,6 +25,7 @@ export const defaultOptions: ChatOptions = {
},
},
theme: {},
enableStreaming: false,
};
export const defaultMountingTarget = '#n8n-chat';

View File

@@ -5,7 +5,13 @@ import { computed, nextTick, ref } from 'vue';
import * as api from '@n8n/chat/api';
import { ChatOptionsSymbol, ChatSymbol, localStorageSessionIdKey } from '@n8n/chat/constants';
import { chatEventBus } from '@n8n/chat/event-buses';
import type { ChatMessage, ChatOptions } from '@n8n/chat/types';
import type { ChatMessage, ChatOptions, ChatMessageText } from '@n8n/chat/types';
import { StreamingMessageManager, createBotMessage } from '@n8n/chat/utils/streaming';
import {
handleStreamingChunk,
handleNodeStart,
handleNodeComplete,
} from '@n8n/chat/utils/streamingHandlers';
export const ChatPlugin: Plugin<ChatOptions> = {
install(app, options) {
@@ -38,32 +44,73 @@ export const ChatPlugin: Plugin<ChatOptions> = {
chatEventBus.emit('scrollToBottom');
});
const sendMessageResponse = await api.sendMessage(
text,
files,
currentSessionId.value as string,
options,
);
const receivedMessage = ref<ChatMessageText | null>(null);
const streamingManager = new StreamingMessageManager();
let textMessage = sendMessageResponse.output ?? sendMessageResponse.text ?? '';
try {
if (options?.enableStreaming) {
const handlers: api.StreamingEventHandlers = {
onChunk: (chunk: string, nodeId?: string, runIndex?: number) => {
handleStreamingChunk(
chunk,
nodeId,
streamingManager,
receivedMessage,
messages,
runIndex,
);
},
onBeginMessage: (nodeId: string, runIndex?: number) => {
handleNodeStart(nodeId, streamingManager, runIndex);
},
onEndMessage: (nodeId: string, runIndex?: number) => {
handleNodeComplete(nodeId, streamingManager, runIndex);
},
};
if (textMessage === '' && Object.keys(sendMessageResponse).length > 0) {
try {
textMessage = JSON.stringify(sendMessageResponse, null, 2);
} catch (e) {
// Failed to stringify the object so fallback to empty string
await api.sendMessageStreaming(
text,
files,
currentSessionId.value as string,
options,
handlers,
);
} else {
receivedMessage.value = createBotMessage();
const sendMessageResponse = await api.sendMessage(
text,
files,
currentSessionId.value as string,
options,
);
let textMessage = sendMessageResponse.output ?? sendMessageResponse.text ?? '';
if (textMessage === '' && Object.keys(sendMessageResponse).length > 0) {
try {
textMessage = JSON.stringify(sendMessageResponse, null, 2);
} catch (e) {
// Failed to stringify the object so fallback to empty string
}
}
receivedMessage.value.text = textMessage;
messages.value.push(receivedMessage.value);
}
} catch (error) {
if (!receivedMessage.value) {
receivedMessage.value = createBotMessage();
messages.value.push(receivedMessage.value);
}
if (receivedMessage.value && 'text' in receivedMessage.value) {
receivedMessage.value.text = 'Error: Failed to receive response';
}
console.error('Chat API error:', error);
} finally {
waitingForResponse.value = false;
}
const receivedMessage: ChatMessage = {
id: uuidv4(),
text: textMessage,
sender: 'bot',
};
messages.value.push(receivedMessage);
waitingForResponse.value = false;
void nextTick(() => {
chatEventBus.emit('scrollToBottom');
});

View File

@@ -2,3 +2,4 @@ export * from './chat';
export * from './messages';
export * from './options';
export * from './webhook';
export * from './streaming';

View File

@@ -33,4 +33,5 @@ export interface ChatOptions {
disabled?: Ref<boolean>;
allowFileUploads?: Ref<boolean> | boolean;
allowedFilesMimeTypes?: Ref<string> | string;
enableStreaming?: boolean;
}

View File

@@ -0,0 +1,19 @@
// Lifecycle of a streamed message: 'begin' opens it, 'item' appends content,
// 'end' closes it, 'error' reports a failure.
export type ChunkType = 'begin' | 'item' | 'end' | 'error';
/** One newline-delimited JSON frame received from the streaming webhook. */
export interface StructuredChunk {
type: ChunkType;
// Text payload; present for 'item' and 'error' frames
content?: string;
metadata: {
nodeId: string;
nodeName: string;
timestamp: number;
// Distinguishes parallel executions of the same node
runIndex: number;
itemIndex: number;
};
}
/** Accumulated streaming state for a single node. */
export interface NodeStreamingState {
nodeId: string;
// Text fragments received so far, in arrival order
chunks: string[];
// True while the node is still streaming
isActive: boolean;
startTime: number;
}

View File

@@ -0,0 +1,133 @@
import { v4 as uuidv4 } from 'uuid';
import type { ChatMessage, ChatMessageText } from '@n8n/chat/types';
/** Accumulated state for one run of a node during streaming. */
export interface NodeRunData {
// Full text received so far for this run
content: string;
// True once the run has been marked complete
isComplete: boolean;
// The chat message object currently representing this run
message: ChatMessageText;
}
/**
 * Manages the state of streaming messages for nodes.
 * This class is responsible for tracking the state of each run of nodes,
 * including the content of each chunk, whether it's complete, and the message
 * object that represents the run of a given node.
 */
export class StreamingMessageManager {
	// runKey ("nodeId" or "nodeId-runIndex") -> accumulated state for that run
	private nodeRuns = new Map<string, NodeRunData>();
	// runKeys in creation order, so messages can be returned in display order
	private runOrder: string[] = [];
	// runKeys that have started but not yet completed
	private activeRuns = new Set<string>();

	/** Builds the map key; runIndex disambiguates parallel runs of one node. */
	private getRunKey(nodeId: string, runIndex?: number): string {
		return runIndex !== undefined ? `${nodeId}-${runIndex}` : nodeId;
	}

	/**
	 * Returns the message for a run, creating the run state (with an empty
	 * bot message) on first use.
	 */
	initializeRun(nodeId: string, runIndex?: number): ChatMessageText {
		const runKey = this.getRunKey(nodeId, runIndex);
		// Early return avoids the has()/get()! non-null assertion pair
		const existing = this.nodeRuns.get(runKey);
		if (existing) {
			return existing.message;
		}
		const message = createBotMessage();
		this.nodeRuns.set(runKey, {
			content: '',
			isComplete: false,
			message,
		});
		this.runOrder.push(runKey);
		return message;
	}

	/** Marks the run as active without creating its message yet. */
	registerRunStart(nodeId: string, runIndex?: number): void {
		this.activeRuns.add(this.getRunKey(nodeId, runIndex));
	}

	/** Marks the run as active and ensures its message exists. */
	addRunToActive(nodeId: string, runIndex?: number): ChatMessageText {
		this.activeRuns.add(this.getRunKey(nodeId, runIndex));
		return this.initializeRun(nodeId, runIndex);
	}

	/** Removes the run from the active set and flags its data as complete. */
	removeRunFromActive(nodeId: string, runIndex?: number): void {
		const runKey = this.getRunKey(nodeId, runIndex);
		this.activeRuns.delete(runKey);
		const runData = this.nodeRuns.get(runKey);
		if (runData) {
			runData.isComplete = true;
		}
	}

	/**
	 * Appends a chunk to the run's content and returns the updated message,
	 * or null when the run was never initialized.
	 */
	addChunkToRun(nodeId: string, chunk: string, runIndex?: number): ChatMessageText | null {
		const runData = this.nodeRuns.get(this.getRunKey(nodeId, runIndex));
		if (!runData) {
			return null;
		}
		runData.content += chunk;
		// Create a new message object to trigger Vue reactivity
		const updatedMessage: ChatMessageText = {
			...runData.message,
			text: runData.content,
		};
		runData.message = updatedMessage;
		return updatedMessage;
	}

	/** Returns the run's current message, or null if the run is unknown. */
	getRunMessage(nodeId: string, runIndex?: number): ChatMessageText | null {
		return this.nodeRuns.get(this.getRunKey(nodeId, runIndex))?.message ?? null;
	}

	/** True when every known run is complete (vacuously true with no runs). */
	areAllRunsComplete(): boolean {
		return Array.from(this.nodeRuns.values()).every((data) => data.isComplete);
	}

	/** Number of runs ever initialized. */
	getRunCount(): number {
		return this.runOrder.length;
	}

	/** Number of runs currently marked active. */
	getActiveRunCount(): number {
		return this.activeRuns.size;
	}

	/** All run messages in creation order. */
	getAllMessages(): ChatMessageText[] {
		return this.runOrder
			.map((key) => this.nodeRuns.get(key)?.message)
			.filter((message): message is ChatMessageText => message !== undefined);
	}

	/** Clears all run state. */
	reset(): void {
		this.nodeRuns.clear();
		this.runOrder = [];
		this.activeRuns.clear();
	}
}
/** Creates an empty bot text message; generates a UUID when no id is given. */
export function createBotMessage(id?: string): ChatMessageText {
	const messageId = id ?? uuidv4();
	return {
		id: messageId,
		type: 'text',
		text: '',
		sender: 'bot',
	};
}
/**
 * Replaces the message with the given id in-place within the messages array.
 * @throws Error when no message with that id exists.
 */
export function updateMessageInArray(
	messages: ChatMessage[],
	messageId: string,
	updatedMessage: ChatMessageText,
): void {
	const index = messages.findIndex((candidate: ChatMessage) => candidate.id === messageId);
	if (index < 0) {
		throw new Error(`Can't update message. No message with id ${messageId} found`);
	}
	messages.splice(index, 1, updatedMessage);
}

View File

@@ -0,0 +1,87 @@
import { nextTick } from 'vue';
import type { Ref } from 'vue';
import { chatEventBus } from '@n8n/chat/event-buses';
import type { ChatMessage, ChatMessageText } from '@n8n/chat/types';
import type { StreamingMessageManager } from './streaming';
import { createBotMessage, updateMessageInArray } from './streaming';
/**
 * Routes one streamed text chunk into the chat message list.
 *
 * Two modes:
 * - No nodeId: a single bot message is created on the first chunk and appended
 *   to for every subsequent chunk (backwards compatibility).
 * - With nodeId: each (nodeId, runIndex) pair gets its own message, managed by
 *   the StreamingMessageManager; the message is created lazily on the first
 *   chunk for that run.
 *
 * Errors are logged and swallowed so a bad chunk never breaks the stream.
 */
export function handleStreamingChunk(
chunk: string,
nodeId: string | undefined,
streamingManager: StreamingMessageManager,
receivedMessage: Ref<ChatMessageText | null>,
messages: Ref<ChatMessage[]>,
runIndex?: number,
): void {
try {
// Skip empty chunks to avoid showing empty responses
if (!chunk.trim()) {
return;
}
if (!nodeId) {
// Simple single-node streaming (backwards compatibility)
if (!receivedMessage.value) {
receivedMessage.value = createBotMessage();
messages.value.push(receivedMessage.value);
}
// Replace (not mutate) the message object so Vue reactivity picks it up
const updatedMessage: ChatMessageText = {
...receivedMessage.value,
text: receivedMessage.value.text + chunk,
};
updateMessageInArray(messages.value, receivedMessage.value.id, updatedMessage);
receivedMessage.value = updatedMessage;
} else {
// Multi-run streaming with separate messages per runIndex
// Create message on first chunk if it doesn't exist
let runMessage = streamingManager.getRunMessage(nodeId, runIndex);
if (!runMessage) {
runMessage = streamingManager.addRunToActive(nodeId, runIndex);
messages.value.push(runMessage);
}
// Add chunk to the run
const updatedMessage = streamingManager.addChunkToRun(nodeId, chunk, runIndex);
if (updatedMessage) {
updateMessageInArray(messages.value, updatedMessage.id, updatedMessage);
}
}
// Keep the newest content visible while streaming
void nextTick(() => {
chatEventBus.emit('scrollToBottom');
});
} catch (error) {
console.error('Error handling stream chunk:', error);
// Continue gracefully without breaking the stream
}
}
/**
 * Marks a node run as started. No message is created here; the message is
 * created lazily when the first chunk for this run arrives. Errors are logged
 * and swallowed so stream setup never crashes the chat UI.
 */
export function handleNodeStart(
	nodeId: string,
	streamingManager: StreamingMessageManager,
	runIndex?: number,
): void {
	try {
		streamingManager.registerRunStart(nodeId, runIndex);
	} catch (error) {
		console.error('Error handling node start:', error);
	}
}
/**
 * Marks a node run as finished in the streaming manager. Errors are logged
 * and swallowed so stream teardown never crashes the chat UI.
 */
export function handleNodeComplete(
	nodeId: string,
	streamingManager: StreamingMessageManager,
	runIndex?: number,
): void {
	try {
		streamingManager.removeRunFromActive(nodeId, runIndex);
	} catch (error) {
		console.error('Error handling node complete:', error);
	}
}

View File

@@ -926,7 +926,7 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn &
putExecutionToWait(waitTill: Date): Promise<void>;
sendMessageToUI(message: any): void;
sendResponse(response: IExecuteResponsePromiseData): void;
sendChunk(type: ChunkType, content?: IDataObject | string): void;
sendChunk(type: ChunkType, itemIndex: number, content?: IDataObject | string): void;
isStreaming(): boolean;
// TODO: Make this one then only available in the new config one
@@ -2944,6 +2944,8 @@ export interface StructuredChunk {
metadata: {
nodeId: string;
nodeName: string;
runIndex: number;
itemIndex: number;
timestamp: number;
};
}