diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/Agent.node.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/Agent.node.ts
index d1b367dd45..0f3fa171a1 100644
--- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/Agent.node.ts
+++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/Agent.node.ts
@@ -27,8 +27,7 @@ export class Agent extends VersionedNodeType {
],
},
},
- // We keep defaultVersion as 2.1 to ensure we publish streaming when everything is ready
- defaultVersion: 2.1,
+ defaultVersion: 2.2,
};
const nodeVersions: IVersionedNodeType['nodeVersions'] = {
diff --git a/packages/@n8n/nodes-langchain/nodes/trigger/ChatTrigger/ChatTrigger.node.ts b/packages/@n8n/nodes-langchain/nodes/trigger/ChatTrigger/ChatTrigger.node.ts
index 938f03a232..1ead017151 100644
--- a/packages/@n8n/nodes-langchain/nodes/trigger/ChatTrigger/ChatTrigger.node.ts
+++ b/packages/@n8n/nodes-langchain/nodes/trigger/ChatTrigger/ChatTrigger.node.ts
@@ -48,7 +48,7 @@ const lastNodeResponseMode = {
};
const streamingResponseMode = {
- name: 'Streaming Response',
+ name: 'Streaming',
value: 'streaming',
description: 'Streaming response from specified nodes (e.g. Agents)',
};
@@ -495,7 +495,7 @@ export class ChatTrigger extends Node {
displayName: 'Response Mode',
name: 'responseMode',
type: 'options',
- options: [lastNodeResponseMode, respondToWebhookResponseMode],
+ options: [lastNodeResponseMode, streamingResponseMode, respondToWebhookResponseMode],
default: 'lastNode',
description: 'When and how to respond to the chat',
displayOptions: { show: { '/mode': ['webhook'] } },
@@ -504,7 +504,7 @@ export class ChatTrigger extends Node {
displayName: 'Response Mode',
name: 'responseMode',
type: 'options',
- options: [lastNodeResponseMode, respondNodesResponseMode],
+ options: [lastNodeResponseMode, streamingResponseMode, respondNodesResponseMode],
default: 'lastNode',
description: 'When and how to respond to the webhook',
displayOptions: { show: { '/mode': ['hostedChat'] } },
diff --git a/packages/cli/src/__tests__/workflow-runner.test.ts b/packages/cli/src/__tests__/workflow-runner.test.ts
index ac25a57a6c..ff378cdac2 100644
--- a/packages/cli/src/__tests__/workflow-runner.test.ts
+++ b/packages/cli/src/__tests__/workflow-runner.test.ts
@@ -513,7 +513,7 @@ describe('streaming functionality', () => {
expect(mockHooks.addHandler).toHaveBeenCalledWith('sendChunk', expect.any(Function));
});
- it('should not setup sendChunk handler when streaming is enabled but execution mode is manual', async () => {
+ it('should setup sendChunk handler when streaming is enabled and execution mode is manual', async () => {
// ARRANGE
const activeExecutions = Container.get(ActiveExecutions);
jest.spyOn(activeExecutions, 'add').mockResolvedValue('1');
@@ -550,6 +550,6 @@ describe('streaming functionality', () => {
await runner.run(data);
// ASSERT
- expect(mockHooks.addHandler).not.toHaveBeenCalledWith('sendChunk', expect.any(Function));
+ expect(mockHooks.addHandler).toHaveBeenCalledWith('sendChunk', expect.any(Function));
});
});
diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts
index fa85c8149f..5e31b1cc68 100644
--- a/packages/cli/src/workflow-runner.ts
+++ b/packages/cli/src/workflow-runner.ts
@@ -268,12 +268,10 @@ export class WorkflowRunner {
});
if (data.streamingEnabled) {
- if (data.executionMode !== 'manual') {
- lifecycleHooks.addHandler('sendChunk', (chunk) => {
- data.httpResponse?.write(JSON.stringify(chunk) + '\n');
- data.httpResponse?.flush?.();
- });
- }
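+				// Stream each chunk to the client as NDJSON as soon as it arrives, in manual and production executions alike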
+ lifecycleHooks.addHandler('sendChunk', (chunk) => {
+ data.httpResponse?.write(JSON.stringify(chunk) + '\n');
+ data.httpResponse?.flush?.();
+ });
}
additionalData.setExecutionStatus = WorkflowExecuteAdditionalData.setExecutionStatus.bind({
diff --git a/packages/frontend/@n8n/chat/src/__tests__/plugins/chat.spec.ts b/packages/frontend/@n8n/chat/src/__tests__/plugins/chat.spec.ts
new file mode 100644
index 0000000000..329c38bd26
--- /dev/null
+++ b/packages/frontend/@n8n/chat/src/__tests__/plugins/chat.spec.ts
@@ -0,0 +1,447 @@
+import { describe, it, expect, vi, beforeEach, afterEach, assert } from 'vitest';
+import { createApp } from 'vue';
+
+import * as api from '@n8n/chat/api';
+import type { StreamingEventHandlers } from '@n8n/chat/api/message';
+import { localStorageSessionIdKey } from '@n8n/chat/constants';
+import { chatEventBus } from '@n8n/chat/event-buses';
+import { ChatPlugin } from '@n8n/chat/plugins/chat';
+import type { Chat, ChatOptions, LoadPreviousSessionResponse } from '@n8n/chat/types';
+
+// Mock dependencies
+vi.mock('@n8n/chat/api');
+vi.mock('@n8n/chat/event-buses', () => ({
+ chatEventBus: {
+ emit: vi.fn(),
+ },
+}));
+
+// Helper function to set up chat store with proper typing
+function setupChatStore(options: ChatOptions): Chat {
+ const app = createApp({
+		template: '<div></div>',
+ });
+ app.use(ChatPlugin, options);
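+	// ChatPlugin registers the chat store on globalProperties.$chat during install; return it for assertions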
+ return app.config.globalProperties.$chat as Chat;
+}
+
+describe('ChatPlugin', () => {
+ let mockOptions: ChatOptions;
+
+ beforeEach(() => {
+ // Reset mocks
+ vi.clearAllMocks();
+
+ // Setup default options
+ mockOptions = {
+ webhookUrl: 'http://localhost:5678/webhook',
+ chatInputKey: 'message',
+ chatSessionKey: 'sessionId',
+ enableStreaming: false,
+ initialMessages: [], // Explicitly set to empty to override defaults
+ i18n: {
+ en: {
+ title: 'Test Chat',
+ subtitle: 'Test subtitle',
+ footer: '',
+ getStarted: 'Start',
+ inputPlaceholder: 'Type a message...',
+ closeButtonTooltip: 'Close',
+ },
+ },
+ };
+
+ // Setup localStorage mock
+ const localStorageMock = {
+ getItem: vi.fn(),
+ setItem: vi.fn(),
+ removeItem: vi.fn(),
+ clear: vi.fn(),
+ };
+ Object.defineProperty(window, 'localStorage', {
+ value: localStorageMock,
+ writable: true,
+ });
+ });
+
+ afterEach(() => {
+ vi.clearAllMocks();
+ });
+
+ describe('sendMessage', () => {
+ let chatStore: Chat;
+
+ beforeEach(() => {
+ chatStore = setupChatStore(mockOptions);
+ });
+
+ it('should send a message without streaming', async () => {
+ const mockResponse = { output: 'Hello from bot!' };
+ vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse);
+
+ await chatStore.sendMessage('Hello bot!');
+
+ expect(api.sendMessage).toHaveBeenCalledWith('Hello bot!', [], null, mockOptions);
+
+ expect(chatStore.messages.value).toHaveLength(2);
+ expect(chatStore.messages.value[0]).toMatchObject({
+ text: 'Hello bot!',
+ sender: 'user',
+ });
+ expect(chatStore.messages.value[1]).toMatchObject({
+ text: 'Hello from bot!',
+ sender: 'bot',
+ });
+ });
+
+ it('should handle empty response gracefully', async () => {
+ const mockResponse = {};
+ vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse);
+
+ await chatStore.sendMessage('Hello bot!');
+
+ expect(chatStore.messages.value).toHaveLength(2);
+ expect(chatStore.messages.value[1]).toMatchObject({
+ text: '',
+ sender: 'bot',
+ });
+ });
+
+ it('should handle response with only text property', async () => {
+ const mockResponse = { text: 'Response text' };
+ vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse);
+
+ await chatStore.sendMessage('Hello bot!');
+
+ expect(chatStore.messages.value[1]).toMatchObject({
+ text: 'Response text',
+ sender: 'bot',
+ });
+ });
+
+ it('should handle errors during message sending', async () => {
+ vi.mocked(api.sendMessage).mockRejectedValueOnce(new Error('Network error'));
+ const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {});
+
+ await chatStore.sendMessage('Hello bot!');
+
+ expect(consoleErrorSpy).toHaveBeenCalledWith('Chat API error:', expect.any(Error));
+
+ // Error messages should be displayed to the user
+ expect(chatStore.messages.value).toHaveLength(2);
+ expect(chatStore.messages.value[0]).toMatchObject({
+ text: 'Hello bot!',
+ sender: 'user',
+ });
+ expect(chatStore.messages.value[1]).toMatchObject({
+ text: 'Error: Failed to receive response',
+ sender: 'bot',
+ });
+
+ consoleErrorSpy.mockRestore();
+ });
+
+ it('should send files with message', async () => {
+ const mockFile = new File(['content'], 'test.txt', { type: 'text/plain' });
+ const mockResponse = { output: 'File received!' };
+ vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse);
+
+ await chatStore.sendMessage('Here is a file', [mockFile]);
+
+ expect(api.sendMessage).toHaveBeenCalledWith('Here is a file', [mockFile], null, mockOptions);
+
+ expect(chatStore.messages.value[0]).toMatchObject({
+ text: 'Here is a file',
+ sender: 'user',
+ files: [mockFile],
+ });
+ });
+
+ it('should set waitingForResponse correctly', async () => {
+ const mockResponse = { output: 'Response' };
+ vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse);
+
+ expect(chatStore.waitingForResponse.value).toBe(false);
+
+ const sendPromise = chatStore.sendMessage('Test');
+ expect(chatStore.waitingForResponse.value).toBe(true);
+
+ await sendPromise;
+ expect(chatStore.waitingForResponse.value).toBe(false);
+ });
+
+ it('should emit scrollToBottom events', async () => {
+ const mockResponse = { output: 'Response' };
+ vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse);
+
+ await chatStore.sendMessage('Test');
+
+ expect(chatEventBus.emit).toHaveBeenCalledWith('scrollToBottom');
+ expect(chatEventBus.emit).toHaveBeenCalledTimes(2); // Once after user message, once after bot response
+ });
+ });
+
+ describe('streaming', () => {
+ let chatStore: Chat;
+
+ beforeEach(() => {
+ mockOptions.enableStreaming = true;
+ chatStore = setupChatStore(mockOptions);
+ });
+
+ it('should handle streaming messages', async () => {
+ const mockStreamingResponse = { hasReceivedChunks: true };
+ vi.mocked(api.sendMessageStreaming).mockResolvedValueOnce(mockStreamingResponse);
+
+ await chatStore.sendMessage('Stream this!');
+
+ expect(api.sendMessageStreaming).toHaveBeenCalledWith(
+ 'Stream this!',
+ [],
+ null,
+ mockOptions,
+ expect.objectContaining({
+ onChunk: expect.any(Function) as StreamingEventHandlers['onChunk'],
+ onBeginMessage: expect.any(Function) as StreamingEventHandlers['onBeginMessage'],
+ onEndMessage: expect.any(Function) as StreamingEventHandlers['onEndMessage'],
+ }),
+ );
+ });
+
+ it('should handle empty streaming response', async () => {
+ const mockStreamingResponse = { hasReceivedChunks: false };
+ vi.mocked(api.sendMessageStreaming).mockResolvedValueOnce(mockStreamingResponse);
+
+ await chatStore.sendMessage('Stream this!');
+
+ expect(chatStore.messages.value).toHaveLength(2);
+ expect(chatStore.messages.value[1]).toMatchObject({
+ text: '[No response received. This could happen if streaming is enabled in the trigger but disabled in agent node(s)]',
+ sender: 'bot',
+ });
+ });
+
+ it('should handle streaming errors', async () => {
+ vi.mocked(api.sendMessageStreaming).mockRejectedValueOnce(new Error('Stream error'));
+ const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {});
+
+ await chatStore.sendMessage('Stream this!');
+
+ expect(consoleErrorSpy).toHaveBeenCalledWith('Chat API error:', expect.any(Error));
+ expect(chatStore.messages.value[1]).toMatchObject({
+ text: 'Error: Failed to receive response',
+ sender: 'bot',
+ });
+
+ consoleErrorSpy.mockRestore();
+ });
+
+ it('should handle streaming with files', async () => {
+ const mockFile = new File(['content'], 'test.txt', { type: 'text/plain' });
+ const mockStreamingResponse = { hasReceivedChunks: true };
+ vi.mocked(api.sendMessageStreaming).mockResolvedValueOnce(mockStreamingResponse);
+
+ await chatStore.sendMessage('Stream with file', [mockFile]);
+
+ expect(api.sendMessageStreaming).toHaveBeenCalledWith(
+ 'Stream with file',
+ [mockFile],
+ null,
+ mockOptions,
+ expect.objectContaining({
+ onChunk: expect.any(Function) as StreamingEventHandlers['onChunk'],
+ onBeginMessage: expect.any(Function) as StreamingEventHandlers['onBeginMessage'],
+ onEndMessage: expect.any(Function) as StreamingEventHandlers['onEndMessage'],
+ }),
+ );
+ });
+ });
+
+ describe('session management', () => {
+ let chatStore: Chat;
+
+ beforeEach(() => {
+ mockOptions.loadPreviousSession = true;
+ chatStore = setupChatStore(mockOptions);
+ });
+
+ it('should load previous session', async () => {
+ const mockSessionId = 'existing-session';
+ const mockMessages: LoadPreviousSessionResponse = {
+ data: [
+ {
+					id: ['HumanMessage-1'], // The implementation expects a string here, but the type declares an array
+ kwargs: { content: 'Previous user message', additional_kwargs: {} },
+ lc: 1,
+ type: 'HumanMessage',
+ },
+ {
+ id: ['AIMessage-1'],
+ kwargs: { content: 'Previous bot message', additional_kwargs: {} },
+ lc: 1,
+ type: 'AIMessage',
+ },
+ ],
+ };
+
+			(window.localStorage.getItem as ReturnType<typeof vi.fn>).mockReturnValueOnce(mockSessionId);
+ vi.mocked(api.loadPreviousSession).mockResolvedValueOnce(mockMessages);
+
+ const sessionId = await chatStore.loadPreviousSession?.();
+
+ expect(sessionId).toBe(mockSessionId);
+ expect(api.loadPreviousSession).toHaveBeenCalledWith(mockSessionId, mockOptions);
+ expect(chatStore.messages.value).toHaveLength(2);
+ expect(chatStore.messages.value[0]).toMatchObject({
+ text: 'Previous user message',
+ sender: 'bot', // Both will be 'bot' because id is an array, not a string
+ });
+ expect(chatStore.messages.value[1]).toMatchObject({
+ text: 'Previous bot message',
+ sender: 'bot',
+ });
+ expect(chatStore.currentSessionId.value).toBe(mockSessionId);
+ });
+
+ it('should create new session if no previous session exists', async () => {
+			(window.localStorage.getItem as ReturnType<typeof vi.fn>).mockReturnValueOnce(null);
+ vi.mocked(api.loadPreviousSession).mockResolvedValueOnce({ data: [] });
+
+ const sessionId = await chatStore.loadPreviousSession?.();
+
+ expect(sessionId).toMatch(/^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i);
+ expect(chatStore.messages.value).toHaveLength(0);
+ expect(chatStore.currentSessionId.value).toBeNull();
+ });
+
+ it('should skip loading if loadPreviousSession is false', async () => {
+ mockOptions.loadPreviousSession = false;
+ chatStore = setupChatStore(mockOptions);
+
+ const result = await chatStore.loadPreviousSession?.();
+
+ expect(result).toBeUndefined();
+ expect(api.loadPreviousSession).not.toHaveBeenCalled();
+ });
+
+ it('should start a new session', async () => {
+ await chatStore.startNewSession?.();
+
+ expect(chatStore.currentSessionId.value).toMatch(
+ /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i,
+ );
+ // eslint-disable-next-line @typescript-eslint/unbound-method
+ expect(window.localStorage.setItem).toHaveBeenCalledWith(
+ localStorageSessionIdKey,
+ chatStore.currentSessionId.value,
+ );
+ });
+ });
+
+ describe('initial messages', () => {
+ it('should compute initial messages from options', () => {
+ mockOptions.initialMessages = ['Welcome!', 'How can I help you?'];
+ const chatStore = setupChatStore(mockOptions);
+
+ expect(chatStore.initialMessages.value).toHaveLength(2);
+ expect(chatStore.initialMessages.value[0]).toMatchObject({
+ text: 'Welcome!',
+ sender: 'bot',
+ });
+ expect(chatStore.initialMessages.value[1]).toMatchObject({
+ text: 'How can I help you?',
+ sender: 'bot',
+ });
+ });
+
+ it('should handle undefined initial messages', () => {
+ const chatStore = setupChatStore(mockOptions);
+
+ expect(chatStore.initialMessages.value).toHaveLength(0);
+ });
+ });
+
+ describe('edge cases', () => {
+ let chatStore: Chat;
+
+ beforeEach(() => {
+ chatStore = setupChatStore(mockOptions);
+ });
+
+ it('should handle sending message with null session ID', async () => {
+ const mockResponse = { output: 'Response' };
+ vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse);
+
+ chatStore.currentSessionId.value = null;
+ await chatStore.sendMessage('Test');
+
+ expect(api.sendMessage).toHaveBeenCalledWith('Test', [], null, mockOptions);
+ });
+
+ it('should handle empty text message', async () => {
+ const mockResponse = { output: 'Response' };
+ vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse);
+
+ await chatStore.sendMessage('');
+
+ expect(chatStore.messages.value[0]).toMatchObject({
+ text: '',
+ sender: 'user',
+ });
+ });
+
+ it('should handle streaming with existing bot messages', async () => {
+ mockOptions.enableStreaming = true;
+ chatStore = setupChatStore(mockOptions);
+
+ // Add an existing bot message
+ chatStore.messages.value.push({
+ id: 'existing',
+ text: 'Existing message',
+ sender: 'bot',
+ });
+
+ const mockStreamingResponse = { hasReceivedChunks: false };
+ vi.mocked(api.sendMessageStreaming).mockResolvedValueOnce(mockStreamingResponse);
+
+ await chatStore.sendMessage('Test');
+
+ // Should still add error message even with existing bot messages
+ const lastMessage = chatStore.messages.value[chatStore.messages.value.length - 1];
+ assert(lastMessage.type === 'text');
+ expect(lastMessage.text).toBe(
+ '[No response received. This could happen if streaming is enabled in the trigger but disabled in agent node(s)]',
+ );
+ });
+
+ it('should return response when executionStarted is true', async () => {
+ const mockResponse = {
+ executionStarted: true,
+ executionId: '12345',
+ };
+ vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse);
+
+ const result = await chatStore.sendMessage('Execute workflow');
+
+ expect(result).toEqual(mockResponse);
+ // Should only have the user message, no bot response
+ expect(chatStore.messages.value).toHaveLength(1);
+ expect(chatStore.messages.value[0]).toMatchObject({
+ text: 'Execute workflow',
+ sender: 'user',
+ });
+ });
+
+ it('should handle message field in response', async () => {
+ const mockResponse = { message: 'Response from message field' };
+ vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse);
+
+ await chatStore.sendMessage('Test message field');
+
+ expect(chatStore.messages.value[1]).toMatchObject({
+ text: 'Response from message field',
+ sender: 'bot',
+ });
+ });
+ });
+});
diff --git a/packages/frontend/@n8n/chat/src/api/message.ts b/packages/frontend/@n8n/chat/src/api/message.ts
index b5cfb520a4..d2b5fbafff 100644
--- a/packages/frontend/@n8n/chat/src/api/message.ts
+++ b/packages/frontend/@n8n/chat/src/api/message.ts
@@ -115,7 +115,7 @@ export async function sendMessageStreaming(
sessionId: string,
options: ChatOptions,
handlers: StreamingEventHandlers,
-): Promise<void> {
+): Promise<{ hasReceivedChunks: boolean }> {
// Build request
const response = await (files.length > 0
? sendWithFiles(message, files, sessionId, options)
@@ -133,6 +133,7 @@ export async function sendMessageStreaming(
// Process the stream
const reader = response.body.pipeThrough(createLineParser()).getReader();
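+	// Track whether any chunk arrived so callers can detect an empty stream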
+ let hasReceivedChunks = false;
try {
while (true) {
@@ -147,12 +148,14 @@ export async function sendMessageStreaming(
handlers.onBeginMessage(nodeId, runIndex);
break;
case 'item':
+ hasReceivedChunks = true;
handlers.onChunk(value.content ?? '', nodeId, runIndex);
break;
case 'end':
handlers.onEndMessage(nodeId, runIndex);
break;
case 'error':
+ hasReceivedChunks = true;
handlers.onChunk(`Error: ${value.content ?? 'Unknown error'}`, nodeId, runIndex);
handlers.onEndMessage(nodeId, runIndex);
break;
@@ -161,6 +164,8 @@ export async function sendMessageStreaming(
} finally {
reader.releaseLock();
}
+
+ return { hasReceivedChunks };
}
// Helper function for file uploads
diff --git a/packages/frontend/@n8n/chat/src/plugins/chat.ts b/packages/frontend/@n8n/chat/src/plugins/chat.ts
index bb2a98a474..f4cea34a7d 100644
--- a/packages/frontend/@n8n/chat/src/plugins/chat.ts
+++ b/packages/frontend/@n8n/chat/src/plugins/chat.ts
@@ -1,11 +1,15 @@
import { v4 as uuidv4 } from 'uuid';
-import type { Plugin } from 'vue';
-import { computed, nextTick, ref } from 'vue';
+import { type Plugin, computed, nextTick, ref, type Ref } from 'vue';
import * as api from '@n8n/chat/api';
import { ChatOptionsSymbol, ChatSymbol, localStorageSessionIdKey } from '@n8n/chat/constants';
import { chatEventBus } from '@n8n/chat/event-buses';
-import type { ChatMessage, ChatOptions, ChatMessageText } from '@n8n/chat/types';
+import type {
+ ChatMessage,
+ ChatOptions,
+ ChatMessageText,
+ SendMessageResponse,
+} from '@n8n/chat/types';
import { StreamingMessageManager, createBotMessage } from '@n8n/chat/utils/streaming';
import {
handleStreamingChunk,
@@ -13,6 +17,169 @@ import {
handleNodeComplete,
} from '@n8n/chat/utils/streamingHandlers';
+/**
+ * Creates a new user message object with a unique ID
+ * @param text - The message text content
+ * @param files - Optional array of files attached to the message
+ * @returns A ChatMessage object representing the user's message
+ */
+function createUserMessage(text: string, files: File[] = []): ChatMessage {
+ return {
+ id: uuidv4(),
+ text,
+ sender: 'user',
+ files,
+ };
+}
+
+/**
+ * Extracts text content from a message response
+ * Falls back to JSON stringification if no text is found but the response object has data
+ * @param response - The response object from the API
+ * @returns The extracted text message or stringified response
+ */
+function processMessageResponse(response: SendMessageResponse): string {
+ let textMessage = response.output ?? response.text ?? response.message ?? '';
+
+ if (textMessage === '' && Object.keys(response).length > 0) {
+ try {
+ textMessage = JSON.stringify(response, null, 2);
+ } catch (e) {
+			// Failed to stringify the object, so fall back to an empty string
+ }
+ }
+
+ return textMessage;
+}
+
+interface EmptyStreamConfig {
+	receivedMessage: Ref<ChatMessageText | null>;
+	messages: Ref<ChatMessage[]>;
+}
+
+/**
+ * Handles the case when a streaming response returns no chunks
+ * Creates an error message explaining the likely cause
+ * @param config - Configuration object containing message refs
+ */
+function handleEmptyStreamResponse(config: EmptyStreamConfig): void {
+ const { receivedMessage, messages } = config;
+
+ if (!receivedMessage.value) {
+ receivedMessage.value = createBotMessage();
+ messages.value.push(receivedMessage.value);
+ } else {
+ // Check if any existing messages have content
+ const hasContent = messages.value.some(
+ (msg) => msg.sender === 'bot' && 'text' in msg && msg.text.trim().length > 0,
+ );
+ if (!hasContent) {
+ receivedMessage.value = createBotMessage();
+ messages.value.push(receivedMessage.value);
+ }
+ }
+ receivedMessage.value.text =
+ '[No response received. This could happen if streaming is enabled in the trigger but disabled in agent node(s)]';
+}
+
+interface ErrorConfig {
+ error: unknown;
+	receivedMessage: Ref<ChatMessageText | null>;
+	messages: Ref<ChatMessage[]>;
+}
+
+/**
+ * Handles errors that occur during message sending
+ * Creates an error message for the user and logs the error to console
+ * @param config - Configuration object containing error and message refs
+ */
+function handleMessageError(config: ErrorConfig): void {
+ const { error, receivedMessage, messages } = config;
+
+ receivedMessage.value ??= createBotMessage();
+ receivedMessage.value.text = 'Error: Failed to receive response';
+
+ // Ensure the error message is added to messages array if not already there
+ if (!messages.value.includes(receivedMessage.value)) {
+ messages.value.push(receivedMessage.value);
+ }
+
+ console.error('Chat API error:', error);
+}
+
+interface StreamingMessageConfig {
+ text: string;
+ files: File[];
+ sessionId: string;
+ options: ChatOptions;
+	messages: Ref<ChatMessage[]>;
+	receivedMessage: Ref<ChatMessageText | null>;
+ streamingManager: StreamingMessageManager;
+}
+
+/**
+ * Handles sending messages with streaming enabled
+ * Sets up streaming event handlers and processes the response chunks
+ * @param config - Configuration object for streaming message handling
+ */
+async function handleStreamingMessage(config: StreamingMessageConfig): Promise<void> {
+ const { text, files, sessionId, options, messages, receivedMessage, streamingManager } = config;
+
+ const handlers: api.StreamingEventHandlers = {
+ onChunk: (chunk: string, nodeId?: string, runIndex?: number) => {
+ handleStreamingChunk(chunk, nodeId, streamingManager, receivedMessage, messages, runIndex);
+ },
+ onBeginMessage: (nodeId: string, runIndex?: number) => {
+ handleNodeStart(nodeId, streamingManager, runIndex);
+ },
+ onEndMessage: (nodeId: string, runIndex?: number) => {
+ handleNodeComplete(nodeId, streamingManager, runIndex);
+ },
+ };
+
+ const { hasReceivedChunks } = await api.sendMessageStreaming(
+ text,
+ files,
+ sessionId,
+ options,
+ handlers,
+ );
+
+ // Check if no chunks were received (empty stream)
+ if (!hasReceivedChunks) {
+ handleEmptyStreamResponse({ receivedMessage, messages });
+ }
+}
+
+interface NonStreamingMessageConfig {
+ text: string;
+ files: File[];
+ sessionId: string;
+ options: ChatOptions;
+}
+
+/**
+ * Handles sending messages without streaming
+ * Sends the message and processes the complete response
+ * @param config - Configuration object for non-streaming message handling
+ * @returns The API response or a bot message
+ */
+async function handleNonStreamingMessage(
+ config: NonStreamingMessageConfig,
+): Promise<{ response?: SendMessageResponse; botMessage?: ChatMessageText }> {
+ const { text, files, sessionId, options } = config;
+
+ const sendMessageResponse = await api.sendMessage(text, files, sessionId, options);
+
+ if (sendMessageResponse?.executionStarted) {
+ return { response: sendMessageResponse };
+ }
+
+ const receivedMessage = createBotMessage();
+ receivedMessage.text = processMessageResponse(sendMessageResponse);
+ return { botMessage: receivedMessage };
+}
+
export const ChatPlugin: Plugin = {
install(app, options) {
app.provide(ChatOptionsSymbol, options);
@@ -29,14 +196,12 @@ export const ChatPlugin: Plugin = {
})),
);
- async function sendMessage(text: string, files: File[] = []) {
- const sentMessage: ChatMessage = {
- id: uuidv4(),
- text,
- sender: 'user',
- files,
- };
-
+ async function sendMessage(
+ text: string,
+ files: File[] = [],
+		): Promise<SendMessageResponse | undefined> {
+ // Create and add user message
+ const sentMessage = createUserMessage(text, files);
messages.value.push(sentMessage);
waitingForResponse.value = true;
@@ -49,77 +214,39 @@ export const ChatPlugin: Plugin = {
try {
if (options?.enableStreaming) {
- const handlers: api.StreamingEventHandlers = {
- onChunk: (chunk: string, nodeId?: string, runIndex?: number) => {
- handleStreamingChunk(
- chunk,
- nodeId,
- streamingManager,
- receivedMessage,
- messages,
- runIndex,
- );
- },
- onBeginMessage: (nodeId: string, runIndex?: number) => {
- handleNodeStart(nodeId, streamingManager, runIndex);
- },
- onEndMessage: (nodeId: string, runIndex?: number) => {
- handleNodeComplete(nodeId, streamingManager, runIndex);
- },
- };
-
- await api.sendMessageStreaming(
+ await handleStreamingMessage({
text,
files,
- currentSessionId.value as string,
+ sessionId: currentSessionId.value as string,
options,
- handlers,
- );
+ messages,
+ receivedMessage,
+ streamingManager,
+ });
} else {
- receivedMessage.value = createBotMessage();
-
- const sendMessageResponse = await api.sendMessage(
+ const result = await handleNonStreamingMessage({
text,
files,
- currentSessionId.value as string,
+ sessionId: currentSessionId.value as string,
options,
- );
+ });
- if (sendMessageResponse?.executionStarted) {
- return sendMessageResponse;
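+					// The workflow was started asynchronously; return the response without adding a bot message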
+ if (result.response?.executionStarted) {
+ waitingForResponse.value = false;
+ return result.response;
}
- let textMessage =
- sendMessageResponse.output ??
- sendMessageResponse.text ??
- sendMessageResponse.message ??
- '';
-
- if (textMessage === '' && Object.keys(sendMessageResponse).length > 0) {
- try {
- textMessage = JSON.stringify(sendMessageResponse, null, 2);
- } catch (e) {
- // Failed to stringify the object so fallback to empty string
- }
+ if (result.botMessage) {
+ receivedMessage.value = result.botMessage;
+ messages.value.push(result.botMessage);
}
-
- receivedMessage.value.text = textMessage;
- messages.value.push(receivedMessage.value);
}
} catch (error) {
- if (!receivedMessage.value) {
- receivedMessage.value = createBotMessage();
- messages.value.push(receivedMessage.value);
- }
- if (receivedMessage.value && 'text' in receivedMessage.value) {
- receivedMessage.value.text = 'Error: Failed to receive response';
- }
- console.error('Chat API error:', error);
+ handleMessageError({ error, receivedMessage, messages });
+ } finally {
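+				// Always reset the waiting indicator, whether the request succeeded or failed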
waitingForResponse.value = false;
}
- waitingForResponse.value = false;
-
void nextTick(() => {
chatEventBus.emit('scrollToBottom');
});
@@ -148,6 +275,7 @@ export const ChatPlugin: Plugin = {
return sessionId;
}
+ // eslint-disable-next-line @typescript-eslint/require-await
async function startNewSession() {
currentSessionId.value = uuidv4();
diff --git a/packages/frontend/@n8n/chat/src/types/chat.ts b/packages/frontend/@n8n/chat/src/types/chat.ts
index 9c0ccbb87f..578624be11 100644
--- a/packages/frontend/@n8n/chat/src/types/chat.ts
+++ b/packages/frontend/@n8n/chat/src/types/chat.ts
@@ -11,6 +11,6 @@ export interface Chat {
	waitingForResponse: Ref<boolean>;
	loadPreviousSession?: () => Promise<string | undefined>;
	startNewSession?: () => Promise<void>;
-	sendMessage: (text: string, files: File[]) => Promise<SendMessageResponse | undefined>;
+	sendMessage: (text: string, files?: File[]) => Promise<SendMessageResponse | undefined>;
ws?: WebSocket | null;
}
diff --git a/packages/frontend/@n8n/chat/src/utils/streamingHandlers.ts b/packages/frontend/@n8n/chat/src/utils/streamingHandlers.ts
index f0a0d1e4f9..746e53b984 100644
--- a/packages/frontend/@n8n/chat/src/utils/streamingHandlers.ts
+++ b/packages/frontend/@n8n/chat/src/utils/streamingHandlers.ts
@@ -16,8 +16,8 @@ export function handleStreamingChunk(
runIndex?: number,
): void {
try {
- // Skip empty chunks to avoid showing empty responses
- if (!chunk.trim()) {
+		// Skip only truly empty chunks; keep whitespace-only chunks
+ if (chunk === '') {
return;
}
diff --git a/packages/nodes-base/nodes/Webhook/Webhook.node.ts b/packages/nodes-base/nodes/Webhook/Webhook.node.ts
index 102d5e7685..5e54d77838 100644
--- a/packages/nodes-base/nodes/Webhook/Webhook.node.ts
+++ b/packages/nodes-base/nodes/Webhook/Webhook.node.ts
@@ -47,8 +47,7 @@ export class Webhook extends Node {
name: 'webhook',
group: ['trigger'],
version: [1, 1.1, 2, 2.1],
- // Keep the default version as 2 to avoid releasing streaming in broken state
- defaultVersion: 2,
+ defaultVersion: 2.1,
description: 'Starts the workflow when a webhook is called',
eventTriggerDescription: 'Waiting for you to call the Test URL',
activationMessage: 'You can now make calls to your production webhook URL.',
diff --git a/packages/nodes-base/nodes/Webhook/description.ts b/packages/nodes-base/nodes/Webhook/description.ts
index 2eb314390c..f752384f62 100644
--- a/packages/nodes-base/nodes/Webhook/description.ts
+++ b/packages/nodes-base/nodes/Webhook/description.ts
@@ -164,7 +164,7 @@ export const responseModePropertyStreaming: INodeProperties = {
options: [
...responseModeOptions,
{
- name: 'Streaming Response',
+ name: 'Streaming',
value: 'streaming',
description: 'Returns data in real time from streaming enabled nodes',
},