fix: Minor streaming fixes (no-changelog) (#17683)

oleg
2025-07-28 10:18:00 +02:00
committed by GitHub
parent e0ffadef34
commit e1aa60ce6f
11 changed files with 662 additions and 86 deletions

View File

@@ -27,8 +27,7 @@ export class Agent extends VersionedNodeType {
],
},
},
- // We keep defaultVersion as 2.1 to ensure we publish streaming when everything is ready
- defaultVersion: 2.1,
+ defaultVersion: 2.2,
};
const nodeVersions: IVersionedNodeType['nodeVersions'] = {

View File

@@ -48,7 +48,7 @@ const lastNodeResponseMode = {
};
const streamingResponseMode = {
- name: 'Streaming Response',
+ name: 'Streaming',
value: 'streaming',
description: 'Streaming response from specified nodes (e.g. Agents)',
};
@@ -495,7 +495,7 @@ export class ChatTrigger extends Node {
displayName: 'Response Mode',
name: 'responseMode',
type: 'options',
- options: [lastNodeResponseMode, respondToWebhookResponseMode],
+ options: [lastNodeResponseMode, streamingResponseMode, respondToWebhookResponseMode],
default: 'lastNode',
description: 'When and how to respond to the chat',
displayOptions: { show: { '/mode': ['webhook'] } },
@@ -504,7 +504,7 @@ export class ChatTrigger extends Node {
displayName: 'Response Mode',
name: 'responseMode',
type: 'options',
- options: [lastNodeResponseMode, respondNodesResponseMode],
+ options: [lastNodeResponseMode, streamingResponseMode, respondNodesResponseMode],
default: 'lastNode',
description: 'When and how to respond to the webhook',
displayOptions: { show: { '/mode': ['hostedChat'] } },

View File

@@ -513,7 +513,7 @@ describe('streaming functionality', () => {
expect(mockHooks.addHandler).toHaveBeenCalledWith('sendChunk', expect.any(Function));
});
- it('should not setup sendChunk handler when streaming is enabled but execution mode is manual', async () => {
+ it('should setup sendChunk handler when streaming is enabled and execution mode is manual', async () => {
// ARRANGE
const activeExecutions = Container.get(ActiveExecutions);
jest.spyOn(activeExecutions, 'add').mockResolvedValue('1');
@@ -550,6 +550,6 @@ describe('streaming functionality', () => {
await runner.run(data);
// ASSERT
- expect(mockHooks.addHandler).not.toHaveBeenCalledWith('sendChunk', expect.any(Function));
+ expect(mockHooks.addHandler).toHaveBeenCalledWith('sendChunk', expect.any(Function));
});
});

View File

@@ -268,13 +268,11 @@ export class WorkflowRunner {
});
if (data.streamingEnabled) {
- if (data.executionMode !== 'manual') {
lifecycleHooks.addHandler('sendChunk', (chunk) => {
data.httpResponse?.write(JSON.stringify(chunk) + '\n');
data.httpResponse?.flush?.();
});
- }
}
additionalData.setExecutionStatus = WorkflowExecuteAdditionalData.setExecutionStatus.bind({
executionId,
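For context on the wire format involved here: the handler above writes one JSON-serialized chunk per line (NDJSON) and flushes after each write, and the chat client later in this diff pipes the response body through a line parser and switches on each chunk's type ('begin' | 'item' | 'end' | 'error'). The sketch below is a minimal line-by-line consumer; the StreamChunk shape is a hypothetical stand-in inferred from this diff (only type and content are visible here, and where nodeId/runIndex live is an assumption), not the published n8n type.

// Hypothetical chunk shape, inferred from this diff rather than from n8n's types.
interface StreamChunk {
  type: 'begin' | 'item' | 'end' | 'error';
  content?: string; // present for 'item' and 'error' chunks
  nodeId?: string; // placement assumed for illustration
  runIndex?: number; // placement assumed for illustration
}

// Minimal NDJSON consumer: buffer the body, split on newlines, parse each complete line.
// This mirrors what createLineParser() plus the switch in sendMessageStreaming appear to do.
async function readChunks(response: Response, onChunk: (chunk: StreamChunk) => void): Promise<void> {
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop() ?? ''; // keep any partial trailing line for the next read
    for (const line of lines) {
      if (line.trim() !== '') onChunk(JSON.parse(line) as StreamChunk);
    }
  }
}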

View File

@@ -0,0 +1,447 @@
import { describe, it, expect, vi, beforeEach, afterEach, assert } from 'vitest';
import { createApp } from 'vue';
import * as api from '@n8n/chat/api';
import type { StreamingEventHandlers } from '@n8n/chat/api/message';
import { localStorageSessionIdKey } from '@n8n/chat/constants';
import { chatEventBus } from '@n8n/chat/event-buses';
import { ChatPlugin } from '@n8n/chat/plugins/chat';
import type { Chat, ChatOptions, LoadPreviousSessionResponse } from '@n8n/chat/types';
// Mock dependencies
vi.mock('@n8n/chat/api');
vi.mock('@n8n/chat/event-buses', () => ({
chatEventBus: {
emit: vi.fn(),
},
}));
// Helper function to set up chat store with proper typing
function setupChatStore(options: ChatOptions): Chat {
const app = createApp({
template: '<div></div>',
});
app.use(ChatPlugin, options);
return app.config.globalProperties.$chat as Chat;
}
describe('ChatPlugin', () => {
let mockOptions: ChatOptions;
beforeEach(() => {
// Reset mocks
vi.clearAllMocks();
// Setup default options
mockOptions = {
webhookUrl: 'http://localhost:5678/webhook',
chatInputKey: 'message',
chatSessionKey: 'sessionId',
enableStreaming: false,
initialMessages: [], // Explicitly set to empty to override defaults
i18n: {
en: {
title: 'Test Chat',
subtitle: 'Test subtitle',
footer: '',
getStarted: 'Start',
inputPlaceholder: 'Type a message...',
closeButtonTooltip: 'Close',
},
},
};
// Setup localStorage mock
const localStorageMock = {
getItem: vi.fn(),
setItem: vi.fn(),
removeItem: vi.fn(),
clear: vi.fn(),
};
Object.defineProperty(window, 'localStorage', {
value: localStorageMock,
writable: true,
});
});
afterEach(() => {
vi.clearAllMocks();
});
describe('sendMessage', () => {
let chatStore: Chat;
beforeEach(() => {
chatStore = setupChatStore(mockOptions);
});
it('should send a message without streaming', async () => {
const mockResponse = { output: 'Hello from bot!' };
vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse);
await chatStore.sendMessage('Hello bot!');
expect(api.sendMessage).toHaveBeenCalledWith('Hello bot!', [], null, mockOptions);
expect(chatStore.messages.value).toHaveLength(2);
expect(chatStore.messages.value[0]).toMatchObject({
text: 'Hello bot!',
sender: 'user',
});
expect(chatStore.messages.value[1]).toMatchObject({
text: 'Hello from bot!',
sender: 'bot',
});
});
it('should handle empty response gracefully', async () => {
const mockResponse = {};
vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse);
await chatStore.sendMessage('Hello bot!');
expect(chatStore.messages.value).toHaveLength(2);
expect(chatStore.messages.value[1]).toMatchObject({
text: '',
sender: 'bot',
});
});
it('should handle response with only text property', async () => {
const mockResponse = { text: 'Response text' };
vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse);
await chatStore.sendMessage('Hello bot!');
expect(chatStore.messages.value[1]).toMatchObject({
text: 'Response text',
sender: 'bot',
});
});
it('should handle errors during message sending', async () => {
vi.mocked(api.sendMessage).mockRejectedValueOnce(new Error('Network error'));
const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {});
await chatStore.sendMessage('Hello bot!');
expect(consoleErrorSpy).toHaveBeenCalledWith('Chat API error:', expect.any(Error));
// Error messages should be displayed to the user
expect(chatStore.messages.value).toHaveLength(2);
expect(chatStore.messages.value[0]).toMatchObject({
text: 'Hello bot!',
sender: 'user',
});
expect(chatStore.messages.value[1]).toMatchObject({
text: 'Error: Failed to receive response',
sender: 'bot',
});
consoleErrorSpy.mockRestore();
});
it('should send files with message', async () => {
const mockFile = new File(['content'], 'test.txt', { type: 'text/plain' });
const mockResponse = { output: 'File received!' };
vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse);
await chatStore.sendMessage('Here is a file', [mockFile]);
expect(api.sendMessage).toHaveBeenCalledWith('Here is a file', [mockFile], null, mockOptions);
expect(chatStore.messages.value[0]).toMatchObject({
text: 'Here is a file',
sender: 'user',
files: [mockFile],
});
});
it('should set waitingForResponse correctly', async () => {
const mockResponse = { output: 'Response' };
vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse);
expect(chatStore.waitingForResponse.value).toBe(false);
const sendPromise = chatStore.sendMessage('Test');
expect(chatStore.waitingForResponse.value).toBe(true);
await sendPromise;
expect(chatStore.waitingForResponse.value).toBe(false);
});
it('should emit scrollToBottom events', async () => {
const mockResponse = { output: 'Response' };
vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse);
await chatStore.sendMessage('Test');
expect(chatEventBus.emit).toHaveBeenCalledWith('scrollToBottom');
expect(chatEventBus.emit).toHaveBeenCalledTimes(2); // Once after user message, once after bot response
});
});
describe('streaming', () => {
let chatStore: Chat;
beforeEach(() => {
mockOptions.enableStreaming = true;
chatStore = setupChatStore(mockOptions);
});
it('should handle streaming messages', async () => {
const mockStreamingResponse = { hasReceivedChunks: true };
vi.mocked(api.sendMessageStreaming).mockResolvedValueOnce(mockStreamingResponse);
await chatStore.sendMessage('Stream this!');
expect(api.sendMessageStreaming).toHaveBeenCalledWith(
'Stream this!',
[],
null,
mockOptions,
expect.objectContaining({
onChunk: expect.any(Function) as StreamingEventHandlers['onChunk'],
onBeginMessage: expect.any(Function) as StreamingEventHandlers['onBeginMessage'],
onEndMessage: expect.any(Function) as StreamingEventHandlers['onEndMessage'],
}),
);
});
it('should handle empty streaming response', async () => {
const mockStreamingResponse = { hasReceivedChunks: false };
vi.mocked(api.sendMessageStreaming).mockResolvedValueOnce(mockStreamingResponse);
await chatStore.sendMessage('Stream this!');
expect(chatStore.messages.value).toHaveLength(2);
expect(chatStore.messages.value[1]).toMatchObject({
text: '[No response received. This could happen if streaming is enabled in the trigger but disabled in agent node(s)]',
sender: 'bot',
});
});
it('should handle streaming errors', async () => {
vi.mocked(api.sendMessageStreaming).mockRejectedValueOnce(new Error('Stream error'));
const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {});
await chatStore.sendMessage('Stream this!');
expect(consoleErrorSpy).toHaveBeenCalledWith('Chat API error:', expect.any(Error));
expect(chatStore.messages.value[1]).toMatchObject({
text: 'Error: Failed to receive response',
sender: 'bot',
});
consoleErrorSpy.mockRestore();
});
it('should handle streaming with files', async () => {
const mockFile = new File(['content'], 'test.txt', { type: 'text/plain' });
const mockStreamingResponse = { hasReceivedChunks: true };
vi.mocked(api.sendMessageStreaming).mockResolvedValueOnce(mockStreamingResponse);
await chatStore.sendMessage('Stream with file', [mockFile]);
expect(api.sendMessageStreaming).toHaveBeenCalledWith(
'Stream with file',
[mockFile],
null,
mockOptions,
expect.objectContaining({
onChunk: expect.any(Function) as StreamingEventHandlers['onChunk'],
onBeginMessage: expect.any(Function) as StreamingEventHandlers['onBeginMessage'],
onEndMessage: expect.any(Function) as StreamingEventHandlers['onEndMessage'],
}),
);
});
});
describe('session management', () => {
let chatStore: Chat;
beforeEach(() => {
mockOptions.loadPreviousSession = true;
chatStore = setupChatStore(mockOptions);
});
it('should load previous session', async () => {
const mockSessionId = 'existing-session';
const mockMessages: LoadPreviousSessionResponse = {
data: [
{
id: ['HumanMessage-1'], // The implementation expects string but types say array
kwargs: { content: 'Previous user message', additional_kwargs: {} },
lc: 1,
type: 'HumanMessage',
},
{
id: ['AIMessage-1'],
kwargs: { content: 'Previous bot message', additional_kwargs: {} },
lc: 1,
type: 'AIMessage',
},
],
};
(window.localStorage.getItem as ReturnType<typeof vi.fn>).mockReturnValueOnce(mockSessionId);
vi.mocked(api.loadPreviousSession).mockResolvedValueOnce(mockMessages);
const sessionId = await chatStore.loadPreviousSession?.();
expect(sessionId).toBe(mockSessionId);
expect(api.loadPreviousSession).toHaveBeenCalledWith(mockSessionId, mockOptions);
expect(chatStore.messages.value).toHaveLength(2);
expect(chatStore.messages.value[0]).toMatchObject({
text: 'Previous user message',
sender: 'bot', // Both will be 'bot' because id is an array, not a string
});
expect(chatStore.messages.value[1]).toMatchObject({
text: 'Previous bot message',
sender: 'bot',
});
expect(chatStore.currentSessionId.value).toBe(mockSessionId);
});
it('should create new session if no previous session exists', async () => {
(window.localStorage.getItem as ReturnType<typeof vi.fn>).mockReturnValueOnce(null);
vi.mocked(api.loadPreviousSession).mockResolvedValueOnce({ data: [] });
const sessionId = await chatStore.loadPreviousSession?.();
expect(sessionId).toMatch(/^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i);
expect(chatStore.messages.value).toHaveLength(0);
expect(chatStore.currentSessionId.value).toBeNull();
});
it('should skip loading if loadPreviousSession is false', async () => {
mockOptions.loadPreviousSession = false;
chatStore = setupChatStore(mockOptions);
const result = await chatStore.loadPreviousSession?.();
expect(result).toBeUndefined();
expect(api.loadPreviousSession).not.toHaveBeenCalled();
});
it('should start a new session', async () => {
await chatStore.startNewSession?.();
expect(chatStore.currentSessionId.value).toMatch(
/^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i,
);
// eslint-disable-next-line @typescript-eslint/unbound-method
expect(window.localStorage.setItem).toHaveBeenCalledWith(
localStorageSessionIdKey,
chatStore.currentSessionId.value,
);
});
});
describe('initial messages', () => {
it('should compute initial messages from options', () => {
mockOptions.initialMessages = ['Welcome!', 'How can I help you?'];
const chatStore = setupChatStore(mockOptions);
expect(chatStore.initialMessages.value).toHaveLength(2);
expect(chatStore.initialMessages.value[0]).toMatchObject({
text: 'Welcome!',
sender: 'bot',
});
expect(chatStore.initialMessages.value[1]).toMatchObject({
text: 'How can I help you?',
sender: 'bot',
});
});
it('should handle undefined initial messages', () => {
const chatStore = setupChatStore(mockOptions);
expect(chatStore.initialMessages.value).toHaveLength(0);
});
});
describe('edge cases', () => {
let chatStore: Chat;
beforeEach(() => {
chatStore = setupChatStore(mockOptions);
});
it('should handle sending message with null session ID', async () => {
const mockResponse = { output: 'Response' };
vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse);
chatStore.currentSessionId.value = null;
await chatStore.sendMessage('Test');
expect(api.sendMessage).toHaveBeenCalledWith('Test', [], null, mockOptions);
});
it('should handle empty text message', async () => {
const mockResponse = { output: 'Response' };
vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse);
await chatStore.sendMessage('');
expect(chatStore.messages.value[0]).toMatchObject({
text: '',
sender: 'user',
});
});
it('should handle streaming with existing bot messages', async () => {
mockOptions.enableStreaming = true;
chatStore = setupChatStore(mockOptions);
// Add an existing bot message
chatStore.messages.value.push({
id: 'existing',
text: 'Existing message',
sender: 'bot',
});
const mockStreamingResponse = { hasReceivedChunks: false };
vi.mocked(api.sendMessageStreaming).mockResolvedValueOnce(mockStreamingResponse);
await chatStore.sendMessage('Test');
// Should still add error message even with existing bot messages
const lastMessage = chatStore.messages.value[chatStore.messages.value.length - 1];
assert(lastMessage.type === 'text');
expect(lastMessage.text).toBe(
'[No response received. This could happen if streaming is enabled in the trigger but disabled in agent node(s)]',
);
});
it('should return response when executionStarted is true', async () => {
const mockResponse = {
executionStarted: true,
executionId: '12345',
};
vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse);
const result = await chatStore.sendMessage('Execute workflow');
expect(result).toEqual(mockResponse);
// Should only have the user message, no bot response
expect(chatStore.messages.value).toHaveLength(1);
expect(chatStore.messages.value[0]).toMatchObject({
text: 'Execute workflow',
sender: 'user',
});
});
it('should handle message field in response', async () => {
const mockResponse = { message: 'Response from message field' };
vi.mocked(api.sendMessage).mockResolvedValueOnce(mockResponse);
await chatStore.sendMessage('Test message field');
expect(chatStore.messages.value[1]).toMatchObject({
text: 'Response from message field',
sender: 'bot',
});
});
});
});

View File

@@ -115,7 +115,7 @@ export async function sendMessageStreaming(
sessionId: string,
options: ChatOptions,
handlers: StreamingEventHandlers,
- ): Promise<void> {
+ ): Promise<{ hasReceivedChunks: boolean }> {
// Build request
const response = await (files.length > 0
? sendWithFiles(message, files, sessionId, options)
@@ -133,6 +133,7 @@ export async function sendMessageStreaming(
// Process the stream
const reader = response.body.pipeThrough(createLineParser()).getReader();
+ let hasReceivedChunks = false;
try {
while (true) {
@@ -147,12 +148,14 @@ export async function sendMessageStreaming(
handlers.onBeginMessage(nodeId, runIndex);
break;
case 'item':
+ hasReceivedChunks = true;
handlers.onChunk(value.content ?? '', nodeId, runIndex);
break;
case 'end':
handlers.onEndMessage(nodeId, runIndex);
break;
case 'error':
+ hasReceivedChunks = true;
handlers.onChunk(`Error: ${value.content ?? 'Unknown error'}`, nodeId, runIndex);
handlers.onEndMessage(nodeId, runIndex);
break;
@@ -161,6 +164,8 @@ export async function sendMessageStreaming(
} finally {
reader.releaseLock();
}
+ return { hasReceivedChunks };
}
// Helper function for file uploads

View File

@@ -1,11 +1,15 @@
import { v4 as uuidv4 } from 'uuid';
- import type { Plugin } from 'vue';
- import { computed, nextTick, ref } from 'vue';
+ import { type Plugin, computed, nextTick, ref, type Ref } from 'vue';
import * as api from '@n8n/chat/api';
import { ChatOptionsSymbol, ChatSymbol, localStorageSessionIdKey } from '@n8n/chat/constants';
import { chatEventBus } from '@n8n/chat/event-buses';
- import type { ChatMessage, ChatOptions, ChatMessageText } from '@n8n/chat/types';
+ import type {
+ ChatMessage,
+ ChatOptions,
+ ChatMessageText,
+ SendMessageResponse,
+ } from '@n8n/chat/types';
import { StreamingMessageManager, createBotMessage } from '@n8n/chat/utils/streaming';
import {
handleStreamingChunk,
@@ -13,6 +17,169 @@ import {
handleNodeComplete,
} from '@n8n/chat/utils/streamingHandlers';
/**
* Creates a new user message object with a unique ID
* @param text - The message text content
* @param files - Optional array of files attached to the message
* @returns A ChatMessage object representing the user's message
*/
function createUserMessage(text: string, files: File[] = []): ChatMessage {
return {
id: uuidv4(),
text,
sender: 'user',
files,
};
}
/**
* Extracts text content from a message response
* Falls back to JSON stringification if no text is found but the response object has data
* @param response - The response object from the API
* @returns The extracted text message or stringified response
*/
function processMessageResponse(response: SendMessageResponse): string {
let textMessage = response.output ?? response.text ?? response.message ?? '';
if (textMessage === '' && Object.keys(response).length > 0) {
try {
textMessage = JSON.stringify(response, null, 2);
} catch (e) {
// Failed to stringify the object so fallback to empty string
}
}
return textMessage;
}
interface EmptyStreamConfig {
receivedMessage: Ref<ChatMessageText | null>;
messages: Ref<ChatMessage[]>;
}
/**
* Handles the case when a streaming response returns no chunks
* Creates an error message explaining the likely cause
* @param config - Configuration object containing message refs
*/
function handleEmptyStreamResponse(config: EmptyStreamConfig): void {
const { receivedMessage, messages } = config;
if (!receivedMessage.value) {
receivedMessage.value = createBotMessage();
messages.value.push(receivedMessage.value);
} else {
// Check if any existing messages have content
const hasContent = messages.value.some(
(msg) => msg.sender === 'bot' && 'text' in msg && msg.text.trim().length > 0,
);
if (!hasContent) {
receivedMessage.value = createBotMessage();
messages.value.push(receivedMessage.value);
}
}
receivedMessage.value.text =
'[No response received. This could happen if streaming is enabled in the trigger but disabled in agent node(s)]';
}
interface ErrorConfig {
error: unknown;
receivedMessage: Ref<ChatMessageText | null>;
messages: Ref<ChatMessage[]>;
}
/**
* Handles errors that occur during message sending
* Creates an error message for the user and logs the error to console
* @param config - Configuration object containing error and message refs
*/
function handleMessageError(config: ErrorConfig): void {
const { error, receivedMessage, messages } = config;
receivedMessage.value ??= createBotMessage();
receivedMessage.value.text = 'Error: Failed to receive response';
// Ensure the error message is added to messages array if not already there
if (!messages.value.includes(receivedMessage.value)) {
messages.value.push(receivedMessage.value);
}
console.error('Chat API error:', error);
}
interface StreamingMessageConfig {
text: string;
files: File[];
sessionId: string;
options: ChatOptions;
messages: Ref<ChatMessage[]>;
receivedMessage: Ref<ChatMessageText | null>;
streamingManager: StreamingMessageManager;
}
/**
* Handles sending messages with streaming enabled
* Sets up streaming event handlers and processes the response chunks
* @param config - Configuration object for streaming message handling
*/
async function handleStreamingMessage(config: StreamingMessageConfig): Promise<void> {
const { text, files, sessionId, options, messages, receivedMessage, streamingManager } = config;
const handlers: api.StreamingEventHandlers = {
onChunk: (chunk: string, nodeId?: string, runIndex?: number) => {
handleStreamingChunk(chunk, nodeId, streamingManager, receivedMessage, messages, runIndex);
},
onBeginMessage: (nodeId: string, runIndex?: number) => {
handleNodeStart(nodeId, streamingManager, runIndex);
},
onEndMessage: (nodeId: string, runIndex?: number) => {
handleNodeComplete(nodeId, streamingManager, runIndex);
},
};
const { hasReceivedChunks } = await api.sendMessageStreaming(
text,
files,
sessionId,
options,
handlers,
);
// Check if no chunks were received (empty stream)
if (!hasReceivedChunks) {
handleEmptyStreamResponse({ receivedMessage, messages });
}
}
interface NonStreamingMessageConfig {
text: string;
files: File[];
sessionId: string;
options: ChatOptions;
}
/**
* Handles sending messages without streaming
* Sends the message and processes the complete response
* @param config - Configuration object for non-streaming message handling
* @returns The API response or a bot message
*/
async function handleNonStreamingMessage(
config: NonStreamingMessageConfig,
): Promise<{ response?: SendMessageResponse; botMessage?: ChatMessageText }> {
const { text, files, sessionId, options } = config;
const sendMessageResponse = await api.sendMessage(text, files, sessionId, options);
if (sendMessageResponse?.executionStarted) {
return { response: sendMessageResponse };
}
const receivedMessage = createBotMessage();
receivedMessage.text = processMessageResponse(sendMessageResponse);
return { botMessage: receivedMessage };
}
export const ChatPlugin: Plugin<ChatOptions> = {
install(app, options) {
app.provide(ChatOptionsSymbol, options);
@@ -29,14 +196,12 @@ export const ChatPlugin: Plugin<ChatOptions> = {
})), })),
); );
async function sendMessage(text: string, files: File[] = []) { async function sendMessage(
const sentMessage: ChatMessage = { text: string,
id: uuidv4(), files: File[] = [],
text, ): Promise<SendMessageResponse | null> {
sender: 'user', // Create and add user message
files, const sentMessage = createUserMessage(text, files);
};
messages.value.push(sentMessage); messages.value.push(sentMessage);
waitingForResponse.value = true; waitingForResponse.value = true;
@@ -49,77 +214,39 @@ export const ChatPlugin: Plugin<ChatOptions> = {
try {
if (options?.enableStreaming) {
- const handlers: api.StreamingEventHandlers = {
- onChunk: (chunk: string, nodeId?: string, runIndex?: number) => {
- handleStreamingChunk(
- chunk,
- nodeId,
- streamingManager,
- receivedMessage,
- messages,
- runIndex,
- );
- },
- onBeginMessage: (nodeId: string, runIndex?: number) => {
- handleNodeStart(nodeId, streamingManager, runIndex);
- },
- onEndMessage: (nodeId: string, runIndex?: number) => {
- handleNodeComplete(nodeId, streamingManager, runIndex);
- },
- };
- await api.sendMessageStreaming(
- text,
- files,
- currentSessionId.value as string,
- options,
- handlers,
- );
+ await handleStreamingMessage({
+ text,
+ files,
+ sessionId: currentSessionId.value as string,
+ options,
+ messages,
+ receivedMessage,
+ streamingManager,
+ });
} else {
- receivedMessage.value = createBotMessage();
- const sendMessageResponse = await api.sendMessage(
- text,
- files,
- currentSessionId.value as string,
- options,
- );
- if (sendMessageResponse?.executionStarted) {
- return sendMessageResponse;
- }
- let textMessage =
- sendMessageResponse.output ??
- sendMessageResponse.text ??
- sendMessageResponse.message ??
- '';
- if (textMessage === '' && Object.keys(sendMessageResponse).length > 0) {
- try {
- textMessage = JSON.stringify(sendMessageResponse, null, 2);
- } catch (e) {
- // Failed to stringify the object so fallback to empty string
- }
- }
- receivedMessage.value.text = textMessage;
- messages.value.push(receivedMessage.value);
+ const result = await handleNonStreamingMessage({
+ text,
+ files,
+ sessionId: currentSessionId.value as string,
+ options,
+ });
+ if (result.response?.executionStarted) {
+ waitingForResponse.value = false;
+ return result.response;
+ }
+ if (result.botMessage) {
+ receivedMessage.value = result.botMessage;
+ messages.value.push(result.botMessage);
+ }
}
} catch (error) {
- if (!receivedMessage.value) {
- receivedMessage.value = createBotMessage();
- messages.value.push(receivedMessage.value);
- }
- if (receivedMessage.value && 'text' in receivedMessage.value) {
- receivedMessage.value.text = 'Error: Failed to receive response';
- }
- console.error('Chat API error:', error);
- waitingForResponse.value = false;
- }
- waitingForResponse.value = false;
+ handleMessageError({ error, receivedMessage, messages });
+ } finally {
+ waitingForResponse.value = false;
+ }
void nextTick(() => {
chatEventBus.emit('scrollToBottom');
});
@@ -148,6 +275,7 @@ export const ChatPlugin: Plugin<ChatOptions> = {
return sessionId;
}
+ // eslint-disable-next-line @typescript-eslint/require-await
async function startNewSession() {
currentSessionId.value = uuidv4();

View File

@@ -11,6 +11,6 @@ export interface Chat {
waitingForResponse: Ref<boolean>;
loadPreviousSession?: () => Promise<string | undefined>;
startNewSession?: () => Promise<void>;
- sendMessage: (text: string, files: File[]) => Promise<SendMessageResponse | null>;
+ sendMessage: (text: string, files?: File[]) => Promise<SendMessageResponse | null>;
ws?: WebSocket | null;
}

View File

@@ -16,8 +16,8 @@ export function handleStreamingChunk(
runIndex?: number,
): void {
try {
- // Skip empty chunks to avoid showing empty responses
- if (!chunk.trim()) {
+ // Only skip empty chunks, but not whitespace only chunks
+ if (chunk === '') {
return;
}
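Why this matters: streamed tokens frequently arrive as whitespace-only chunks (for example, a lone space between two words), and treating those as empty would glue adjacent words together. A small illustration with hypothetical chunks, not taken from a real stream:

// Hypothetical chunk sequence for the sentence "Hello world".
const chunks = ['Hello', ' ', 'world'];

// Previous filter: whitespace-only chunks are dropped, yielding "Helloworld".
const joinedBefore = chunks.filter((chunk) => chunk.trim() !== '').join('');

// New filter: only truly empty chunks are dropped, yielding "Hello world".
const joinedAfter = chunks.filter((chunk) => chunk !== '').join('');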

View File

@@ -47,8 +47,7 @@ export class Webhook extends Node {
name: 'webhook',
group: ['trigger'],
version: [1, 1.1, 2, 2.1],
- // Keep the default version as 2 to avoid releasing streaming in broken state
- defaultVersion: 2,
+ defaultVersion: 2.1,
description: 'Starts the workflow when a webhook is called',
eventTriggerDescription: 'Waiting for you to call the Test URL',
activationMessage: 'You can now make calls to your production webhook URL.',

View File

@@ -164,7 +164,7 @@ export const responseModePropertyStreaming: INodeProperties = {
options: [
...responseModeOptions,
{
- name: 'Streaming Response',
+ name: 'Streaming',
value: 'streaming',
description: 'Returns data in real time from streaming enabled nodes',
},