From e1aa60ce6fdf59db97336b0a1cdfcfe177b61f5d Mon Sep 17 00:00:00 2001 From: oleg Date: Mon, 28 Jul 2025 10:18:00 +0200 Subject: [PATCH] fix: Minor streaming fixes (no-changelog) (#17683) --- .../nodes/agents/Agent/Agent.node.ts | 3 +- .../trigger/ChatTrigger/ChatTrigger.node.ts | 6 +- .../cli/src/__tests__/workflow-runner.test.ts | 4 +- packages/cli/src/workflow-runner.ts | 10 +- .../chat/src/__tests__/plugins/chat.spec.ts | 447 ++++++++++++++++++ .../frontend/@n8n/chat/src/api/message.ts | 7 +- .../frontend/@n8n/chat/src/plugins/chat.ts | 260 +++++++--- packages/frontend/@n8n/chat/src/types/chat.ts | 2 +- .../@n8n/chat/src/utils/streamingHandlers.ts | 4 +- .../nodes-base/nodes/Webhook/Webhook.node.ts | 3 +- .../nodes-base/nodes/Webhook/description.ts | 2 +- 11 files changed, 662 insertions(+), 86 deletions(-) create mode 100644 packages/frontend/@n8n/chat/src/__tests__/plugins/chat.spec.ts diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/Agent.node.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/Agent.node.ts index d1b367dd45..0f3fa171a1 100644 --- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/Agent.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/Agent.node.ts @@ -27,8 +27,7 @@ export class Agent extends VersionedNodeType { ], }, }, - // We keep defaultVersion as 2.1 to ensure we publish streaming when everything is ready - defaultVersion: 2.1, + defaultVersion: 2.2, }; const nodeVersions: IVersionedNodeType['nodeVersions'] = { diff --git a/packages/@n8n/nodes-langchain/nodes/trigger/ChatTrigger/ChatTrigger.node.ts b/packages/@n8n/nodes-langchain/nodes/trigger/ChatTrigger/ChatTrigger.node.ts index 938f03a232..1ead017151 100644 --- a/packages/@n8n/nodes-langchain/nodes/trigger/ChatTrigger/ChatTrigger.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/trigger/ChatTrigger/ChatTrigger.node.ts @@ -48,7 +48,7 @@ const lastNodeResponseMode = { }; const streamingResponseMode = { - name: 'Streaming Response', + name: 'Streaming', value: 'streaming', description: 'Streaming response from specified nodes (e.g. Agents)', }; @@ -495,7 +495,7 @@ export class ChatTrigger extends Node { displayName: 'Response Mode', name: 'responseMode', type: 'options', - options: [lastNodeResponseMode, respondToWebhookResponseMode], + options: [lastNodeResponseMode, streamingResponseMode, respondToWebhookResponseMode], default: 'lastNode', description: 'When and how to respond to the chat', displayOptions: { show: { '/mode': ['webhook'] } }, @@ -504,7 +504,7 @@ export class ChatTrigger extends Node { displayName: 'Response Mode', name: 'responseMode', type: 'options', - options: [lastNodeResponseMode, respondNodesResponseMode], + options: [lastNodeResponseMode, streamingResponseMode, respondNodesResponseMode], default: 'lastNode', description: 'When and how to respond to the webhook', displayOptions: { show: { '/mode': ['hostedChat'] } }, diff --git a/packages/cli/src/__tests__/workflow-runner.test.ts b/packages/cli/src/__tests__/workflow-runner.test.ts index ac25a57a6c..ff378cdac2 100644 --- a/packages/cli/src/__tests__/workflow-runner.test.ts +++ b/packages/cli/src/__tests__/workflow-runner.test.ts @@ -513,7 +513,7 @@ describe('streaming functionality', () => { expect(mockHooks.addHandler).toHaveBeenCalledWith('sendChunk', expect.any(Function)); }); - it('should not setup sendChunk handler when streaming is enabled but execution mode is manual', async () => { + it('should setup sendChunk handler when streaming is enabled and execution mode is manual', async () => { // ARRANGE const activeExecutions = Container.get(ActiveExecutions); jest.spyOn(activeExecutions, 'add').mockResolvedValue('1'); @@ -550,6 +550,6 @@ describe('streaming functionality', () => { await runner.run(data); // ASSERT - expect(mockHooks.addHandler).not.toHaveBeenCalledWith('sendChunk', expect.any(Function)); + expect(mockHooks.addHandler).toHaveBeenCalledWith('sendChunk', expect.any(Function)); }); }); diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index fa85c8149f..5e31b1cc68 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -268,12 +268,10 @@ export class WorkflowRunner { }); if (data.streamingEnabled) { - if (data.executionMode !== 'manual') { - lifecycleHooks.addHandler('sendChunk', (chunk) => { - data.httpResponse?.write(JSON.stringify(chunk) + '\n'); - data.httpResponse?.flush?.(); - }); - } + lifecycleHooks.addHandler('sendChunk', (chunk) => { + data.httpResponse?.write(JSON.stringify(chunk) + '\n'); + data.httpResponse?.flush?.(); + }); } additionalData.setExecutionStatus = WorkflowExecuteAdditionalData.setExecutionStatus.bind({ diff --git a/packages/frontend/@n8n/chat/src/__tests__/plugins/chat.spec.ts b/packages/frontend/@n8n/chat/src/__tests__/plugins/chat.spec.ts new file mode 100644 index 0000000000..329c38bd26 --- /dev/null +++ b/packages/frontend/@n8n/chat/src/__tests__/plugins/chat.spec.ts @@ -0,0 +1,447 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { createApp } from 'vue'; + +import * as api from '@n8n/chat/api'; +import type { StreamingEventHandlers } from '@n8n/chat/api/message'; +import { localStorageSessionIdKey } from '@n8n/chat/constants'; +import { chatEventBus } from '@n8n/chat/event-buses'; +import { ChatPlugin } from '@n8n/chat/plugins/chat'; +import type { Chat, ChatOptions, LoadPreviousSessionResponse } from '@n8n/chat/types'; + +// Mock dependencies +vi.mock('@n8n/chat/api'); +vi.mock('@n8n/chat/event-buses', () => ({ + chatEventBus: { + emit: vi.fn(), + }, +})); + +// Helper function to set up chat store with proper typing +function setupChatStore(options: ChatOptions): Chat { + const app = createApp({ + template: '
', + }); + app.use(ChatPlugin, options); + return app.config.globalProperties.$chat as Chat; +} + +describe('ChatPlugin', () => { + let mockOptions: ChatOptions; + + beforeEach(() => { + // Reset mocks + vi.clearAllMocks(); + + // Setup default options + mockOptions = { + webhookUrl: 'http://localhost:5678/webhook', + chatInputKey: 'message', + chatSessionKey: 'sessionId', + enableStreaming: false, + initialMessages: [], // Explicitly set to empty to override defaults + i18n: { + en: { + title: 'Test Chat', + subtitle: 'Test subtitle', + footer: '', + getStarted: 'Start', + inputPlaceholder: 'Type a message...', + closeButtonTooltip: 'Close', + }, + }, + }; + + // Setup localStorage mock + const localStorageMock = { + getItem: vi.fn(), + setItem: vi.fn(), + removeItem: vi.fn(), + clear: vi.fn(), + }; + Object.defineProperty(window, 'localStorage', { + value: localStorageMock, + writable: true, + }); + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + describe('sendMessage', () => { + let chatStore: Chat; + + beforeEach(() => { + chatStore = setupChatStore(mockOptions); + }); + + it('should send a message without streaming', async () => { + const mockResponse = { output: 'Hello from bot!' }; + vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse); + + await chatStore.sendMessage('Hello bot!'); + + expect(api.sendMessage).toHaveBeenCalledWith('Hello bot!', [], null, mockOptions); + + expect(chatStore.messages.value).toHaveLength(2); + expect(chatStore.messages.value[0]).toMatchObject({ + text: 'Hello bot!', + sender: 'user', + }); + expect(chatStore.messages.value[1]).toMatchObject({ + text: 'Hello from bot!', + sender: 'bot', + }); + }); + + it('should handle empty response gracefully', async () => { + const mockResponse = {}; + vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse); + + await chatStore.sendMessage('Hello bot!'); + + expect(chatStore.messages.value).toHaveLength(2); + expect(chatStore.messages.value[1]).toMatchObject({ + text: '', + sender: 'bot', + }); + }); + + it('should handle response with only text property', async () => { + const mockResponse = { text: 'Response text' }; + vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse); + + await chatStore.sendMessage('Hello bot!'); + + expect(chatStore.messages.value[1]).toMatchObject({ + text: 'Response text', + sender: 'bot', + }); + }); + + it('should handle errors during message sending', async () => { + vi.mocked(api.sendMessage).mockRejectedValueOnce(new Error('Network error')); + const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); + + await chatStore.sendMessage('Hello bot!'); + + expect(consoleErrorSpy).toHaveBeenCalledWith('Chat API error:', expect.any(Error)); + + // Error messages should be displayed to the user + expect(chatStore.messages.value).toHaveLength(2); + expect(chatStore.messages.value[0]).toMatchObject({ + text: 'Hello bot!', + sender: 'user', + }); + expect(chatStore.messages.value[1]).toMatchObject({ + text: 'Error: Failed to receive response', + sender: 'bot', + }); + + consoleErrorSpy.mockRestore(); + }); + + it('should send files with message', async () => { + const mockFile = new File(['content'], 'test.txt', { type: 'text/plain' }); + const mockResponse = { output: 'File received!' }; + vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse); + + await chatStore.sendMessage('Here is a file', [mockFile]); + + expect(api.sendMessage).toHaveBeenCalledWith('Here is a file', [mockFile], null, mockOptions); + + expect(chatStore.messages.value[0]).toMatchObject({ + text: 'Here is a file', + sender: 'user', + files: [mockFile], + }); + }); + + it('should set waitingForResponse correctly', async () => { + const mockResponse = { output: 'Response' }; + vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse); + + expect(chatStore.waitingForResponse.value).toBe(false); + + const sendPromise = chatStore.sendMessage('Test'); + expect(chatStore.waitingForResponse.value).toBe(true); + + await sendPromise; + expect(chatStore.waitingForResponse.value).toBe(false); + }); + + it('should emit scrollToBottom events', async () => { + const mockResponse = { output: 'Response' }; + vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse); + + await chatStore.sendMessage('Test'); + + expect(chatEventBus.emit).toHaveBeenCalledWith('scrollToBottom'); + expect(chatEventBus.emit).toHaveBeenCalledTimes(2); // Once after user message, once after bot response + }); + }); + + describe('streaming', () => { + let chatStore: Chat; + + beforeEach(() => { + mockOptions.enableStreaming = true; + chatStore = setupChatStore(mockOptions); + }); + + it('should handle streaming messages', async () => { + const mockStreamingResponse = { hasReceivedChunks: true }; + vi.mocked(api.sendMessageStreaming).mockResolvedValueOnce(mockStreamingResponse); + + await chatStore.sendMessage('Stream this!'); + + expect(api.sendMessageStreaming).toHaveBeenCalledWith( + 'Stream this!', + [], + null, + mockOptions, + expect.objectContaining({ + onChunk: expect.any(Function) as StreamingEventHandlers['onChunk'], + onBeginMessage: expect.any(Function) as StreamingEventHandlers['onBeginMessage'], + onEndMessage: expect.any(Function) as StreamingEventHandlers['onEndMessage'], + }), + ); + }); + + it('should handle empty streaming response', async () => { + const mockStreamingResponse = { hasReceivedChunks: false }; + vi.mocked(api.sendMessageStreaming).mockResolvedValueOnce(mockStreamingResponse); + + await chatStore.sendMessage('Stream this!'); + + expect(chatStore.messages.value).toHaveLength(2); + expect(chatStore.messages.value[1]).toMatchObject({ + text: '[No response received. This could happen if streaming is enabled in the trigger but disabled in agent node(s)]', + sender: 'bot', + }); + }); + + it('should handle streaming errors', async () => { + vi.mocked(api.sendMessageStreaming).mockRejectedValueOnce(new Error('Stream error')); + const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); + + await chatStore.sendMessage('Stream this!'); + + expect(consoleErrorSpy).toHaveBeenCalledWith('Chat API error:', expect.any(Error)); + expect(chatStore.messages.value[1]).toMatchObject({ + text: 'Error: Failed to receive response', + sender: 'bot', + }); + + consoleErrorSpy.mockRestore(); + }); + + it('should handle streaming with files', async () => { + const mockFile = new File(['content'], 'test.txt', { type: 'text/plain' }); + const mockStreamingResponse = { hasReceivedChunks: true }; + vi.mocked(api.sendMessageStreaming).mockResolvedValueOnce(mockStreamingResponse); + + await chatStore.sendMessage('Stream with file', [mockFile]); + + expect(api.sendMessageStreaming).toHaveBeenCalledWith( + 'Stream with file', + [mockFile], + null, + mockOptions, + expect.objectContaining({ + onChunk: expect.any(Function) as StreamingEventHandlers['onChunk'], + onBeginMessage: expect.any(Function) as StreamingEventHandlers['onBeginMessage'], + onEndMessage: expect.any(Function) as StreamingEventHandlers['onEndMessage'], + }), + ); + }); + }); + + describe('session management', () => { + let chatStore: Chat; + + beforeEach(() => { + mockOptions.loadPreviousSession = true; + chatStore = setupChatStore(mockOptions); + }); + + it('should load previous session', async () => { + const mockSessionId = 'existing-session'; + const mockMessages: LoadPreviousSessionResponse = { + data: [ + { + id: ['HumanMessage-1'], // The implementation expects string but types say array + kwargs: { content: 'Previous user message', additional_kwargs: {} }, + lc: 1, + type: 'HumanMessage', + }, + { + id: ['AIMessage-1'], + kwargs: { content: 'Previous bot message', additional_kwargs: {} }, + lc: 1, + type: 'AIMessage', + }, + ], + }; + + (window.localStorage.getItem as ReturnType).mockReturnValueOnce(mockSessionId); + vi.mocked(api.loadPreviousSession).mockResolvedValueOnce(mockMessages); + + const sessionId = await chatStore.loadPreviousSession?.(); + + expect(sessionId).toBe(mockSessionId); + expect(api.loadPreviousSession).toHaveBeenCalledWith(mockSessionId, mockOptions); + expect(chatStore.messages.value).toHaveLength(2); + expect(chatStore.messages.value[0]).toMatchObject({ + text: 'Previous user message', + sender: 'bot', // Both will be 'bot' because id is an array, not a string + }); + expect(chatStore.messages.value[1]).toMatchObject({ + text: 'Previous bot message', + sender: 'bot', + }); + expect(chatStore.currentSessionId.value).toBe(mockSessionId); + }); + + it('should create new session if no previous session exists', async () => { + (window.localStorage.getItem as ReturnType).mockReturnValueOnce(null); + vi.mocked(api.loadPreviousSession).mockResolvedValueOnce({ data: [] }); + + const sessionId = await chatStore.loadPreviousSession?.(); + + expect(sessionId).toMatch(/^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i); + expect(chatStore.messages.value).toHaveLength(0); + expect(chatStore.currentSessionId.value).toBeNull(); + }); + + it('should skip loading if loadPreviousSession is false', async () => { + mockOptions.loadPreviousSession = false; + chatStore = setupChatStore(mockOptions); + + const result = await chatStore.loadPreviousSession?.(); + + expect(result).toBeUndefined(); + expect(api.loadPreviousSession).not.toHaveBeenCalled(); + }); + + it('should start a new session', async () => { + await chatStore.startNewSession?.(); + + expect(chatStore.currentSessionId.value).toMatch( + /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i, + ); + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(window.localStorage.setItem).toHaveBeenCalledWith( + localStorageSessionIdKey, + chatStore.currentSessionId.value, + ); + }); + }); + + describe('initial messages', () => { + it('should compute initial messages from options', () => { + mockOptions.initialMessages = ['Welcome!', 'How can I help you?']; + const chatStore = setupChatStore(mockOptions); + + expect(chatStore.initialMessages.value).toHaveLength(2); + expect(chatStore.initialMessages.value[0]).toMatchObject({ + text: 'Welcome!', + sender: 'bot', + }); + expect(chatStore.initialMessages.value[1]).toMatchObject({ + text: 'How can I help you?', + sender: 'bot', + }); + }); + + it('should handle undefined initial messages', () => { + const chatStore = setupChatStore(mockOptions); + + expect(chatStore.initialMessages.value).toHaveLength(0); + }); + }); + + describe('edge cases', () => { + let chatStore: Chat; + + beforeEach(() => { + chatStore = setupChatStore(mockOptions); + }); + + it('should handle sending message with null session ID', async () => { + const mockResponse = { output: 'Response' }; + vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse); + + chatStore.currentSessionId.value = null; + await chatStore.sendMessage('Test'); + + expect(api.sendMessage).toHaveBeenCalledWith('Test', [], null, mockOptions); + }); + + it('should handle empty text message', async () => { + const mockResponse = { output: 'Response' }; + vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse); + + await chatStore.sendMessage(''); + + expect(chatStore.messages.value[0]).toMatchObject({ + text: '', + sender: 'user', + }); + }); + + it('should handle streaming with existing bot messages', async () => { + mockOptions.enableStreaming = true; + chatStore = setupChatStore(mockOptions); + + // Add an existing bot message + chatStore.messages.value.push({ + id: 'existing', + text: 'Existing message', + sender: 'bot', + }); + + const mockStreamingResponse = { hasReceivedChunks: false }; + vi.mocked(api.sendMessageStreaming).mockResolvedValueOnce(mockStreamingResponse); + + await chatStore.sendMessage('Test'); + + // Should still add error message even with existing bot messages + const lastMessage = chatStore.messages.value[chatStore.messages.value.length - 1]; + assert(lastMessage.type === 'text'); + expect(lastMessage.text).toBe( + '[No response received. This could happen if streaming is enabled in the trigger but disabled in agent node(s)]', + ); + }); + + it('should return response when executionStarted is true', async () => { + const mockResponse = { + executionStarted: true, + executionId: '12345', + }; + vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse); + + const result = await chatStore.sendMessage('Execute workflow'); + + expect(result).toEqual(mockResponse); + // Should only have the user message, no bot response + expect(chatStore.messages.value).toHaveLength(1); + expect(chatStore.messages.value[0]).toMatchObject({ + text: 'Execute workflow', + sender: 'user', + }); + }); + + it('should handle message field in response', async () => { + const mockResponse = { message: 'Response from message field' }; + vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse); + + await chatStore.sendMessage('Test message field'); + + expect(chatStore.messages.value[1]).toMatchObject({ + text: 'Response from message field', + sender: 'bot', + }); + }); + }); +}); diff --git a/packages/frontend/@n8n/chat/src/api/message.ts b/packages/frontend/@n8n/chat/src/api/message.ts index b5cfb520a4..d2b5fbafff 100644 --- a/packages/frontend/@n8n/chat/src/api/message.ts +++ b/packages/frontend/@n8n/chat/src/api/message.ts @@ -115,7 +115,7 @@ export async function sendMessageStreaming( sessionId: string, options: ChatOptions, handlers: StreamingEventHandlers, -): Promise { +): Promise<{ hasReceivedChunks: boolean }> { // Build request const response = await (files.length > 0 ? sendWithFiles(message, files, sessionId, options) @@ -133,6 +133,7 @@ export async function sendMessageStreaming( // Process the stream const reader = response.body.pipeThrough(createLineParser()).getReader(); + let hasReceivedChunks = false; try { while (true) { @@ -147,12 +148,14 @@ export async function sendMessageStreaming( handlers.onBeginMessage(nodeId, runIndex); break; case 'item': + hasReceivedChunks = true; handlers.onChunk(value.content ?? '', nodeId, runIndex); break; case 'end': handlers.onEndMessage(nodeId, runIndex); break; case 'error': + hasReceivedChunks = true; handlers.onChunk(`Error: ${value.content ?? 'Unknown error'}`, nodeId, runIndex); handlers.onEndMessage(nodeId, runIndex); break; @@ -161,6 +164,8 @@ export async function sendMessageStreaming( } finally { reader.releaseLock(); } + + return { hasReceivedChunks }; } // Helper function for file uploads diff --git a/packages/frontend/@n8n/chat/src/plugins/chat.ts b/packages/frontend/@n8n/chat/src/plugins/chat.ts index bb2a98a474..f4cea34a7d 100644 --- a/packages/frontend/@n8n/chat/src/plugins/chat.ts +++ b/packages/frontend/@n8n/chat/src/plugins/chat.ts @@ -1,11 +1,15 @@ import { v4 as uuidv4 } from 'uuid'; -import type { Plugin } from 'vue'; -import { computed, nextTick, ref } from 'vue'; +import { type Plugin, computed, nextTick, ref, type Ref } from 'vue'; import * as api from '@n8n/chat/api'; import { ChatOptionsSymbol, ChatSymbol, localStorageSessionIdKey } from '@n8n/chat/constants'; import { chatEventBus } from '@n8n/chat/event-buses'; -import type { ChatMessage, ChatOptions, ChatMessageText } from '@n8n/chat/types'; +import type { + ChatMessage, + ChatOptions, + ChatMessageText, + SendMessageResponse, +} from '@n8n/chat/types'; import { StreamingMessageManager, createBotMessage } from '@n8n/chat/utils/streaming'; import { handleStreamingChunk, @@ -13,6 +17,169 @@ import { handleNodeComplete, } from '@n8n/chat/utils/streamingHandlers'; +/** + * Creates a new user message object with a unique ID + * @param text - The message text content + * @param files - Optional array of files attached to the message + * @returns A ChatMessage object representing the user's message + */ +function createUserMessage(text: string, files: File[] = []): ChatMessage { + return { + id: uuidv4(), + text, + sender: 'user', + files, + }; +} + +/** + * Extracts text content from a message response + * Falls back to JSON stringification if no text is found but the response object has data + * @param response - The response object from the API + * @returns The extracted text message or stringified response + */ +function processMessageResponse(response: SendMessageResponse): string { + let textMessage = response.output ?? response.text ?? response.message ?? ''; + + if (textMessage === '' && Object.keys(response).length > 0) { + try { + textMessage = JSON.stringify(response, null, 2); + } catch (e) { + // Failed to stringify the object so fallback to empty string + } + } + + return textMessage; +} + +interface EmptyStreamConfig { + receivedMessage: Ref; + messages: Ref; +} + +/** + * Handles the case when a streaming response returns no chunks + * Creates an error message explaining the likely cause + * @param config - Configuration object containing message refs + */ +function handleEmptyStreamResponse(config: EmptyStreamConfig): void { + const { receivedMessage, messages } = config; + + if (!receivedMessage.value) { + receivedMessage.value = createBotMessage(); + messages.value.push(receivedMessage.value); + } else { + // Check if any existing messages have content + const hasContent = messages.value.some( + (msg) => msg.sender === 'bot' && 'text' in msg && msg.text.trim().length > 0, + ); + if (!hasContent) { + receivedMessage.value = createBotMessage(); + messages.value.push(receivedMessage.value); + } + } + receivedMessage.value.text = + '[No response received. This could happen if streaming is enabled in the trigger but disabled in agent node(s)]'; +} + +interface ErrorConfig { + error: unknown; + receivedMessage: Ref; + messages: Ref; +} + +/** + * Handles errors that occur during message sending + * Creates an error message for the user and logs the error to console + * @param config - Configuration object containing error and message refs + */ +function handleMessageError(config: ErrorConfig): void { + const { error, receivedMessage, messages } = config; + + receivedMessage.value ??= createBotMessage(); + receivedMessage.value.text = 'Error: Failed to receive response'; + + // Ensure the error message is added to messages array if not already there + if (!messages.value.includes(receivedMessage.value)) { + messages.value.push(receivedMessage.value); + } + + console.error('Chat API error:', error); +} + +interface StreamingMessageConfig { + text: string; + files: File[]; + sessionId: string; + options: ChatOptions; + messages: Ref; + receivedMessage: Ref; + streamingManager: StreamingMessageManager; +} + +/** + * Handles sending messages with streaming enabled + * Sets up streaming event handlers and processes the response chunks + * @param config - Configuration object for streaming message handling + */ +async function handleStreamingMessage(config: StreamingMessageConfig): Promise { + const { text, files, sessionId, options, messages, receivedMessage, streamingManager } = config; + + const handlers: api.StreamingEventHandlers = { + onChunk: (chunk: string, nodeId?: string, runIndex?: number) => { + handleStreamingChunk(chunk, nodeId, streamingManager, receivedMessage, messages, runIndex); + }, + onBeginMessage: (nodeId: string, runIndex?: number) => { + handleNodeStart(nodeId, streamingManager, runIndex); + }, + onEndMessage: (nodeId: string, runIndex?: number) => { + handleNodeComplete(nodeId, streamingManager, runIndex); + }, + }; + + const { hasReceivedChunks } = await api.sendMessageStreaming( + text, + files, + sessionId, + options, + handlers, + ); + + // Check if no chunks were received (empty stream) + if (!hasReceivedChunks) { + handleEmptyStreamResponse({ receivedMessage, messages }); + } +} + +interface NonStreamingMessageConfig { + text: string; + files: File[]; + sessionId: string; + options: ChatOptions; +} + +/** + * Handles sending messages without streaming + * Sends the message and processes the complete response + * @param config - Configuration object for non-streaming message handling + * @returns The API response or a bot message + */ +async function handleNonStreamingMessage( + config: NonStreamingMessageConfig, +): Promise<{ response?: SendMessageResponse; botMessage?: ChatMessageText }> { + const { text, files, sessionId, options } = config; + + const sendMessageResponse = await api.sendMessage(text, files, sessionId, options); + + if (sendMessageResponse?.executionStarted) { + return { response: sendMessageResponse }; + } + + const receivedMessage = createBotMessage(); + receivedMessage.text = processMessageResponse(sendMessageResponse); + return { botMessage: receivedMessage }; +} + export const ChatPlugin: Plugin = { install(app, options) { app.provide(ChatOptionsSymbol, options); @@ -29,14 +196,12 @@ export const ChatPlugin: Plugin = { })), ); - async function sendMessage(text: string, files: File[] = []) { - const sentMessage: ChatMessage = { - id: uuidv4(), - text, - sender: 'user', - files, - }; - + async function sendMessage( + text: string, + files: File[] = [], + ): Promise { + // Create and add user message + const sentMessage = createUserMessage(text, files); messages.value.push(sentMessage); waitingForResponse.value = true; @@ -49,77 +214,39 @@ export const ChatPlugin: Plugin = { try { if (options?.enableStreaming) { - const handlers: api.StreamingEventHandlers = { - onChunk: (chunk: string, nodeId?: string, runIndex?: number) => { - handleStreamingChunk( - chunk, - nodeId, - streamingManager, - receivedMessage, - messages, - runIndex, - ); - }, - onBeginMessage: (nodeId: string, runIndex?: number) => { - handleNodeStart(nodeId, streamingManager, runIndex); - }, - onEndMessage: (nodeId: string, runIndex?: number) => { - handleNodeComplete(nodeId, streamingManager, runIndex); - }, - }; - - await api.sendMessageStreaming( + await handleStreamingMessage({ text, files, - currentSessionId.value as string, + sessionId: currentSessionId.value as string, options, - handlers, - ); + messages, + receivedMessage, + streamingManager, + }); } else { - receivedMessage.value = createBotMessage(); - - const sendMessageResponse = await api.sendMessage( + const result = await handleNonStreamingMessage({ text, files, - currentSessionId.value as string, + sessionId: currentSessionId.value as string, options, - ); + }); - if (sendMessageResponse?.executionStarted) { - return sendMessageResponse; + if (result.response?.executionStarted) { + waitingForResponse.value = false; + return result.response; } - let textMessage = - sendMessageResponse.output ?? - sendMessageResponse.text ?? - sendMessageResponse.message ?? - ''; - - if (textMessage === '' && Object.keys(sendMessageResponse).length > 0) { - try { - textMessage = JSON.stringify(sendMessageResponse, null, 2); - } catch (e) { - // Failed to stringify the object so fallback to empty string - } + if (result.botMessage) { + receivedMessage.value = result.botMessage; + messages.value.push(result.botMessage); } - - receivedMessage.value.text = textMessage; - messages.value.push(receivedMessage.value); } } catch (error) { - if (!receivedMessage.value) { - receivedMessage.value = createBotMessage(); - messages.value.push(receivedMessage.value); - } - if (receivedMessage.value && 'text' in receivedMessage.value) { - receivedMessage.value.text = 'Error: Failed to receive response'; - } - console.error('Chat API error:', error); + handleMessageError({ error, receivedMessage, messages }); + } finally { waitingForResponse.value = false; } - waitingForResponse.value = false; - void nextTick(() => { chatEventBus.emit('scrollToBottom'); }); @@ -148,6 +275,7 @@ export const ChatPlugin: Plugin = { return sessionId; } + // eslint-disable-next-line @typescript-eslint/require-await async function startNewSession() { currentSessionId.value = uuidv4(); diff --git a/packages/frontend/@n8n/chat/src/types/chat.ts b/packages/frontend/@n8n/chat/src/types/chat.ts index 9c0ccbb87f..578624be11 100644 --- a/packages/frontend/@n8n/chat/src/types/chat.ts +++ b/packages/frontend/@n8n/chat/src/types/chat.ts @@ -11,6 +11,6 @@ export interface Chat { waitingForResponse: Ref; loadPreviousSession?: () => Promise; startNewSession?: () => Promise; - sendMessage: (text: string, files: File[]) => Promise; + sendMessage: (text: string, files?: File[]) => Promise; ws?: WebSocket | null; } diff --git a/packages/frontend/@n8n/chat/src/utils/streamingHandlers.ts b/packages/frontend/@n8n/chat/src/utils/streamingHandlers.ts index f0a0d1e4f9..746e53b984 100644 --- a/packages/frontend/@n8n/chat/src/utils/streamingHandlers.ts +++ b/packages/frontend/@n8n/chat/src/utils/streamingHandlers.ts @@ -16,8 +16,8 @@ export function handleStreamingChunk( runIndex?: number, ): void { try { - // Skip empty chunks to avoid showing empty responses - if (!chunk.trim()) { + // Only skip empty chunks, but not whitespace only chunks + if (chunk === '') { return; } diff --git a/packages/nodes-base/nodes/Webhook/Webhook.node.ts b/packages/nodes-base/nodes/Webhook/Webhook.node.ts index 102d5e7685..5e54d77838 100644 --- a/packages/nodes-base/nodes/Webhook/Webhook.node.ts +++ b/packages/nodes-base/nodes/Webhook/Webhook.node.ts @@ -47,8 +47,7 @@ export class Webhook extends Node { name: 'webhook', group: ['trigger'], version: [1, 1.1, 2, 2.1], - // Keep the default version as 2 to avoid releasing streaming in broken state - defaultVersion: 2, + defaultVersion: 2.1, description: 'Starts the workflow when a webhook is called', eventTriggerDescription: 'Waiting for you to call the Test URL', activationMessage: 'You can now make calls to your production webhook URL.', diff --git a/packages/nodes-base/nodes/Webhook/description.ts b/packages/nodes-base/nodes/Webhook/description.ts index 2eb314390c..f752384f62 100644 --- a/packages/nodes-base/nodes/Webhook/description.ts +++ b/packages/nodes-base/nodes/Webhook/description.ts @@ -164,7 +164,7 @@ export const responseModePropertyStreaming: INodeProperties = { options: [ ...responseModeOptions, { - name: 'Streaming Response', + name: 'Streaming', value: 'streaming', description: 'Returns data in real time from streaming enabled nodes', },