feat: Respond to chat and wait for response (#12546)

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
Co-authored-by: Shireen Missi <94372015+ShireenMissi@users.noreply.github.com>
This commit is contained in:
Michael Kret
2025-07-24 11:48:40 +03:00
committed by GitHub
parent e61b25c53f
commit a98ed2ca49
47 changed files with 3441 additions and 71 deletions

View File

@@ -0,0 +1,273 @@
/* eslint-disable n8n-nodes-base/node-dirname-against-convention */
import type { BaseChatMemory } from 'langchain/memory';
import {
CHAT_TRIGGER_NODE_TYPE,
CHAT_WAIT_USER_REPLY,
NodeConnectionTypes,
NodeOperationError,
} from 'n8n-workflow';
import type {
IExecuteFunctions,
INodeExecutionData,
INodeTypeDescription,
INodeType,
INodeProperties,
} from 'n8n-workflow';
import { configureInputs, configureWaitTillDate } from './util';
const limitWaitTimeProperties: INodeProperties[] = [
{
displayName: 'Limit Type',
name: 'limitType',
type: 'options',
default: 'afterTimeInterval',
description:
'Sets the condition for the execution to resume. Can be a specified date or after some time.',
options: [
{
name: 'After Time Interval',
description: 'Waits for a certain amount of time',
value: 'afterTimeInterval',
},
{
name: 'At Specified Time',
description: 'Waits until the set date and time to continue',
value: 'atSpecifiedTime',
},
],
},
{
displayName: 'Amount',
name: 'resumeAmount',
type: 'number',
displayOptions: {
show: {
limitType: ['afterTimeInterval'],
},
},
typeOptions: {
minValue: 0,
numberPrecision: 2,
},
default: 1,
description: 'The time to wait',
},
{
displayName: 'Unit',
name: 'resumeUnit',
type: 'options',
displayOptions: {
show: {
limitType: ['afterTimeInterval'],
},
},
options: [
{
name: 'Minutes',
value: 'minutes',
},
{
name: 'Hours',
value: 'hours',
},
{
name: 'Days',
value: 'days',
},
],
default: 'hours',
description: 'Unit of the interval value',
},
{
displayName: 'Max Date and Time',
name: 'maxDateAndTime',
type: 'dateTime',
displayOptions: {
show: {
limitType: ['atSpecifiedTime'],
},
},
default: '',
description: 'Continue execution after the specified date and time',
},
];
const limitWaitTimeOption: INodeProperties = {
displayName: 'Limit Wait Time',
name: 'limitWaitTime',
type: 'fixedCollection',
description:
'Whether to limit the time this node should wait for a user response before execution resumes',
default: { values: { limitType: 'afterTimeInterval', resumeAmount: 45, resumeUnit: 'minutes' } },
options: [
{
displayName: 'Values',
name: 'values',
values: limitWaitTimeProperties,
},
],
displayOptions: {
show: {
[`/${CHAT_WAIT_USER_REPLY}`]: [true],
},
},
};
export class Chat implements INodeType {
description: INodeTypeDescription = {
displayName: 'Respond to Chat',
name: 'chat',
icon: 'fa:comments',
iconColor: 'black',
group: ['input'],
version: 1,
description: 'Send a message to a chat',
defaults: {
name: 'Respond to Chat',
},
codex: {
categories: ['Core Nodes', 'HITL'],
subcategories: {
HITL: ['Human in the Loop'],
},
alias: ['human', 'wait', 'hitl'],
resources: {
primaryDocumentation: [
{
url: 'https://docs.n8n.io/integrations/builtin/core-nodes/n8n-nodes-langchain.chat/',
},
],
},
},
inputs: `={{ (${configureInputs})($parameter) }}`,
outputs: [NodeConnectionTypes.Main],
properties: [
{
displayName:
"Verify you're using a chat trigger with the 'Response Mode' option set to 'Using Response Nodes'",
name: 'generalNotice',
type: 'notice',
default: '',
},
{
displayName: 'Message',
name: 'message',
type: 'string',
default: '',
required: true,
typeOptions: {
rows: 6,
},
},
{
displayName: 'Wait for User Reply',
name: CHAT_WAIT_USER_REPLY,
type: 'boolean',
default: true,
},
{
displayName: 'Options',
name: 'options',
type: 'collection',
placeholder: 'Add Option',
default: {},
options: [
{
displayName: 'Add Memory Input Connection',
name: 'memoryConnection',
type: 'boolean',
default: false,
},
limitWaitTimeOption,
],
},
],
};
async onMessage(
context: IExecuteFunctions,
data: INodeExecutionData,
): Promise<INodeExecutionData[][]> {
const options = context.getNodeParameter('options', 0, {}) as {
memoryConnection?: boolean;
};
const waitForReply = context.getNodeParameter(CHAT_WAIT_USER_REPLY, 0, true) as boolean;
if (!waitForReply) {
const inputData = context.getInputData();
return [inputData];
}
if (options.memoryConnection) {
const memory = (await context.getInputConnectionData(NodeConnectionTypes.AiMemory, 0)) as
| BaseChatMemory
| undefined;
const message = data.json?.chatInput;
if (memory && message) {
await memory.chatHistory.addUserMessage(message as string);
}
}
return [[data]];
}
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const connectedNodes = this.getParentNodes(this.getNode().name, {
includeNodeParameters: true,
});
const chatTrigger = connectedNodes.find(
(node) => node.type === CHAT_TRIGGER_NODE_TYPE && !node.disabled,
);
if (!chatTrigger) {
throw new NodeOperationError(
this.getNode(),
'Workflow must be started from a chat trigger node',
);
}
const parameters = chatTrigger.parameters as {
mode?: 'hostedChat' | 'webhook';
options: { responseMode: 'lastNode' | 'responseNodes' | 'streaming' | 'responseNode' };
};
if (parameters.mode === 'webhook') {
throw new NodeOperationError(
this.getNode(),
'"Embeded chat" is not supported, change the "Mode" in the chat trigger node to the "Hosted Chat"',
);
}
if (parameters.options.responseMode !== 'responseNodes') {
throw new NodeOperationError(
this.getNode(),
'"Response Mode" in the chat trigger node must be set to "Respond Nodes"',
);
}
const message = (this.getNodeParameter('message', 0) as string) ?? '';
const options = this.getNodeParameter('options', 0, {}) as {
memoryConnection?: boolean;
};
if (options.memoryConnection) {
const memory = (await this.getInputConnectionData(NodeConnectionTypes.AiMemory, 0)) as
| BaseChatMemory
| undefined;
if (memory) {
await memory.chatHistory.addAIChatMessage(message);
}
}
const waitTill = configureWaitTillDate(this);
await this.putExecutionToWait(waitTill);
return [[{ json: {}, sendMessage: message }]];
}
}

View File

@@ -35,27 +35,30 @@ const allowedFileMimeTypeOption: INodeProperties = {
'Allowed file types for upload. Comma-separated list of <a href="https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Common_types" target="_blank">MIME types</a>.',
};
const responseModeOptions = [
{
name: 'When Last Node Finishes',
value: 'lastNode',
description: 'Returns data of the last-executed node',
},
{
name: "Using 'Respond to Webhook' Node",
value: 'responseNode',
description: 'Response defined in that node',
},
];
const respondToWebhookResponseMode = {
name: "Using 'Respond to Webhook' Node",
value: 'responseNode',
description: 'Response defined in that node',
};
const responseModeWithStreamingOptions = [
...responseModeOptions,
{
name: 'Streaming Response',
value: 'streaming',
description: 'Streaming response from specified nodes (e.g. Agents)',
},
];
const lastNodeResponseMode = {
name: 'When Last Node Finishes',
value: 'lastNode',
description: 'Returns data of the last-executed node',
};
const streamingResponseMode = {
name: 'Streaming Response',
value: 'streaming',
description: 'Streaming response from specified nodes (e.g. Agents)',
};
const respondNodesResponseMode = {
name: 'Using Response Nodes',
value: 'responseNodes',
description:
"Send responses to the chat by using 'Respond to Chat' or 'Respond to Webhook' nodes",
};
const commonOptionsFields: INodeProperties[] = [
// CORS parameters are only valid for when chat is used in hosted or webhook mode
@@ -209,9 +212,8 @@ export class ChatTrigger extends Node {
icon: 'fa:comments',
iconColor: 'black',
group: ['trigger'],
version: [1, 1.1, 1.2],
// Keep the default version as 1.1 to avoid releasing streaming in broken state
defaultVersion: 1.1,
version: [1, 1.1, 1.2, 1.3],
defaultVersion: 1.3,
description: 'Runs the workflow when an n8n generated webchat is submitted',
defaults: {
name: 'When chat message received',
@@ -390,7 +392,7 @@ export class ChatTrigger extends Node {
displayOptions: {
show: {
public: [false],
'@version': [{ _cnd: { gte: 1.1 } }],
'@version': [1, 1.1],
},
},
placeholder: 'Add Field',
@@ -417,13 +419,13 @@ export class ChatTrigger extends Node {
displayName: 'Response Mode',
name: 'responseMode',
type: 'options',
options: responseModeOptions,
options: [lastNodeResponseMode, respondToWebhookResponseMode],
default: 'lastNode',
description: 'When and how to respond to the webhook',
},
],
},
// Options for version 1.2+ (with streaming)
// Options for version 1.2 (with streaming)
{
displayName: 'Options',
name: 'options',
@@ -432,7 +434,7 @@ export class ChatTrigger extends Node {
show: {
mode: ['hostedChat', 'webhook'],
public: [true],
'@version': [{ _cnd: { gte: 1.2 } }],
'@version': [1.2],
},
},
placeholder: 'Add Field',
@@ -443,12 +445,72 @@ export class ChatTrigger extends Node {
displayName: 'Response Mode',
name: 'responseMode',
type: 'options',
options: responseModeWithStreamingOptions,
options: [lastNodeResponseMode, respondToWebhookResponseMode, streamingResponseMode],
default: 'lastNode',
description: 'When and how to respond to the webhook',
},
],
},
{
displayName: 'Options',
name: 'options',
type: 'collection',
displayOptions: {
show: {
public: [false],
'@version': [{ _cnd: { gte: 1.3 } }],
},
},
placeholder: 'Add Field',
default: {},
options: [
allowFileUploadsOption,
allowedFileMimeTypeOption,
{
displayName: 'Response Mode',
name: 'responseMode',
type: 'options',
options: [lastNodeResponseMode, respondNodesResponseMode],
default: 'lastNode',
description: 'When and how to respond to the chat',
},
],
},
{
displayName: 'Options',
name: 'options',
type: 'collection',
displayOptions: {
show: {
mode: ['hostedChat', 'webhook'],
public: [true],
'@version': [{ _cnd: { gte: 1.3 } }],
},
},
placeholder: 'Add Field',
default: {},
options: [
...commonOptionsFields,
{
displayName: 'Response Mode',
name: 'responseMode',
type: 'options',
options: [lastNodeResponseMode, respondToWebhookResponseMode],
default: 'lastNode',
description: 'When and how to respond to the chat',
displayOptions: { show: { '/mode': ['webhook'] } },
},
{
displayName: 'Response Mode',
name: 'responseMode',
type: 'options',
options: [lastNodeResponseMode, respondNodesResponseMode],
default: 'lastNode',
description: 'When and how to respond to the webhook',
displayOptions: { show: { '/mode': ['hostedChat'] } },
},
],
},
],
};
@@ -536,10 +598,10 @@ export class ChatTrigger extends Node {
allowFileUploads?: boolean;
allowedFilesMimeTypes?: string;
customCss?: string;
responseMode?: string;
};
const responseMode = ctx.getNodeParameter('options.responseMode', 'lastNode') as string;
const enableStreaming = responseMode === 'streaming';
const enableStreaming = options.responseMode === 'streaming';
const req = ctx.getRequestObject();
const webhookName = ctx.getWebhookName();

View File

@@ -0,0 +1,143 @@
import type { MockProxy } from 'jest-mock-extended';
import { mock } from 'jest-mock-extended';
import type { INode, IExecuteFunctions } from 'n8n-workflow';
import { CHAT_TRIGGER_NODE_TYPE } from 'n8n-workflow';
import { Chat } from '../Chat.node';
describe('Test Chat Node', () => {
let chat: Chat;
let mockExecuteFunctions: MockProxy<IExecuteFunctions>;
const chatNode = mock<INode>({
name: 'Chat',
type: CHAT_TRIGGER_NODE_TYPE,
parameters: {},
});
beforeEach(() => {
chat = new Chat();
mockExecuteFunctions = mock<IExecuteFunctions>();
});
afterEach(() => {
jest.clearAllMocks();
});
it('should execute and send message', async () => {
const items = [{ json: { data: 'test' } }];
mockExecuteFunctions.getInputData.mockReturnValue(items);
mockExecuteFunctions.getNodeParameter.mockReturnValueOnce('message');
mockExecuteFunctions.getNodeParameter.mockReturnValueOnce(false);
mockExecuteFunctions.getNodeParameter.mockReturnValueOnce({
limitType: 'afterTimeInterval',
resumeAmount: 1,
resumeUnit: 'minutes',
});
mockExecuteFunctions.getNode.mockReturnValue(chatNode);
mockExecuteFunctions.getParentNodes.mockReturnValue([
{
type: CHAT_TRIGGER_NODE_TYPE,
disabled: false,
parameters: { mode: 'hostedChat', options: { responseMode: 'responseNodes' } },
} as any,
]);
const result = await chat.execute.call(mockExecuteFunctions);
expect(result).toEqual([[{ json: {}, sendMessage: 'message' }]]);
});
it('should execute and handle memory connection', async () => {
const items = [{ json: { data: 'test' } }];
mockExecuteFunctions.getInputData.mockReturnValue(items);
mockExecuteFunctions.getNodeParameter.mockReturnValueOnce('message');
mockExecuteFunctions.getNodeParameter.mockReturnValueOnce({ memoryConnection: true });
mockExecuteFunctions.getNodeParameter.mockReturnValueOnce({
limitType: 'afterTimeInterval',
resumeAmount: 1,
resumeUnit: 'minutes',
});
mockExecuteFunctions.getNode.mockReturnValue(chatNode);
mockExecuteFunctions.getParentNodes.mockReturnValue([
{
type: CHAT_TRIGGER_NODE_TYPE,
disabled: false,
parameters: { mode: 'hostedChat', options: { responseMode: 'responseNodes' } },
} as any,
]);
const memory = { chatHistory: { addAIChatMessage: jest.fn() } };
mockExecuteFunctions.getInputConnectionData.mockResolvedValueOnce(memory);
await chat.execute.call(mockExecuteFunctions);
expect(memory.chatHistory.addAIChatMessage).toHaveBeenCalledWith('message');
});
it('should execute without memory connection', async () => {
const items = [{ json: { data: 'test' } }];
mockExecuteFunctions.getInputData.mockReturnValue(items);
mockExecuteFunctions.getNodeParameter.mockReturnValueOnce('message');
mockExecuteFunctions.getNodeParameter.mockReturnValueOnce(false);
mockExecuteFunctions.getNodeParameter.mockReturnValueOnce({
limitType: 'afterTimeInterval',
resumeAmount: 1,
resumeUnit: 'minutes',
});
mockExecuteFunctions.getNode.mockReturnValue(chatNode);
mockExecuteFunctions.getParentNodes.mockReturnValue([
{
type: CHAT_TRIGGER_NODE_TYPE,
disabled: false,
parameters: { mode: 'hostedChat', options: { responseMode: 'responseNodes' } },
} as any,
]);
const result = await chat.execute.call(mockExecuteFunctions);
expect(result).toEqual([[{ json: {}, sendMessage: 'message' }]]);
});
it('should execute with specified time limit', async () => {
const items = [{ json: { data: 'test' } }];
mockExecuteFunctions.getInputData.mockReturnValue(items);
mockExecuteFunctions.getNodeParameter.mockReturnValueOnce('message');
mockExecuteFunctions.getNodeParameter.mockReturnValueOnce(false);
mockExecuteFunctions.getNodeParameter.mockReturnValueOnce({
limitType: 'atSpecifiedTime',
maxDateAndTime: new Date().toISOString(),
});
mockExecuteFunctions.getNode.mockReturnValue(chatNode);
mockExecuteFunctions.getParentNodes.mockReturnValue([
{
type: CHAT_TRIGGER_NODE_TYPE,
disabled: false,
parameters: { mode: 'hostedChat', options: { responseMode: 'responseNodes' } },
} as any,
]);
const result = await chat.execute.call(mockExecuteFunctions);
expect(result).toEqual([[{ json: {}, sendMessage: 'message' }]]);
});
it('should process onMessage without waiting for reply', async () => {
const data = { json: { chatInput: 'user message' } };
mockExecuteFunctions.getNodeParameter.mockReturnValueOnce({ memoryConnection: true });
mockExecuteFunctions.getNodeParameter.mockReturnValueOnce(false);
mockExecuteFunctions.getInputData.mockReturnValue([data]);
mockExecuteFunctions.getNode.mockReturnValue(chatNode);
mockExecuteFunctions.getParentNodes.mockReturnValue([
{
type: CHAT_TRIGGER_NODE_TYPE,
disabled: false,
parameters: { mode: 'hostedChat', options: { responseMode: 'responseNodes' } },
} as any,
]);
const result = await chat.onMessage(mockExecuteFunctions, data);
expect(result).toEqual([[data]]);
});
});

View File

@@ -150,8 +150,7 @@ describe('ChatTrigger Node', () => {
): boolean | string | object | undefined => {
if (paramName === 'public') return true;
if (paramName === 'mode') return 'hostedChat';
if (paramName === 'options') return {};
if (paramName === 'options.responseMode') return 'streaming';
if (paramName === 'options') return { responseMode: 'streaming' };
return defaultValue;
},
);
@@ -184,8 +183,7 @@ describe('ChatTrigger Node', () => {
): boolean | string | object | undefined => {
if (paramName === 'public') return true;
if (paramName === 'mode') return 'hostedChat';
if (paramName === 'options') return {};
if (paramName === 'options.responseMode') return 'lastNode';
if (paramName === 'options') return { responseMode: 'lastNode' };
return defaultValue;
},
);
@@ -220,8 +218,7 @@ describe('ChatTrigger Node', () => {
): boolean | string | object | undefined => {
if (paramName === 'public') return true;
if (paramName === 'mode') return 'hostedChat';
if (paramName === 'options') return {};
if (paramName === 'options.responseMode') return 'streaming';
if (paramName === 'options') return { responseMode: 'streaming' };
return defaultValue;
},
);

View File

@@ -77,7 +77,7 @@ export function createPage({
</head>
<body>
<script type="module">
import { createChat } from 'https://cdn.jsdelivr.net/npm/@n8n/chat/dist/chat.bundle.es.js';
import { createChat } from 'https://cdn.jsdelivr.net/npm/n8n-chat-atekron@0.49.0/dist/chat.bundle.es.js';
(async function () {
const authentication = '${sanitizedAuthentication}';

View File

@@ -0,0 +1,67 @@
import { NodeOperationError, UserError, WAIT_INDEFINITELY } from 'n8n-workflow';
import type { IExecuteFunctions } from 'n8n-workflow';
export function configureWaitTillDate(context: IExecuteFunctions) {
let waitTill = WAIT_INDEFINITELY;
const limitOptions = context.getNodeParameter('options.limitWaitTime.values', 0, {}) as {
limitType?: string;
resumeAmount?: number;
resumeUnit?: string;
maxDateAndTime?: string;
};
if (Object.keys(limitOptions).length) {
try {
if (limitOptions.limitType === 'afterTimeInterval') {
let waitAmount = limitOptions.resumeAmount as number;
if (limitOptions.resumeUnit === 'minutes') {
waitAmount *= 60;
}
if (limitOptions.resumeUnit === 'hours') {
waitAmount *= 60 * 60;
}
if (limitOptions.resumeUnit === 'days') {
waitAmount *= 60 * 60 * 24;
}
waitAmount *= 1000;
waitTill = new Date(new Date().getTime() + waitAmount);
} else {
waitTill = new Date(limitOptions.maxDateAndTime as string);
}
if (isNaN(waitTill.getTime())) {
throw new UserError('Invalid date format');
}
} catch (error) {
throw new NodeOperationError(context.getNode(), 'Could not configure Limit Wait Time', {
description: error.message,
});
}
}
return waitTill;
}
export const configureInputs = (parameters: { options?: { memoryConnection?: boolean } }) => {
const inputs = [
{
type: 'main',
displayName: 'User Response',
},
];
if (parameters.options?.memoryConnection) {
return [
...inputs,
{
type: 'ai_memory',
displayName: 'Memory',
maxConnections: 1,
},
];
}
return inputs;
};

View File

@@ -125,6 +125,7 @@
"dist/nodes/tools/ToolWorkflow/ToolWorkflow.node.js",
"dist/nodes/trigger/ManualChatTrigger/ManualChatTrigger.node.js",
"dist/nodes/trigger/ChatTrigger/ChatTrigger.node.js",
"dist/nodes/trigger/ChatTrigger/Chat.node.js",
"dist/nodes/vector_store/VectorStoreInMemory/VectorStoreInMemory.node.js",
"dist/nodes/vector_store/VectorStoreInMemoryInsert/VectorStoreInMemoryInsert.node.js",
"dist/nodes/vector_store/VectorStoreInMemoryLoad/VectorStoreInMemoryLoad.node.js",

View File

@@ -117,6 +117,20 @@ export function getSessionId(
sessionId = bodyData.sessionId as string;
} else {
sessionId = ctx.evaluateExpression('{{ $json.sessionId }}', itemIndex) as string;
// try to get sessionId from chat trigger
if (!sessionId || sessionId === undefined) {
try {
const chatTrigger = ctx.getChatTrigger();
if (chatTrigger) {
sessionId = ctx.evaluateExpression(
`{{ $('${chatTrigger.name}').first().json.sessionId }}`,
itemIndex,
) as string;
}
} catch (error) {}
}
}
if (sessionId === '' || sessionId === undefined) {

View File

@@ -10,6 +10,7 @@ import {
getConnectedTools,
hasLongSequentialRepeat,
unwrapNestedOutput,
getSessionId,
} from '../helpers';
import { N8nTool } from '../N8nTool';
@@ -376,6 +377,52 @@ describe('unwrapNestedOutput', () => {
});
});
describe('getSessionId', () => {
let mockCtx: any;
beforeEach(() => {
mockCtx = {
getNodeParameter: jest.fn(),
evaluateExpression: jest.fn(),
getChatTrigger: jest.fn(),
getNode: jest.fn(),
};
});
it('should retrieve sessionId from bodyData', () => {
mockCtx.getBodyData = jest.fn();
mockCtx.getNodeParameter.mockReturnValue('fromInput');
mockCtx.getBodyData.mockReturnValue({ sessionId: '12345' });
const sessionId = getSessionId(mockCtx, 0);
expect(sessionId).toBe('12345');
});
it('should retrieve sessionId from chat trigger', () => {
mockCtx.getNodeParameter.mockReturnValue('fromInput');
mockCtx.evaluateExpression.mockReturnValueOnce(undefined);
mockCtx.getChatTrigger.mockReturnValue({ name: 'chatTrigger' });
mockCtx.evaluateExpression.mockReturnValueOnce('67890');
const sessionId = getSessionId(mockCtx, 0);
expect(sessionId).toBe('67890');
});
it('should throw error if sessionId is not found', () => {
mockCtx.getNodeParameter.mockReturnValue('fromInput');
mockCtx.evaluateExpression.mockReturnValue(undefined);
mockCtx.getChatTrigger.mockReturnValue(undefined);
expect(() => getSessionId(mockCtx, 0)).toThrow(NodeOperationError);
});
it('should use custom sessionId if provided', () => {
mockCtx.getNodeParameter.mockReturnValueOnce('custom').mockReturnValueOnce('customSessionId');
const sessionId = getSessionId(mockCtx, 0);
expect(sessionId).toBe('customSessionId');
});
});
describe('hasLongSequentialRepeat', () => {
it('should return false for text shorter than threshold', () => {
const text = 'a'.repeat(99);

View File

@@ -0,0 +1,292 @@
import { ExecutionRepository } from '@n8n/db';
import type { IExecutionResponse } from '@n8n/db';
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
import { WorkflowRunner } from '@/workflow-runner';
import { mockInstance } from '@n8n/backend-test-utils';
import { NodeTypes } from '../../node-types';
import { OwnershipService } from '../../services/ownership.service';
import { ChatExecutionManager } from '../chat-execution-manager';
import type { ChatMessage } from '../chat-service.types';
describe('ChatExecutionManager', () => {
const executionRepository = mockInstance(ExecutionRepository);
const workflowRunner = mockInstance(WorkflowRunner);
const ownershipService = mockInstance(OwnershipService);
const nodeTypes = mockInstance(NodeTypes);
const chatExecutionManager = new ChatExecutionManager(
executionRepository,
workflowRunner,
ownershipService,
nodeTypes,
);
beforeEach(() => {
jest.restoreAllMocks();
});
it('should handle errors from getRunData gracefully', async () => {
const execution = { id: '1', workflowData: {}, data: {} } as IExecutionResponse;
const message = { sessionId: '123', action: 'sendMessage', chatInput: 'input' } as ChatMessage;
jest
.spyOn(chatExecutionManager as any, 'getRunData')
.mockRejectedValue(new Error('Test error'));
await expect(chatExecutionManager.runWorkflow(execution, message)).rejects.toThrow(
'Test error',
);
});
describe('runWorkflow', () => {
it('should call WorkflowRunner.run with correct parameters', async () => {
const execution = { id: '1', workflowData: {}, data: {} } as IExecutionResponse;
const message = {
sessionId: '123',
action: 'sendMessage',
chatInput: 'input',
} as ChatMessage;
const runData = { executionMode: 'manual', executionData: {}, workflowData: {} } as any;
jest.spyOn(chatExecutionManager as any, 'getRunData').mockResolvedValue(runData);
await chatExecutionManager.runWorkflow(execution, message);
expect(workflowRunner.run).toHaveBeenCalledWith(runData, true, true, '1');
});
});
describe('cancelExecution', () => {
it('should update execution status to canceled if it is running', async () => {
const executionId = '1';
const execution = { id: executionId, status: 'running' } as any;
executionRepository.findSingleExecution.mockResolvedValue(execution);
await chatExecutionManager.cancelExecution(executionId);
expect(executionRepository.update).toHaveBeenCalledWith(
{ id: executionId },
{ status: 'canceled' },
);
});
it('should update execution status to canceled if it is waiting', async () => {
const executionId = '2';
const execution = { id: executionId, status: 'waiting' } as any;
executionRepository.findSingleExecution.mockResolvedValue(execution);
await chatExecutionManager.cancelExecution(executionId);
expect(executionRepository.update).toHaveBeenCalledWith(
{ id: executionId },
{ status: 'canceled' },
);
});
it('should update execution status to canceled if it is unknown', async () => {
const executionId = '3';
const execution = { id: executionId, status: 'unknown' } as any;
executionRepository.findSingleExecution.mockResolvedValue(execution);
await chatExecutionManager.cancelExecution(executionId);
expect(executionRepository.update).toHaveBeenCalledWith(
{ id: executionId },
{ status: 'canceled' },
);
});
it('should not update execution status if it is not running', async () => {
const executionId = '1';
const execution = { id: executionId, status: 'completed' } as any;
executionRepository.findSingleExecution.mockResolvedValue(execution);
await chatExecutionManager.cancelExecution(executionId);
expect(executionRepository.update).not.toHaveBeenCalled();
});
});
describe('findExecution', () => {
it('should return undefined if execution does not exist', async () => {
const executionId = 'non-existent';
executionRepository.findSingleExecution.mockResolvedValue(undefined);
const result = await chatExecutionManager.findExecution(executionId);
expect(result).toBeUndefined;
});
it('should return execution data', async () => {
const executionId = '1';
const execution = { id: executionId } as any;
executionRepository.findSingleExecution.mockResolvedValue(execution);
const result = await chatExecutionManager.findExecution(executionId);
expect(result).toEqual(execution);
});
});
describe('getRunData', () => {
it('should call runNode with correct parameters and return runData', async () => {
const execution = {
id: '1',
workflowData: { id: 'workflowId' },
data: {
resultData: { pinData: {} },
executionData: { nodeExecutionStack: [{ data: { main: [[]] } }] },
pushRef: 'pushRef',
},
mode: 'manual',
} as any;
const message = {
sessionId: '123',
action: 'sendMessage',
chatInput: 'input',
} as ChatMessage;
const project = { id: 'projectId' };
const nodeExecutionData = [[{ json: message }]];
const getRunDataSpy = jest
.spyOn(chatExecutionManager as any, 'runNode')
.mockResolvedValue(nodeExecutionData);
const getWorkflowProjectCachedSpy = jest
.spyOn(ownershipService, 'getWorkflowProjectCached')
.mockResolvedValue(project as any);
const runData = await (chatExecutionManager as any).getRunData(execution, message);
expect(getRunDataSpy).toHaveBeenCalledWith(execution, message);
expect(getWorkflowProjectCachedSpy).toHaveBeenCalledWith('workflowId');
expect(runData).toEqual({
executionMode: 'manual',
executionData: execution.data,
pushRef: execution.data.pushRef,
workflowData: execution.workflowData,
pinData: execution.data.resultData.pinData,
projectId: 'projectId',
});
});
});
describe('runNode', () => {
it('should return null if node is not found', async () => {
const execution = {
id: '1',
workflowData: { id: 'workflowId' },
data: {
resultData: { lastNodeExecuted: 'nodeId' },
executionData: { nodeExecutionStack: [{ data: { main: [[]] } }] },
},
mode: 'manual',
} as any;
const message = {
sessionId: '123',
action: 'sendMessage',
chatInput: 'input',
} as ChatMessage;
jest.spyOn(WorkflowExecuteAdditionalData, 'getBase').mockResolvedValue({} as any);
const workflow = { getNode: jest.fn().mockReturnValue(null) };
jest.spyOn(chatExecutionManager as any, 'getWorkflow').mockReturnValue(workflow);
const result = await (chatExecutionManager as any).runNode(execution, message);
expect(result).toBeNull();
});
it('should return null if executionData is undefined', async () => {
const execution = {
id: '1',
workflowData: { id: 'workflowId' },
data: {
resultData: { lastNodeExecuted: 'nodeId' },
executionData: { nodeExecutionStack: [] },
},
mode: 'manual',
} as any;
const message = {
sessionId: '123',
action: 'sendMessage',
chatInput: 'input',
} as ChatMessage;
jest.spyOn(WorkflowExecuteAdditionalData, 'getBase').mockResolvedValue({} as any);
const workflow = { getNode: jest.fn().mockReturnValue({}) };
jest.spyOn(chatExecutionManager as any, 'getWorkflow').mockReturnValue(workflow);
const result = await (chatExecutionManager as any).runNode(execution, message);
expect(result).toBeNull();
});
it('should call nodeType.onMessage with correct parameters and return the result', async () => {
const execution = {
id: '1',
workflowData: { id: 'workflowId' },
data: {
resultData: { lastNodeExecuted: 'nodeId' },
executionData: { nodeExecutionStack: [{ data: { main: [[{}]] } }] },
},
mode: 'manual',
} as any;
const message = {
sessionId: '123',
action: 'sendMessage',
chatInput: 'input',
files: [],
} as ChatMessage;
const node = { type: 'testType', typeVersion: 1 };
const nodeType = { onMessage: jest.fn().mockResolvedValue([[{ json: message }]]) };
const workflow = {
getNode: jest.fn().mockReturnValue(node),
nodeTypes: { getByNameAndVersion: jest.fn().mockReturnValue(nodeType) },
};
jest.spyOn(chatExecutionManager as any, 'getWorkflow').mockReturnValue(workflow);
jest.spyOn(WorkflowExecuteAdditionalData, 'getBase').mockResolvedValue({} as any);
const result = await (chatExecutionManager as any).runNode(execution, message);
expect(workflow.nodeTypes.getByNameAndVersion).toHaveBeenCalledWith('testType', 1);
expect(nodeType.onMessage).toHaveBeenCalled();
expect(result).toEqual([[{ json: message }]]);
});
it('should return nodeExecutionData with sessionId, action and chatInput', async () => {
const execution = {
id: '1',
workflowData: { id: 'workflowId' },
data: {
resultData: { lastNodeExecuted: 'nodeId' },
executionData: { nodeExecutionStack: [{ data: { main: [[{}]] } }] },
},
mode: 'manual',
} as any;
const message = {
sessionId: '123',
action: 'sendMessage',
chatInput: 'input',
} as ChatMessage;
const node = { type: 'testType', typeVersion: 1 };
const nodeType = { onMessage: jest.fn().mockResolvedValue([[{ json: message }]]) };
const workflow = {
getNode: jest.fn().mockReturnValue(node),
nodeTypes: { getByNameAndVersion: jest.fn().mockReturnValue(nodeType) },
};
jest.spyOn(chatExecutionManager as any, 'getWorkflow').mockReturnValue(workflow);
jest.spyOn(WorkflowExecuteAdditionalData, 'getBase').mockResolvedValue({} as any);
const result = await (chatExecutionManager as any).runNode(execution, message);
expect(result).toEqual([[{ json: message }]]);
});
});
});

View File

@@ -0,0 +1,86 @@
import type { Application } from 'express';
import { ServerResponse } from 'http';
import type { Server as HttpServer } from 'http';
import type { IncomingMessage } from 'http';
import { mock, mockReset } from 'jest-mock-extended';
import type { WebSocket } from 'ws';
import type { WebSocketServer } from 'ws';
import { ChatServer } from '../chat-server';
import type { ChatService } from '../chat-service';
import type { ChatRequest } from '../chat-service.types';
jest.mock('ws');
describe('ChatServer', () => {
const mockChatService = mock<ChatService>();
const mockWsServer = mock<WebSocketServer>();
const mockApp = mock<Application>() as unknown as Application & {
handle: (req: IncomingMessage, res: ServerResponse) => void;
};
mockApp.handle = jest.fn();
const mockHttpServer = mock<HttpServer>();
let chatServer: ChatServer;
beforeEach(() => {
mockReset(mockChatService);
mockReset(mockWsServer);
mockReset(mockApp);
mockReset(mockHttpServer);
chatServer = new ChatServer(mockChatService);
(chatServer as any).wsServer = mockWsServer;
});
it('attaches upgrade listener to HTTP server', () => {
chatServer.setup(mockHttpServer, mockApp);
expect(mockHttpServer.on).toHaveBeenCalledWith('upgrade', expect.any(Function));
});
it('handles WebSocket upgrade for /chat path', () => {
chatServer.setup(mockHttpServer, mockApp);
const req = {
url: 'http://localhost:5678/chat?sessionId=123&executionId=456',
socket: { remoteAddress: '127.0.0.1' },
} as ChatRequest;
const socket = {} as any;
const head = {} as any;
const upgradeHandler = mockHttpServer.on.mock.calls[0][1];
upgradeHandler(req, socket, head);
expect(mockWsServer.handleUpgrade).toHaveBeenCalledWith(
req,
socket,
head,
expect.any(Function),
);
});
it('calls attachToApp after WebSocket upgrade', () => {
chatServer.setup(mockHttpServer, mockApp);
const req = {
url: 'http://localhost:5678/chat?sessionId=123&executionId=456',
socket: { remoteAddress: '127.0.0.1' },
} as ChatRequest;
const socket = {} as any;
const head = {} as any;
const ws = {} as WebSocket;
const upgradeHandler = mockHttpServer.on.mock.calls[0][1];
upgradeHandler(req, socket, head);
const handleUpgradeCb = mockWsServer.handleUpgrade.mock.calls[0][3];
handleUpgradeCb(ws, req);
expect(req.ws).toBe(ws);
expect(mockApp.handle).toHaveBeenCalledWith(
expect.objectContaining({ ws }),
expect.any(ServerResponse),
);
});
});

View File

@@ -0,0 +1,399 @@
import type { Logger } from '@n8n/backend-common';
import { mock } from 'jest-mock-extended';
import { WebSocket } from 'ws';
import type { ChatExecutionManager } from '../chat-execution-manager';
import { ChatService } from '../chat-service';
import type { ChatRequest } from '../chat-service.types';
import type { ErrorReporter } from 'n8n-core';
describe('ChatService', () => {
let mockExecutionManager: ReturnType<typeof mock<ChatExecutionManager>>;
let mockLogger: ReturnType<typeof mock<Logger>>;
let mockErrorReporter: ReturnType<typeof mock<ErrorReporter>>;
let chatService: ChatService;
let mockWs: ReturnType<typeof mock<WebSocket>>;
beforeAll(() => {
jest.useFakeTimers();
});
afterAll(() => {
jest.useRealTimers();
});
beforeEach(() => {
mockExecutionManager = mock<ChatExecutionManager>();
mockLogger = mock<Logger>();
mockErrorReporter = mock<ErrorReporter>();
chatService = new ChatService(mockExecutionManager, mockLogger, mockErrorReporter);
mockWs = mock<WebSocket>();
});
it('should handle missing execution gracefully', async () => {
const req = {
ws: mockWs,
query: {
sessionId: '123',
executionId: '42',
isPublic: false,
},
} as unknown as ChatRequest;
mockExecutionManager.findExecution.mockResolvedValue(undefined);
try {
await chatService.startSession(req);
} catch (error) {
expect(error).toBeDefined();
expect(mockWs.send).toHaveBeenCalledWith('Execution with id "42" does not exist');
}
});
it('should handle missing WebSocket connection gracefully', async () => {
const req = {
ws: null,
query: {
sessionId: 'abc',
executionId: '123',
isPublic: false,
},
} as unknown as ChatRequest;
await expect(chatService.startSession(req)).rejects.toThrow('WebSocket connection is missing');
});
describe('startSession', () => {
it('should start a session and store it in sessions map', async () => {
const mockWs = mock<WebSocket>();
(mockWs as any).readyState = WebSocket.OPEN;
const req = {
ws: mockWs,
query: {
sessionId: 'abc',
executionId: '123',
isPublic: true,
},
} as unknown as ChatRequest;
mockExecutionManager.checkIfExecutionExists.mockResolvedValue({ id: '123' } as any);
await chatService.startSession(req);
const sessionKey = 'abc|123|public';
const session = (chatService as any).sessions.get(sessionKey);
expect(session).toBeDefined();
expect(session?.sessionId).toBe('abc');
expect(session?.executionId).toBe('123');
expect(session?.isPublic).toBe(true);
expect(typeof session?.intervalId).toBe('object');
});
it('should terminate existing session if the same key is used and clear interval', async () => {
const clearIntervalSpy = jest.spyOn(global, 'clearInterval').mockImplementation();
const req = {
ws: mockWs,
query: {
sessionId: 'abc',
executionId: '123',
isPublic: false,
},
} as unknown as ChatRequest;
const previousConnection = mock<WebSocket>();
(previousConnection as any).readyState = WebSocket.OPEN;
const dummyInterval = setInterval(() => {}, 9999);
const sessionKey = 'abc|123|integrated';
(chatService as any).sessions.set(sessionKey, {
connection: previousConnection,
executionId: '123',
sessionId: 'abc',
intervalId: dummyInterval,
waitingForResponse: false,
isPublic: false,
});
mockExecutionManager.checkIfExecutionExists.mockResolvedValue({ id: '123' } as any);
await chatService.startSession(req);
expect(previousConnection.terminate).toHaveBeenCalled();
expect(clearIntervalSpy).toHaveBeenCalledWith(dummyInterval);
expect((chatService as any).sessions.get(sessionKey).connection).toBe(mockWs);
clearIntervalSpy.mockRestore();
});
describe('checkHeartbeats', () => {
it('should terminate sessions that have not sent a heartbeat recently', async () => {
const sessionKey = 'abc|123|public';
const session = {
executionId: '123',
connection: mockWs,
lastHeartbeat: Date.now() - 61 * 1000,
intervalId: 123,
};
(chatService as any).sessions.set(sessionKey, session);
mockExecutionManager.cancelExecution.mockResolvedValue(undefined);
mockWs.terminate.mockImplementation(() => {});
jest.spyOn(global, 'clearInterval').mockImplementation(() => {});
await (chatService as any).checkHeartbeats();
expect(mockExecutionManager.cancelExecution).toHaveBeenCalledWith('123');
expect(mockWs.terminate).toHaveBeenCalled();
expect(clearInterval).toHaveBeenCalledWith(123);
expect((chatService as any).sessions.get(sessionKey)).toBeUndefined();
});
it('should remove sessions whose connection throws an error when sending a heartbeat', async () => {
const sessionKey = 'abc|123|public';
const session = {
executionId: '123',
connection: mockWs,
lastHeartbeat: Date.now(),
intervalId: 123,
};
(chatService as any).sessions.set(sessionKey, session);
mockWs.send.mockImplementation(() => {
throw new Error('Connection error');
});
jest.spyOn(global, 'clearInterval').mockImplementation(() => {});
await (chatService as any).checkHeartbeats();
expect(mockWs.send).toHaveBeenCalledWith('n8n|heartbeat');
expect(clearInterval).toHaveBeenCalledWith(123);
expect((chatService as any).sessions.get(sessionKey)).toBeUndefined();
});
it('should check heartbeats and maintain sessions', async () => {
const sessionKey = 'abc|123|public';
mockWs.send.mockImplementation(() => {});
const session = {
executionId: '123',
connection: mockWs,
lastHeartbeat: Date.now(),
intervalId: 123,
};
(chatService as any).sessions.set(sessionKey, session);
await (chatService as any).checkHeartbeats();
expect(mockWs.send).toHaveBeenCalledWith('n8n|heartbeat');
expect((chatService as any).sessions.get(sessionKey)).toBeDefined();
});
});
});
describe('incomingMessageHandler', () => {
it('should return if session does not exist', async () => {
const sessionKey = 'nonexistent';
const data = 'test data';
const incomingMessageHandler = (chatService as any).incomingMessageHandler(sessionKey);
await incomingMessageHandler(data);
expect(mockExecutionManager.runWorkflow).not.toHaveBeenCalled();
});
it('should handle heartbeat acknowledgement', async () => {
const sessionKey = 'abc|123|public';
const session = {
executionId: '123',
lastHeartbeat: 0,
};
(chatService as any).sessions.set(sessionKey, session);
const data = 'n8n|heartbeat-ack';
const incomingMessageHandler = (chatService as any).incomingMessageHandler(sessionKey);
await incomingMessageHandler(data);
expect(session.lastHeartbeat).not.toBe(0);
expect(mockExecutionManager.runWorkflow).not.toHaveBeenCalled();
});
it('should resume execution with processed message', async () => {
const sessionKey = 'abc|123|public';
const session = {
executionId: '123',
nodeWaitingForChatResponse: 'test',
};
(chatService as any).sessions.set(sessionKey, session);
const data = JSON.stringify({ action: 'sendMessage', chatInput: 'hello', sessionId: 'abc' });
mockExecutionManager.findExecution.mockResolvedValue({
id: '123',
status: 'waiting',
data: { resultData: {} },
} as any);
const incomingMessageHandler = (chatService as any).incomingMessageHandler(sessionKey);
await incomingMessageHandler(data);
expect(mockExecutionManager.runWorkflow).toHaveBeenCalled();
expect(session.nodeWaitingForChatResponse).toBeUndefined();
});
it('should handle errors during message processing', async () => {
const sessionKey = 'abc|123|public';
const session = {
executionId: '123',
};
(chatService as any).sessions.set(sessionKey, session);
const data = 'invalid json';
const incomingMessageHandler = (chatService as any).incomingMessageHandler(sessionKey);
await incomingMessageHandler(data);
expect(mockLogger.error).toHaveBeenCalled();
});
});
describe('pollAndProcessChatResponses', () => {
it('should return if session does not exist', async () => {
const sessionKey = 'nonexistent';
const pollAndProcessChatResponses = (chatService as any).pollAndProcessChatResponses(
sessionKey,
);
await pollAndProcessChatResponses();
expect(mockExecutionManager.findExecution).not.toHaveBeenCalled();
});
it('should return if session is processing', async () => {
const sessionKey = 'abc|123|public';
(chatService as any).sessions.set(sessionKey, { isProcessing: true });
const pollAndProcessChatResponses = (chatService as any).pollAndProcessChatResponses(
sessionKey,
);
await pollAndProcessChatResponses();
expect(mockExecutionManager.findExecution).not.toHaveBeenCalled();
});
it('should return if execution does not exist', async () => {
const sessionKey = 'abc|123|public';
(chatService as any).sessions.set(sessionKey, {
isProcessing: false,
executionId: '123',
nodeWaitingForChatResponse: undefined,
});
mockExecutionManager.findExecution.mockResolvedValue(undefined);
const pollAndProcessChatResponses = (chatService as any).pollAndProcessChatResponses(
sessionKey,
);
await pollAndProcessChatResponses();
expect(mockWs.send).not.toHaveBeenCalled();
});
it('should send continue if execution status is waiting and last node name is different from nodeWaitingForChatResponse', async () => {
const sessionKey = 'abc|123|public';
const session = {
isProcessing: false,
executionId: '123',
connection: { send: jest.fn() },
nodeWaitingForChatResponse: 'node1',
};
(chatService as any).sessions.set(sessionKey, session);
mockExecutionManager.findExecution.mockResolvedValue({
status: 'waiting',
data: { resultData: { lastNodeExecuted: 'node2' } },
workflowData: { nodes: [{ name: 'node1' }] },
} as any);
const pollAndProcessChatResponses = (chatService as any).pollAndProcessChatResponses(
sessionKey,
);
await pollAndProcessChatResponses();
expect(session.connection.send).toHaveBeenCalledWith('n8n|continue');
expect(session.nodeWaitingForChatResponse).toBeUndefined();
});
it('should send message if execution status is waiting and a message exists', async () => {
const sessionKey = 'abc|123|public';
const session = {
isProcessing: false,
executionId: '123',
connection: { send: jest.fn() },
sessionId: 'abc',
nodeWaitingForChatResponse: undefined,
};
(chatService as any).sessions.set(sessionKey, session);
mockExecutionManager.findExecution.mockResolvedValue({
status: 'waiting',
data: {
resultData: {
lastNodeExecuted: 'node1',
runData: { node1: [{ data: { main: [[{ sendMessage: 'test message' }]] } }] },
},
},
workflowData: { nodes: [{ name: 'node1' }] },
} as any);
(chatService as any).shouldResumeImmediately = jest.fn().mockReturnValue(false);
(chatService as any).resumeExecution = jest.fn();
const pollAndProcessChatResponses = (chatService as any).pollAndProcessChatResponses(
sessionKey,
);
await pollAndProcessChatResponses();
expect(session.connection.send).toHaveBeenCalledWith('test message');
expect(session.nodeWaitingForChatResponse).toEqual('node1');
});
it('should close connection if execution status is success and shouldNotReturnLastNodeResponse is false', async () => {
const sessionKey = 'abc|123|public';
const session = {
isProcessing: false,
executionId: '123',
connection: { close: jest.fn(), readyState: 1, once: jest.fn() },
isPublic: false,
};
(chatService as any).sessions.set(sessionKey, session);
mockExecutionManager.findExecution.mockResolvedValue({
status: 'success',
data: { resultData: { lastNodeExecuted: 'node1' } },
workflowData: { nodes: [{ type: 'n8n-core.respondToWebhook', name: 'node1' }] },
mode: 'manual',
} as any);
const pollAndProcessChatResponses = (chatService as any).pollAndProcessChatResponses(
sessionKey,
);
await pollAndProcessChatResponses();
expect(session.connection.once).toHaveBeenCalled();
expect(session.connection.once).toHaveBeenCalledWith('drain', expect.any(Function));
});
it('should handle errors during message processing', async () => {
const sessionKey = 'abc|123|public';
const session = {
isProcessing: false,
executionId: '123',
connection: mockWs,
nodeWaitingForChatResponse: undefined,
};
(chatService as any).sessions.set(sessionKey, session);
mockExecutionManager.findExecution.mockRejectedValue(new Error('test error'));
const pollAndProcessChatResponses = (chatService as any).pollAndProcessChatResponses(
sessionKey,
);
await pollAndProcessChatResponses();
expect(mockLogger.error).toHaveBeenCalled();
});
});
});

View File

@@ -0,0 +1,303 @@
import type { IExecutionResponse } from '@n8n/db';
import type { IDataObject, INode } from 'n8n-workflow';
import { CHAT_WAIT_USER_REPLY, RESPOND_TO_WEBHOOK_NODE_TYPE } from 'n8n-workflow';
import { getMessage, getLastNodeExecuted, shouldResumeImmediately } from '../utils';
// helpers --------------------------------------------------------
const createMockExecution = (
overrides: IDataObject = {},
firstExecutionData?: IDataObject,
nodeData?: IDataObject[],
): IExecutionResponse => {
const firstItem = firstExecutionData ?? {
json: { test: 'data' },
sendMessage: 'Test message',
};
const nodeRunData = nodeData ?? [
{
data: {
main: [[firstItem]],
},
},
];
return {
id: 'test-execution-id',
data: {
resultData: {
lastNodeExecuted: 'TestNode',
runData: {
TestNode: nodeRunData,
},
},
},
workflowData: {
nodes: [
{
name: 'TestNode',
type: 'test-node',
parameters: {},
},
],
},
...overrides,
} as unknown as IExecutionResponse;
};
const createMockNode = (overrides: Partial<INode> = {}): INode =>
({
name: 'TestNode',
type: 'test-node',
parameters: {},
...overrides,
}) as INode;
// ---------------------------------------------------------
describe('getMessage', () => {
it('should return sendMessage from the last node execution', () => {
const execution = createMockExecution();
const result = getMessage(execution);
expect(result).toBe('Test message');
});
it('should return undefined when no sendMessage exists', () => {
const execution = createMockExecution(
{},
{
json: { test: 'data' },
},
);
const result = getMessage(execution);
expect(result).toBeUndefined();
});
it('should return undefined when nodeExecutionData is empty', () => {
const execution = createMockExecution({}, undefined, [
{
data: {
main: [[]],
},
},
]);
const result = getMessage(execution);
expect(result).toBeUndefined();
});
it('should handle multiple run data entries and use the last one', () => {
const execution = createMockExecution({}, undefined, [
{
data: {
main: [
[
{
json: { test: 'first' },
sendMessage: 'First message',
},
],
],
},
},
{
data: {
main: [
[
{
json: { test: 'second' },
sendMessage: 'Second message',
},
],
],
},
},
]);
const result = getMessage(execution);
expect(result).toBe('Second message');
});
it('should return undefined when main data is missing', () => {
const execution = createMockExecution({}, undefined, [
{
data: {},
},
]);
const result = getMessage(execution);
expect(result).toBeUndefined();
});
it('should return undefined when nodeExecutionData is undefined', () => {
const execution = createMockExecution({
data: {
resultData: {
lastNodeExecuted: 'TestNode',
runData: {
TestNode: [
{
data: {
main: undefined,
},
},
],
},
},
},
});
const result = getMessage(execution);
expect(result).toBeUndefined();
});
});
describe('getLastNodeExecuted', () => {
it('should return the node that was last executed', () => {
const execution = createMockExecution();
const result = getLastNodeExecuted(execution);
expect(result).toEqual({
name: 'TestNode',
type: 'test-node',
parameters: {},
});
});
it('should return undefined when last executed node is not found', () => {
const execution = createMockExecution({
data: {
resultData: {
lastNodeExecuted: 'NonExistentNode',
runData: {},
},
},
});
const result = getLastNodeExecuted(execution);
expect(result).toBeUndefined();
});
it('should find the correct node among multiple nodes', () => {
const execution = createMockExecution({
data: {
resultData: {
lastNodeExecuted: 'SecondNode',
runData: {},
},
},
workflowData: {
nodes: [
{
name: 'FirstNode',
type: 'first-type',
parameters: {},
},
{
name: 'SecondNode',
type: 'second-type',
parameters: { test: 'value' },
},
],
},
});
const result = getLastNodeExecuted(execution);
expect(result).toEqual({
name: 'SecondNode',
type: 'second-type',
parameters: { test: 'value' },
});
});
it('should return undefined when workflowData.nodes is undefined', () => {
const execution = createMockExecution({
workflowData: undefined,
});
const result = getLastNodeExecuted(execution);
expect(result).toBeUndefined();
});
});
describe('shouldResumeImmediately', () => {
it('should return true for RESPOND_TO_WEBHOOK_NODE_TYPE', () => {
const node = createMockNode({
type: RESPOND_TO_WEBHOOK_NODE_TYPE,
});
const result = shouldResumeImmediately(node);
expect(result).toBe(true);
});
it('should return true when CHAT_WAIT_USER_REPLY is false', () => {
const node = createMockNode({
parameters: {
options: {
[CHAT_WAIT_USER_REPLY]: false,
},
},
});
const result = shouldResumeImmediately(node);
expect(result).toBe(true);
});
it('should return false when CHAT_WAIT_USER_REPLY is true', () => {
const node = createMockNode({
parameters: {
options: {
[CHAT_WAIT_USER_REPLY]: true,
},
},
});
const result = shouldResumeImmediately(node);
expect(result).toBe(false);
});
it('should return false when CHAT_WAIT_USER_REPLY is undefined', () => {
const node = createMockNode({
parameters: {
options: {},
},
});
const result = shouldResumeImmediately(node);
expect(result).toBe(false);
});
it('should return false when no options exist', () => {
const node = createMockNode({
parameters: {},
});
const result = shouldResumeImmediately(node);
expect(result).toBe(false);
});
it('should return false when no parameters exist', () => {
const node = createMockNode({
parameters: undefined,
});
const result = shouldResumeImmediately(node);
expect(result).toBe(false);
});
it('should handle null node', () => {
const result = shouldResumeImmediately(null as any);
expect(result).toBe(false);
});
it('should handle undefined node', () => {
const result = shouldResumeImmediately(undefined as any);
expect(result).toBe(false);
});
it('should return true when CHAT_WAIT_USER_REPLY is false directly in parameters', () => {
const node = createMockNode({
parameters: {
[CHAT_WAIT_USER_REPLY]: false,
},
});
const result = shouldResumeImmediately(node);
expect(result).toBe(true);
});
it('should return false when CHAT_WAIT_USER_REPLY is true directly in parameters', () => {
const node = createMockNode({
parameters: {
[CHAT_WAIT_USER_REPLY]: true,
},
});
const result = shouldResumeImmediately(node);
expect(result).toBe(false);
});
});

View File

@@ -0,0 +1,156 @@
import { ExecutionRepository } from '@n8n/db';
import type { IExecutionResponse, Project } from '@n8n/db';
import { Service } from '@n8n/di';
import { ExecuteContext } from 'n8n-core';
import type {
IBinaryKeyData,
INodeExecutionData,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import { Workflow, BINARY_ENCODING } from 'n8n-workflow';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
import { WorkflowRunner } from '@/workflow-runner';
import type { ChatMessage } from './chat-service.types';
import { NodeTypes } from '../node-types';
import { OwnershipService } from '../services/ownership.service';
@Service()
export class ChatExecutionManager {
constructor(
private readonly executionRepository: ExecutionRepository,
private readonly workflowRunner: WorkflowRunner,
private readonly ownershipService: OwnershipService,
private readonly nodeTypes: NodeTypes,
) {}
async runWorkflow(execution: IExecutionResponse, message: ChatMessage) {
await this.workflowRunner.run(
await this.getRunData(execution, message),
true,
true,
execution.id,
);
}
async cancelExecution(executionId: string) {
const execution = await this.executionRepository.findSingleExecution(executionId, {
includeData: true,
unflattenData: true,
});
if (!execution) return;
if (['running', 'waiting', 'unknown'].includes(execution.status)) {
await this.executionRepository.update({ id: executionId }, { status: 'canceled' });
}
}
async findExecution(executionId: string) {
return await this.executionRepository.findSingleExecution(executionId, {
includeData: true,
unflattenData: true,
});
}
async checkIfExecutionExists(executionId: string) {
return await this.executionRepository.findSingleExecution(executionId);
}
private getWorkflow(execution: IExecutionResponse) {
const { workflowData } = execution;
return new Workflow({
id: workflowData.id,
name: workflowData.name,
nodes: workflowData.nodes,
connections: workflowData.connections,
active: workflowData.active,
nodeTypes: this.nodeTypes,
staticData: workflowData.staticData,
settings: workflowData.settings,
});
}
private async mapFilesToBinaryData(context: ExecuteContext, files: ChatMessage['files']) {
if (!files) return;
const binary: IBinaryKeyData = {};
for (const [index, file] of files.entries()) {
const base64 = file.data;
const buffer = Buffer.from(base64, BINARY_ENCODING);
const binaryData = await context.helpers.prepareBinaryData(buffer, file.name, file.type);
binary[`data_${index}`] = binaryData;
}
return binary;
}
private async runNode(execution: IExecutionResponse, message: ChatMessage) {
const workflow = this.getWorkflow(execution);
const lastNodeExecuted = execution.data.resultData.lastNodeExecuted as string;
const node = workflow.getNode(lastNodeExecuted);
const additionalData = await WorkflowExecuteAdditionalData.getBase();
const executionData = execution.data.executionData?.nodeExecutionStack[0];
if (!node || !executionData) return null;
const inputData = executionData.data;
const connectionInputData = executionData.data.main[0];
const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
const context = new ExecuteContext(
workflow,
node,
additionalData,
'manual',
execution.data,
0,
connectionInputData ?? [],
inputData,
executionData,
[],
);
const { sessionId, action, chatInput, files } = message;
const binary = await this.mapFilesToBinaryData(context, files);
const nodeExecutionData: INodeExecutionData = { json: { sessionId, action, chatInput } };
if (binary && Object.keys(binary).length > 0) {
nodeExecutionData.binary = binary;
}
if (nodeType.onMessage) {
return await nodeType.onMessage(context, nodeExecutionData);
}
return [[nodeExecutionData]];
}
private async getRunData(execution: IExecutionResponse, message: ChatMessage) {
const { workflowData, mode: executionMode, data: runExecutionData } = execution;
runExecutionData.executionData!.nodeExecutionStack[0].data.main = (await this.runNode(
execution,
message,
)) ?? [[{ json: message }]];
let project: Project | undefined = undefined;
try {
project = await this.ownershipService.getWorkflowProjectCached(workflowData.id);
} catch (error) {
throw new NotFoundError('Cannot find workflow');
}
const runData: IWorkflowExecutionDataProcess = {
executionMode,
executionData: runExecutionData,
pushRef: runExecutionData.pushRef,
workflowData,
pinData: runExecutionData.resultData.pinData,
projectId: project?.id,
};
return runData;
}
}

View File

@@ -0,0 +1,54 @@
import { Service } from '@n8n/di';
import { OnShutdown } from '@n8n/decorators';
import type { Application } from 'express';
import type { Server as HttpServer } from 'http';
import { ServerResponse } from 'http';
import { parse as parseUrl } from 'url';
import type { WebSocket } from 'ws';
import { Server as WebSocketServer } from 'ws';
import { ChatService } from './chat-service';
import type { ChatRequest } from './chat-service.types';
interface ExpressApplication extends Application {
handle: (req: any, res: any) => void;
}
@Service()
export class ChatServer {
private readonly wsServer = new WebSocketServer({ noServer: true });
constructor(private readonly chatService: ChatService) {}
setup(server: HttpServer, app: Application) {
server.on('upgrade', (req: ChatRequest, socket, head) => {
const parsedUrl = parseUrl(req.url ?? '');
if (parsedUrl.pathname?.startsWith('/chat')) {
this.wsServer.handleUpgrade(req, socket, head, (ws) => {
this.attachToApp(req, ws, app as ExpressApplication);
});
}
});
app.use('/chat', async (req: ChatRequest) => {
await this.chatService.startSession(req);
});
}
private attachToApp(req: ChatRequest, ws: WebSocket, app: ExpressApplication) {
req.ws = ws;
const res = new ServerResponse(req);
res.writeHead = (statusCode) => {
if (statusCode > 200) ws.close();
return res;
};
app.handle(req, res);
}
@OnShutdown()
shutdown() {
this.wsServer.close();
}
}

View File

@@ -0,0 +1,339 @@
import { Logger } from '@n8n/backend-common';
import { Service } from '@n8n/di';
import { OnShutdown } from '@n8n/decorators';
import { jsonParse, UnexpectedError, ensureError } from 'n8n-workflow';
import { type RawData, WebSocket } from 'ws';
import { z } from 'zod';
import { ChatExecutionManager } from './chat-execution-manager';
import {
chatMessageSchema,
type ChatMessage,
type ChatRequest,
Session,
} from './chat-service.types';
import { getLastNodeExecuted, getMessage, shouldResumeImmediately } from './utils';
import { ErrorReporter } from 'n8n-core';
import { IExecutionResponse } from '@n8n/db';
const CHECK_FOR_RESPONSE_INTERVAL = 3000;
const DRAIN_TIMEOUT = 50;
const HEARTBEAT_INTERVAL = 30 * 1000;
const HEARTBEAT_TIMEOUT = 60 * 1000;
/**
* let frontend know that no user input is expected
*/
const N8N_CONTINUE = 'n8n|continue';
/**
* send message for heartbeat check
*/
const N8N_HEARTBEAT = 'n8n|heartbeat';
/**
* frontend did acknowledge the heartbeat
*/
const N8N_HEARTBEAT_ACK = 'n8n|heartbeat-ack';
function closeConnection(ws: WebSocket) {
if (ws.readyState !== WebSocket.OPEN) return;
ws.once('drain', () => {
ws.close();
});
setTimeout(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.close();
}
}, DRAIN_TIMEOUT);
}
@Service()
export class ChatService {
private readonly sessions = new Map<string, Session>();
private heartbeatIntervalId: NodeJS.Timeout;
constructor(
private readonly executionManager: ChatExecutionManager,
private readonly logger: Logger,
private readonly errorReporter: ErrorReporter,
) {
this.heartbeatIntervalId = setInterval(
async () => await this.checkHeartbeats(),
HEARTBEAT_INTERVAL,
);
}
async startSession(req: ChatRequest) {
const {
ws,
query: { sessionId, executionId, isPublic },
} = req;
if (!ws) {
throw new UnexpectedError('WebSocket connection is missing');
}
if (!sessionId || !executionId) {
ws.close(1008);
return;
}
const execution = await this.executionManager.checkIfExecutionExists(executionId);
if (!execution) {
ws.send(`Execution with id "${executionId}" does not exist`);
ws.close(1008);
return;
}
ws.isAlive = true;
const key = `${sessionId}|${executionId}|${isPublic ? 'public' : 'integrated'}`;
const existingSession = this.sessions.get(key);
if (existingSession) {
this.cleanupSession(existingSession, key);
}
const onMessage = this.incomingMessageHandler(key);
const respondToChat = this.pollAndProcessChatResponses(key);
const intervalId = setInterval(async () => await respondToChat(), CHECK_FOR_RESPONSE_INTERVAL);
ws.once('close', () => {
ws.off('message', onMessage);
clearInterval(intervalId);
this.sessions.delete(key);
});
ws.on('message', onMessage);
const session: Session = {
connection: ws,
executionId,
sessionId,
intervalId,
isPublic: isPublic ?? false,
isProcessing: false,
lastHeartbeat: Date.now(),
};
this.sessions.set(key, session);
ws.send(N8N_HEARTBEAT);
}
private async processWaitingExecution(
execution: IExecutionResponse,
session: Session,
sessionKey: string,
) {
const message = getMessage(execution);
if (message === undefined) return;
session.connection.send(message);
const lastNode = getLastNodeExecuted(execution);
if (lastNode && shouldResumeImmediately(lastNode)) {
session.connection.send(N8N_CONTINUE);
const data: ChatMessage = {
action: 'sendMessage',
chatInput: '',
sessionId: session.sessionId,
};
await this.resumeExecution(session.executionId, data, sessionKey);
session.nodeWaitingForChatResponse = undefined;
} else {
session.nodeWaitingForChatResponse = lastNode?.name;
}
}
private processSuccessExecution(session: Session) {
closeConnection(session.connection);
return;
}
private waitForChatResponseOrContinue(execution: IExecutionResponse, session: Session) {
const lastNode = getLastNodeExecuted(execution);
if (execution.status === 'waiting' && lastNode?.name !== session.nodeWaitingForChatResponse) {
session.connection.send(N8N_CONTINUE);
session.nodeWaitingForChatResponse = undefined;
}
}
private pollAndProcessChatResponses(sessionKey: string) {
return async () => {
const session = this.sessions.get(sessionKey);
if (!session) return;
if (session.isProcessing) return;
try {
session.isProcessing = true;
if (!session.executionId || !session.connection) return;
const execution = await this.getExecutionOrCleanupSession(session.executionId, sessionKey);
if (!execution) return;
if (session.nodeWaitingForChatResponse) {
this.waitForChatResponseOrContinue(execution, session);
return;
}
if (execution.status === 'waiting') {
await this.processWaitingExecution(execution, session, sessionKey);
return;
}
if (execution.status === 'success') {
this.processSuccessExecution(session);
return;
}
} catch (e) {
const error = ensureError(e);
this.errorReporter.error(error);
this.logger.error(
`Error sending message to chat in session ${sessionKey}: ${error.message}`,
);
} finally {
// get only active sessions, as it could have been deleted, and set isProcessing to false
const activeSession = this.sessions.get(sessionKey);
if (activeSession) {
activeSession.isProcessing = false;
}
}
};
}
private incomingMessageHandler(sessionKey: string) {
return async (data: RawData) => {
try {
const session = this.sessions.get(sessionKey);
if (!session) return;
const message = this.stringifyRawData(data);
if (message === N8N_HEARTBEAT_ACK) {
session.lastHeartbeat = Date.now();
return;
}
const executionId = session.executionId;
await this.resumeExecution(executionId, this.parseChatMessage(message), sessionKey);
session.nodeWaitingForChatResponse = undefined;
} catch (e) {
const error = ensureError(e);
this.errorReporter.error(error);
this.logger.error(
`Error processing message from chat in session ${sessionKey}: ${error.message}`,
);
}
};
}
private async resumeExecution(executionId: string, message: ChatMessage, sessionKey: string) {
const execution = await this.getExecutionOrCleanupSession(executionId, sessionKey);
if (!execution || execution.status !== 'waiting') return;
await this.executionManager.runWorkflow(execution, message);
}
private async getExecutionOrCleanupSession(executionId: string, sessionKey: string) {
const execution = await this.executionManager.findExecution(executionId);
if (!execution || ['error', 'canceled', 'crashed'].includes(execution.status)) {
const session = this.sessions.get(sessionKey);
if (!session) return null;
this.cleanupSession(session, sessionKey);
return null;
}
if (execution.status === 'running') return null;
return execution;
}
private stringifyRawData(data: RawData) {
const buffer = Array.isArray(data)
? Buffer.concat(data.map((chunk) => Buffer.from(chunk)))
: Buffer.from(data);
return buffer.toString('utf8');
}
private cleanupSession(session: Session, sessionKey: string) {
session.connection.terminate();
clearInterval(session.intervalId);
if (sessionKey) this.sessions.delete(sessionKey);
}
private parseChatMessage(message: string): ChatMessage {
try {
const parsedMessage = chatMessageSchema.parse(jsonParse(message));
if (parsedMessage.files) {
parsedMessage.files = parsedMessage.files.map((file) => ({
...file,
data: file.data.includes('base64,') ? file.data.split('base64,')[1] : file.data,
}));
}
return parsedMessage;
} catch (error) {
if (error instanceof z.ZodError) {
throw new UnexpectedError(
`Chat message validation error: ${error.errors.map((error) => error.message).join(', ')}`,
);
}
throw error;
}
}
private async checkHeartbeats() {
try {
const now = Date.now();
for (const [key, session] of this.sessions.entries()) {
const timeSinceLastHeartbeat = now - (session.lastHeartbeat ?? 0);
if (timeSinceLastHeartbeat > HEARTBEAT_TIMEOUT) {
await this.executionManager.cancelExecution(session.executionId);
this.cleanupSession(session, key);
} else {
try {
session.connection.send(N8N_HEARTBEAT);
} catch (e) {
this.cleanupSession(session, key);
const error = ensureError(e);
this.errorReporter.error(error);
this.logger.error(`Error sending heartbeat to session ${key}: ${error.message}`);
}
}
}
} catch (e) {
const error = ensureError(e);
this.errorReporter.error(error);
this.logger.error(`Error checking heartbeats: ${error.message}`);
}
}
@OnShutdown()
shutdown() {
for (const [key, session] of this.sessions.entries()) {
this.cleanupSession(session, key);
}
this.sessions.clear();
clearInterval(this.heartbeatIntervalId);
}
}

View File

@@ -0,0 +1,41 @@
import type { IncomingMessage } from 'http';
import type { WebSocket } from 'ws';
import { z } from 'zod';
export interface ChatRequest extends IncomingMessage {
url: string;
query: {
sessionId: string;
executionId: string;
isPublic?: boolean;
};
ws: WebSocket;
}
export type Session = {
connection: WebSocket;
executionId: string;
sessionId: string;
intervalId: NodeJS.Timeout;
nodeWaitingForChatResponse?: string;
isPublic: boolean;
isProcessing: boolean;
lastHeartbeat?: number;
};
export const chatMessageSchema = z.object({
sessionId: z.string(),
action: z.literal('sendMessage'),
chatInput: z.string(),
files: z
.array(
z.object({
name: z.string(),
type: z.string(),
data: z.string(),
}),
)
.optional(),
});
export type ChatMessage = z.infer<typeof chatMessageSchema>;

View File

@@ -0,0 +1,48 @@
import type { IExecutionResponse } from '@n8n/db';
import type { INode } from 'n8n-workflow';
import { CHAT_WAIT_USER_REPLY, RESPOND_TO_WEBHOOK_NODE_TYPE } from 'n8n-workflow';
/**
* Returns the message to be sent of the last executed node
*/
export function getMessage(execution: IExecutionResponse) {
const lastNodeExecuted = execution.data.resultData.lastNodeExecuted;
if (typeof lastNodeExecuted !== 'string') return undefined;
const runIndex = execution.data.resultData.runData[lastNodeExecuted].length - 1;
const nodeExecutionData =
execution.data.resultData.runData[lastNodeExecuted][runIndex]?.data?.main?.[0];
return nodeExecutionData?.[0] ? nodeExecutionData[0].sendMessage : undefined;
}
/**
* Returns the last node executed
*/
export function getLastNodeExecuted(execution: IExecutionResponse) {
const lastNodeExecuted = execution.data.resultData.lastNodeExecuted;
if (typeof lastNodeExecuted !== 'string') return undefined;
return execution.workflowData?.nodes?.find((node) => node.name === lastNodeExecuted);
}
/**
* Check if execution should be resumed immediately after receivng a message
*/
export function shouldResumeImmediately(lastNode: INode) {
if (lastNode?.type === RESPOND_TO_WEBHOOK_NODE_TYPE) {
return true;
}
if (lastNode?.parameters?.[CHAT_WAIT_USER_REPLY] === false) {
return true;
}
const options = lastNode?.parameters?.options as {
[CHAT_WAIT_USER_REPLY]?: boolean;
};
if (options && options[CHAT_WAIT_USER_REPLY] === false) {
return true;
}
return false;
}

View File

@@ -62,6 +62,9 @@ import '@/evaluation.ee/test-runs.controller.ee';
import '@/workflows/workflow-history.ee/workflow-history.controller.ee';
import '@/workflows/workflows.controller';
import '@/webhooks/webhooks.controller';
import { ChatServer } from './chat/chat-server';
import { MfaService } from './mfa/mfa.service';
@Service()
@@ -478,5 +481,6 @@ export class Server extends AbstractServer {
protected setupPushServer(): void {
const { restEndpoint, server, app } = this;
Container.get(Push).setupPushServer(restEndpoint, server, app);
Container.get(ChatServer).setup(server, app);
}
}

View File

@@ -14,7 +14,12 @@ import type {
IRunExecutionData,
IExecuteData,
} from 'n8n-workflow';
import { createDeferredPromise, FORM_NODE_TYPE, WAIT_NODE_TYPE } from 'n8n-workflow';
import {
createDeferredPromise,
FORM_NODE_TYPE,
WAIT_NODE_TYPE,
CHAT_TRIGGER_NODE_TYPE,
} from 'n8n-workflow';
import type { Readable } from 'stream';
import { finished } from 'stream/promises';
@@ -23,6 +28,7 @@ import {
handleFormRedirectionCase,
setupResponseNodePromise,
prepareExecutionData,
handleHostedChatResponse,
} from '../webhook-helpers';
import type { IWebhookResponseCallbackData } from '../webhook.types';
@@ -38,6 +44,15 @@ describe('autoDetectResponseMode', () => {
workflow.nodes = {};
});
test('should return hostedChat when start node is CHAT_TRIGGER_NODE_TYPE, method is POST, and public is true', () => {
const workflowStartNode = mock<INode>({
type: CHAT_TRIGGER_NODE_TYPE,
parameters: { options: { responseMode: 'responseNodes' } },
});
const result = autoDetectResponseMode(workflowStartNode, workflow, 'POST');
expect(result).toBe('hostedChat');
});
test('should return undefined if start node is WAIT_NODE_TYPE with resume not equal to form', () => {
const workflowStartNode = mock<INode>({
type: WAIT_NODE_TYPE,
@@ -259,6 +274,61 @@ describe('setupResponseNodePromise', () => {
});
});
describe('handleHostedChatResponse', () => {
it('should send executionStarted: true and executionId when responseMode is hostedChat and didSendResponse is false', async () => {
const res = {
send: jest.fn(),
end: jest.fn(),
} as unknown as express.Response;
const executionId = 'testExecutionId';
let didSendResponse = false;
const responseMode = 'hostedChat';
(res.send as jest.Mock).mockImplementation((data) => {
expect(data).toEqual({ executionStarted: true, executionId });
});
const result = handleHostedChatResponse(res, responseMode, didSendResponse, executionId);
expect(res.send).toHaveBeenCalled();
await new Promise((resolve) => setTimeout(resolve, 0));
expect(res.end).toHaveBeenCalled();
expect(result).toBe(true);
});
it('should not send response when responseMode is not hostedChat', () => {
const res = {
send: jest.fn(),
end: jest.fn(),
} as unknown as express.Response;
const executionId = 'testExecutionId';
let didSendResponse = false;
const responseMode = 'responseNode';
const result = handleHostedChatResponse(res, responseMode, didSendResponse, executionId);
expect(res.send).not.toHaveBeenCalled();
expect(res.end).not.toHaveBeenCalled();
expect(result).toBe(false);
});
it('should not send response when didSendResponse is true', () => {
const res = {
send: jest.fn(),
end: jest.fn(),
} as unknown as express.Response;
const executionId = 'testExecutionId';
let didSendResponse = true;
const responseMode = 'hostedChat';
const result = handleHostedChatResponse(res, responseMode, didSendResponse, executionId);
expect(res.send).not.toHaveBeenCalled();
expect(res.end).not.toHaveBeenCalled();
expect(result).toBe(true);
});
});
describe('prepareExecutionData', () => {
const workflowStartNode = mock<INode>({ name: 'Start' });
const webhookResultData: IWebhookResponseData = {

View File

@@ -32,6 +32,7 @@ import type {
WebhookResponseData,
} from 'n8n-workflow';
import {
CHAT_TRIGGER_NODE_TYPE,
createDeferredPromise,
ExecutionCancelledError,
FORM_NODE_TYPE,
@@ -70,6 +71,21 @@ import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-da
import * as WorkflowHelpers from '@/workflow-helpers';
import { WorkflowRunner } from '@/workflow-runner';
export function handleHostedChatResponse(
res: express.Response,
responseMode: WebhookResponseMode,
didSendResponse: boolean,
executionId: string,
): boolean {
if (responseMode === 'hostedChat' && !didSendResponse) {
res.send({ executionStarted: true, executionId });
process.nextTick(() => res.end());
return true;
}
return didSendResponse;
}
/**
* Returns all the webhooks which should be created for the given workflow
*/
@@ -111,6 +127,23 @@ export function getWorkflowWebhooks(
return returnData;
}
const getChatResponseMode = (workflowStartNode: INode, method: string) => {
const parameters = workflowStartNode.parameters as {
public: boolean;
options?: { responseMode: string };
};
if (workflowStartNode.type !== CHAT_TRIGGER_NODE_TYPE) return undefined;
if (method === 'GET') return 'onReceived';
if (method === 'POST' && parameters.options?.responseMode === 'responseNodes') {
return 'hostedChat';
}
return undefined;
};
// eslint-disable-next-line complexity
export function autoDetectResponseMode(
workflowStartNode: INode,
@@ -133,6 +166,9 @@ export function autoDetectResponseMode(
}
}
const chatResponseMode = getChatResponseMode(workflowStartNode, method);
if (chatResponseMode) return chatResponseMode;
// If there are form nodes connected to a current form node we're dealing with a multipage form
// and we need to return the formPage response mode when a second page of the form gets submitted
// to be able to show potential form errors correctly.
@@ -375,7 +411,11 @@ export async function executeWebhook(
additionalKeys,
);
if (!['onReceived', 'lastNode', 'responseNode', 'formPage', 'streaming'].includes(responseMode)) {
if (
!['onReceived', 'lastNode', 'responseNode', 'formPage', 'streaming', 'hostedChat'].includes(
responseMode,
)
) {
// If the mode is not known we error. Is probably best like that instead of using
// the default that people know as early as possible (probably already testing phase)
// that something does not resolve properly.
@@ -600,6 +640,8 @@ export async function executeWebhook(
didSendResponse = true;
}
didSendResponse = handleHostedChatResponse(res, responseMode, didSendResponse, executionId);
Container.get(Logger).debug(
`Started execution of workflow "${workflow.name}" from webhook with execution ID ${executionId}`,
{ executionId },

View File

@@ -10,7 +10,7 @@ import type {
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
import { NodeConnectionTypes } from 'n8n-workflow';
import { CHAT_TRIGGER_NODE_TYPE, NodeConnectionTypes } from 'n8n-workflow';
import { InstanceSettings } from '@/instance-settings';
@@ -115,6 +115,31 @@ describe('NodeExecutionContext', () => {
});
});
describe('getChatTrigger', () => {
it('should return a chat trigger node if it exists in the workflow', () => {
const chatNode = mock<INode>({ name: 'Chat', type: CHAT_TRIGGER_NODE_TYPE });
workflow.nodes = {
Chat: chatNode,
};
const result = testContext.getChatTrigger();
expect(result).toEqual(chatNode);
});
it('should return a null if there is no chat trigger node in the workflow', () => {
const someNode = mock<INode>({ name: 'Some Node', type: 'someType' });
workflow.nodes = {
'Some Node': someNode,
};
const result = testContext.getChatTrigger();
expect(result).toBeNull();
});
});
describe('getKnownNodeTypes', () => {
it('should call getKnownTypes method of nodeTypes', () => {
testContext.getKnownNodeTypes();

View File

@@ -25,6 +25,7 @@ import type {
} from 'n8n-workflow';
import {
ApplicationError,
CHAT_TRIGGER_NODE_TYPE,
deepCopy,
ExpressionError,
NodeHelpers,
@@ -106,22 +107,43 @@ export abstract class NodeExecutionContext implements Omit<FunctionsBase, 'getCr
return output;
}
getParentNodes(nodeName: string) {
getParentNodes(nodeName: string, options?: { includeNodeParameters?: boolean }) {
const output: NodeTypeAndVersion[] = [];
const nodeNames = this.workflow.getParentNodes(nodeName);
for (const n of nodeNames) {
const node = this.workflow.nodes[n];
output.push({
const entry: NodeTypeAndVersion = {
name: node.name,
type: node.type,
typeVersion: node.typeVersion,
disabled: node.disabled ?? false,
});
};
if (options?.includeNodeParameters) {
entry.parameters = node.parameters;
}
output.push(entry);
}
return output;
}
/**
* Gets the chat trigger node
*
* this is needed for sub-nodes where the parent nodes are not available
*/
getChatTrigger() {
for (const node of Object.values(this.workflow.nodes)) {
if (this.workflow.nodes[node.name].type === CHAT_TRIGGER_NODE_TYPE) {
return this.workflow.nodes[node.name];
}
}
return null;
}
@Memoized
get nodeType() {
const { type, typeVersion } = this.node;

View File

@@ -4,7 +4,7 @@ import { onMounted } from 'vue';
import { createChat } from '@n8n/chat/index';
import type { ChatOptions } from '@n8n/chat/types';
const webhookUrl = 'http://localhost:5678/webhook/f406671e-c954-4691-b39a-66c90aa2f103/chat';
const webhookUrl = 'http://localhost:5678/webhook/ad712f8b-3546-4d08-b049-e0d035334a4c/chat';
const meta = {
title: 'Chat',

View File

@@ -0,0 +1,157 @@
import type { VueWrapper } from '@vue/test-utils';
import { mount } from '@vue/test-utils';
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import Input from '../components/Input.vue';
vi.mock('@vueuse/core', () => ({
useFileDialog: vi.fn(() => ({
open: vi.fn(),
reset: vi.fn(),
onChange: vi.fn(),
})),
}));
vi.mock('uuid', () => ({
v4: vi.fn(() => 'mock-uuid-123'),
}));
vi.mock('virtual:icons/mdi/paperclip', () => ({
default: { name: 'IconPaperclip' },
}));
vi.mock('virtual:icons/mdi/send', () => ({
default: { name: 'IconSend' },
}));
vi.mock('@n8n/chat/composables', () => ({
useI18n: () => ({
t: (key: string) => key,
}),
useChat: () => ({
waitingForResponse: { value: false },
currentSessionId: { value: 'session-123' },
messages: { value: [] },
sendMessage: vi.fn(),
ws: null,
}),
useOptions: () => ({
options: {
disabled: { value: false },
allowFileUploads: { value: true },
allowedFilesMimeTypes: { value: 'image/*,text/*' },
webhookUrl: 'https://example.com/webhook',
},
}),
}));
vi.mock('@n8n/chat/event-buses', () => ({
chatEventBus: {
on: vi.fn(),
off: vi.fn(),
},
}));
vi.mock('./ChatFile.vue', () => ({
default: { name: 'ChatFile' },
}));
describe('ChatInput', () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let wrapper: VueWrapper<any>;
beforeEach(() => {
// @ts-expect-error - mock WebSocket
global.WebSocket = vi.fn().mockImplementation(
() =>
({
send: vi.fn(),
close: vi.fn(),
onmessage: null,
onclose: null,
}) as unknown as WebSocket,
);
});
afterEach(() => {
if (wrapper) {
wrapper.unmount();
}
vi.clearAllMocks();
});
it('renders the component with default props', () => {
wrapper = mount(Input);
expect(wrapper.find('textarea').exists()).toBe(true);
expect(wrapper.find('[data-test-id="chat-input"]').exists()).toBe(true);
expect(wrapper.find('.chat-input-send-button').exists()).toBe(true);
});
it('applies custom placeholder', () => {
wrapper = mount(Input, {
props: {
placeholder: 'customPlaceholder',
},
});
const textarea = wrapper.find('textarea');
expect(textarea.attributes('placeholder')).toBe('customPlaceholder');
});
it('updates input value when typing', async () => {
const textarea = wrapper.find('textarea');
await textarea.setValue('Hello world');
expect(wrapper.vm.input).toBe('Hello world');
});
it('does not submit on Shift+Enter', async () => {
const textarea = wrapper.find('textarea');
const onSubmitSpy = vi.spyOn(wrapper.vm, 'onSubmit');
await textarea.setValue('Test message');
await textarea.trigger('keydown.enter', { shiftKey: true });
expect(onSubmitSpy).not.toHaveBeenCalled();
});
it('sets up WebSocket connection with execution ID', () => {
const executionId = 'exec-123';
wrapper.vm.setupWebsocketConnection(executionId);
expect(global.WebSocket).toHaveBeenCalledWith(expect.stringContaining('sessionId=session-123'));
expect(global.WebSocket).toHaveBeenCalledWith(expect.stringContaining('executionId=exec-123'));
});
it('handles WebSocket messages correctly', async () => {
const mockWs = {
send: vi.fn(),
onmessage: null,
onclose: null,
};
wrapper.vm.chatStore.ws = mockWs;
wrapper.vm.waitingForChatResponse = true;
await wrapper.vm.respondToChatNode(mockWs, 'Test message');
expect(mockWs.send).toHaveBeenCalledWith(expect.stringContaining('"chatInput":"Test message"'));
});
it('handles empty file list gracefully', () => {
wrapper.vm.files = null;
expect(() => wrapper.vm.attachFiles()).not.toThrow();
expect(wrapper.vm.attachFiles()).toEqual([]);
});
it('prevents submit when disabled', async () => {
const submitButton = wrapper.find('.chat-input-send-button');
await submitButton.trigger('click');
expect(wrapper.vm.isSubmitting).toBe(false);
});
});

View File

@@ -0,0 +1,66 @@
import { vi, describe, it, expect } from 'vitest';
import { createApp } from 'vue';
import * as api from '@n8n/chat/api';
import { ChatPlugin } from '../../plugins/chat';
vi.mock('@n8n/chat/api');
describe('ChatPlugin', () => {
it('should return sendMessageResponse when executionStarted is true', async () => {
const app = createApp({});
const options = {
webhookUrl: 'test',
i18n: {
en: {
message: 'message',
title: 'title',
subtitle: 'subtitle',
footer: 'footer',
getStarted: 'getStarted',
inputPlaceholder: 'inputPlaceholder',
closeButtonTooltip: 'closeButtonTooltip',
},
},
};
(api.sendMessage as jest.Mock).mockResolvedValue({ executionStarted: true });
app.use(ChatPlugin, options);
const chatStore = app.config.globalProperties.$chat;
const result = await chatStore.sendMessage('test message');
expect(result).toEqual({ executionStarted: true });
});
it('should return null when sendMessageResponse is null', async () => {
const app = createApp({});
const options = {
webhookUrl: 'test',
i18n: {
en: {
message: 'message',
title: 'title',
subtitle: 'subtitle',
footer: 'footer',
getStarted: 'getStarted',
inputPlaceholder: 'inputPlaceholder',
closeButtonTooltip: 'closeButtonTooltip',
},
},
};
(api.sendMessage as jest.Mock).mockResolvedValue({});
app.use(ChatPlugin, options);
const chatStore = app.config.globalProperties.$chat;
const result = await chatStore.sendMessage('test message');
expect(result).toEqual(null);
});
});

View File

@@ -1,10 +1,16 @@
import type { LoadPreviousSessionResponse, SendMessageResponse } from '@n8n/chat/types';
export function createFetchResponse<T>(data: T) {
const jsonData = JSON.stringify(data);
return async () =>
({
json: async () => await new Promise<T>((resolve) => resolve(data)),
}) as Response;
text: async () => jsonData,
clone() {
return this;
},
}) as unknown as Response;
}
export const createGetLatestMessagesResponse = (

View File

@@ -24,7 +24,15 @@ export async function authenticatedFetch<T>(...args: Parameters<typeof fetch>):
headers,
});
return (await response.json()) as T;
let responseData;
try {
responseData = await response.clone().json();
} catch (error) {
responseData = await response.text();
}
return responseData as T;
}
export async function get<T>(url: string, query: object = {}, options: RequestInit = {}) {

View File

@@ -1,13 +1,16 @@
<script setup lang="ts">
import { useFileDialog } from '@vueuse/core';
import { v4 as uuidv4 } from 'uuid';
import IconPaperclip from 'virtual:icons/mdi/paperclip';
import IconSend from 'virtual:icons/mdi/send';
import { computed, onMounted, onUnmounted, ref, unref } from 'vue';
import { useI18n, useChat, useOptions } from '@n8n/chat/composables';
import { chatEventBus } from '@n8n/chat/event-buses';
import { constructChatWebsocketUrl } from '@n8n/chat/utils';
import ChatFile from './ChatFile.vue';
import type { ChatMessage } from '../types';
export interface ChatInputProps {
placeholder?: string;
@@ -36,8 +39,10 @@ const chatTextArea = ref<HTMLTextAreaElement | null>(null);
const input = ref('');
const isSubmitting = ref(false);
const resizeObserver = ref<ResizeObserver | null>(null);
const waitingForChatResponse = ref(false);
const isSubmitDisabled = computed(() => {
if (waitingForChatResponse.value) return false;
return input.value === '' || unref(waitingForResponse) || options.disabled?.value === true;
});
@@ -127,6 +132,110 @@ function setInputValue(value: string) {
focusChatInput();
}
function attachFiles() {
if (files.value) {
const filesToAttach = Array.from(files.value);
resetFileDialog();
files.value = null;
return filesToAttach;
}
return [];
}
function setupWebsocketConnection(executionId: string) {
// if webhookUrl is not defined onSubmit is called from integrated chat
// do not setup websocket as it would be handled by the integrated chat
if (options.webhookUrl && chatStore.currentSessionId.value) {
try {
const wsUrl = constructChatWebsocketUrl(
options.webhookUrl,
executionId,
chatStore.currentSessionId.value,
true,
);
chatStore.ws = new WebSocket(wsUrl);
chatStore.ws.onmessage = (e) => {
if (e.data === 'n8n|heartbeat') {
chatStore.ws?.send('n8n|heartbeat-ack');
return;
}
if (e.data === 'n8n|continue') {
waitingForChatResponse.value = false;
chatStore.waitingForResponse.value = true;
return;
}
const newMessage: ChatMessage = {
id: uuidv4(),
text: e.data,
sender: 'bot',
};
chatStore.messages.value.push(newMessage);
waitingForChatResponse.value = true;
chatStore.waitingForResponse.value = false;
};
chatStore.ws.onclose = () => {
chatStore.ws = null;
waitingForChatResponse.value = false;
chatStore.waitingForResponse.value = false;
};
} catch (error) {
// do not throw error here as it should work with n8n versions that do not support websockets
console.error('Error setting up websocket connection', error);
}
}
}
async function processFiles(data: File[] | undefined) {
if (!data || data.length === 0) return [];
const filePromises = data.map(async (file) => {
// We do not need to await here as it will be awaited on the return by Promise.all
// eslint-disable-next-line @typescript-eslint/return-await
return new Promise<{ name: string; type: string; data: string }>((resolve, reject) => {
const reader = new FileReader();
reader.onload = () =>
resolve({
name: file.name,
type: file.type,
data: reader.result as string,
});
reader.onerror = () =>
reject(new Error(`Error reading file: ${reader.error?.message ?? 'Unknown error'}`));
reader.readAsDataURL(file);
});
});
return await Promise.all(filePromises);
}
async function respondToChatNode(ws: WebSocket, messageText: string) {
const sentMessage: ChatMessage = {
id: uuidv4(),
text: messageText,
sender: 'user',
files: files.value ? attachFiles() : undefined,
};
chatStore.messages.value.push(sentMessage);
ws.send(
JSON.stringify({
sessionId: chatStore.currentSessionId.value,
action: 'sendMessage',
chatInput: messageText,
files: await processFiles(sentMessage.files),
}),
);
chatStore.waitingForResponse.value = true;
waitingForChatResponse.value = false;
}
async function onSubmit(event: MouseEvent | KeyboardEvent) {
event.preventDefault();
@@ -137,10 +246,19 @@ async function onSubmit(event: MouseEvent | KeyboardEvent) {
const messageText = input.value;
input.value = '';
isSubmitting.value = true;
await chatStore.sendMessage(messageText, Array.from(files.value ?? []));
if (chatStore.ws && waitingForChatResponse.value) {
await respondToChatNode(chatStore.ws, messageText);
return;
}
const response = await chatStore.sendMessage(messageText, attachFiles());
if (response?.executionId) {
setupWebsocketConnection(response.executionId);
}
isSubmitting.value = false;
resetFileDialog();
files.value = null;
}
async function onSubmitKeydown(event: KeyboardEvent) {
@@ -225,7 +343,7 @@ function adjustTextAreaHeight() {
</button>
</div>
</div>
<div v-if="files?.length && !isSubmitting" class="chat-files">
<div v-if="files?.length && (!isSubmitting || waitingForChatResponse)" class="chat-files">
<ChatFile
v-for="file in files"
:key="file.name"

View File

@@ -85,7 +85,15 @@ export const ChatPlugin: Plugin<ChatOptions> = {
options,
);
let textMessage = sendMessageResponse.output ?? sendMessageResponse.text ?? '';
if (sendMessageResponse?.executionStarted) {
return sendMessageResponse;
}
let textMessage =
sendMessageResponse.output ??
sendMessageResponse.text ??
sendMessageResponse.message ??
'';
if (textMessage === '' && Object.keys(sendMessageResponse).length > 0) {
try {
@@ -107,13 +115,16 @@ export const ChatPlugin: Plugin<ChatOptions> = {
receivedMessage.value.text = 'Error: Failed to receive response';
}
console.error('Chat API error:', error);
} finally {
waitingForResponse.value = false;
}
waitingForResponse.value = false;
void nextTick(() => {
chatEventBus.emit('scrollToBottom');
});
return null;
}
async function loadPreviousSession() {

View File

@@ -2,6 +2,8 @@ import type { Ref } from 'vue';
import type { ChatMessage } from '@n8n/chat/types/messages';
import type { SendMessageResponse } from './webhook';
export interface Chat {
initialMessages: Ref<ChatMessage[]>;
messages: Ref<ChatMessage[]>;
@@ -9,5 +11,6 @@ export interface Chat {
waitingForResponse: Ref<boolean>;
loadPreviousSession?: () => Promise<string | undefined>;
startNewSession?: () => Promise<void>;
sendMessage: (text: string, files: File[]) => Promise<void>;
sendMessage: (text: string, files: File[]) => Promise<SendMessageResponse | null>;
ws?: WebSocket | null;
}

View File

@@ -15,4 +15,7 @@ export interface LoadPreviousSessionResponse {
export interface SendMessageResponse {
output?: string;
text?: string;
message?: string;
executionId?: string;
executionStarted?: boolean;
}

View File

@@ -1,2 +1,3 @@
export * from './event-bus';
export * from './mount';
export * from './utils';

View File

@@ -0,0 +1,11 @@
export function constructChatWebsocketUrl(
url: string,
executionId: string,
sessionId: string,
isPublic: boolean,
) {
const baseUrl = new URL(url).origin;
const wsProtocol = baseUrl.startsWith('https') ? 'wss' : 'ws';
const wsUrl = baseUrl.replace(/^https?/, wsProtocol);
return `${wsUrl}/chat?sessionId=${sessionId}&executionId=${executionId}${isPublic ? '&isPublic=true' : ''}`;
}

View File

@@ -577,7 +577,6 @@ describe('RunData', () => {
executionTime: 3,
// @ts-expect-error allow missing properties in test
source: [{ previousNode: 'Execute Workflow Trigger' }],
// @ts-expect-error allow missing properties in test
executionStatus: 'error',
// @ts-expect-error allow missing properties in test
error: {

View File

@@ -1043,6 +1043,7 @@ describe('useRunWorkflow({ router })', () => {
workflowsStore.activeWorkflows = ['test-wf-id'];
workflowsStore.setActiveExecutionId('test-exec-id');
workflowsStore.executionWaitingForWebhook = false;
getExecutionSpy.mockResolvedValue(executionData);

View File

@@ -155,6 +155,7 @@ export const MANUAL_TRIGGER_NODE_TYPE = 'n8n-nodes-base.manualTrigger';
export const MANUAL_CHAT_TRIGGER_NODE_TYPE = '@n8n/n8n-nodes-langchain.manualChatTrigger';
export const MCP_TRIGGER_NODE_TYPE = '@n8n/n8n-nodes-langchain.mcpTrigger';
export const CHAT_TRIGGER_NODE_TYPE = '@n8n/n8n-nodes-langchain.chatTrigger';
export const CHAT_NODE_TYPE = '@n8n/n8n-nodes-langchain.chat';
export const AGENT_NODE_TYPE = '@n8n/n8n-nodes-langchain.agent';
export const OPEN_AI_NODE_TYPE = '@n8n/n8n-nodes-langchain.openAi';
export const OPEN_AI_NODE_MESSAGE_ASSISTANT_TYPE =

View File

@@ -0,0 +1,125 @@
import { createTestingPinia } from '@pinia/testing';
import { useChatMessaging } from '../composables/useChatMessaging';
import { ref, computed } from 'vue';
import type { Ref, ComputedRef } from 'vue';
import type { IRunExecutionData } from 'n8n-workflow';
import type { IExecutionPushResponse, INodeUi } from '@/Interface';
import type { RunWorkflowChatPayload } from '../composables/useChatMessaging';
import { vi } from 'vitest';
import type { ChatMessage } from '@n8n/chat/types';
vi.mock('../logs.utils', () => {
return {
extractBotResponse: vi.fn(() => 'Last node response'),
getInputKey: vi.fn(),
processFiles: vi.fn(),
};
});
describe('useChatMessaging', () => {
let chatMessaging: ReturnType<typeof useChatMessaging>;
let chatTrigger: Ref<INodeUi | null>;
let messages: Ref<ChatMessage[]>;
let sessionId: Ref<string>;
let executionResultData: ComputedRef<IRunExecutionData['resultData'] | undefined>;
let onRunChatWorkflow: (
payload: RunWorkflowChatPayload,
) => Promise<IExecutionPushResponse | undefined>;
let ws: Ref<WebSocket | null>;
let executionData: IRunExecutionData['resultData'] | undefined = undefined;
beforeEach(() => {
executionData = undefined;
createTestingPinia();
chatTrigger = ref(null);
messages = ref([]);
sessionId = ref('session-id');
executionResultData = computed(() => executionData);
onRunChatWorkflow = vi.fn().mockResolvedValue({
executionId: 'execution-id',
} as IExecutionPushResponse);
ws = ref(null);
chatMessaging = useChatMessaging({
chatTrigger,
messages,
sessionId,
executionResultData,
onRunChatWorkflow,
ws,
});
});
it('should initialize correctly', () => {
expect(chatMessaging).toBeDefined();
expect(chatMessaging.previousMessageIndex.value).toBe(0);
expect(chatMessaging.isLoading.value).toBe(false);
});
it('should send a message and add it to messages', async () => {
const messageText = 'Hello, world!';
await chatMessaging.sendMessage(messageText);
expect(messages.value).toHaveLength(1);
});
it('should send message via WebSocket if open', async () => {
const messageText = 'Hello, WebSocket!';
ws.value = {
readyState: WebSocket.OPEN,
send: vi.fn(),
} as unknown as WebSocket;
await chatMessaging.sendMessage(messageText);
expect(ws.value.send).toHaveBeenCalledWith(
JSON.stringify({
sessionId: sessionId.value,
action: 'sendMessage',
chatInput: messageText,
}),
);
});
it('should startWorkflowWithMessage and add message to messages with final message', async () => {
const messageText = 'Hola!';
chatTrigger.value = {
id: 'trigger-id',
name: 'Trigger',
typeVersion: 1.1,
parameters: { options: {} },
} as unknown as INodeUi;
(onRunChatWorkflow as jest.Mock).mockResolvedValue({
executionId: 'execution-id',
} as IExecutionPushResponse);
executionData = {
runData: {},
} as unknown as IRunExecutionData['resultData'];
await chatMessaging.sendMessage(messageText);
expect(messages.value).toHaveLength(2);
});
it('should startWorkflowWithMessage and not add final message if responseMode is responseNode and version is 1.3', async () => {
const messageText = 'Hola!';
chatTrigger.value = {
id: 'trigger-id',
name: 'Trigger',
typeVersion: 1.3,
parameters: { options: { responseMode: 'responseNodes' } },
} as unknown as INodeUi;
(onRunChatWorkflow as jest.Mock).mockResolvedValue({
executionId: 'execution-id',
} as IExecutionPushResponse);
executionData = {
runData: {},
} as unknown as IRunExecutionData['resultData'];
await chatMessaging.sendMessage(messageText);
expect(messages.value).toHaveLength(1);
});
});

View File

@@ -658,6 +658,7 @@ describe('LogsPanel', () => {
sendMessage: vi.fn(),
previousMessageIndex: ref(0),
isLoading: computed(() => false),
setLoadingState: vi.fn(),
};
});
});
@@ -693,6 +694,7 @@ describe('LogsPanel', () => {
sendMessage: vi.fn(),
previousMessageIndex: ref(0),
isLoading: computed(() => false),
setLoadingState: vi.fn(),
});
logsStore.state = LOGS_PANEL_STATE.ATTACHED;
@@ -800,6 +802,7 @@ describe('LogsPanel', () => {
sendMessage: sendMessageSpy,
previousMessageIndex: ref(0),
isLoading: computed(() => false),
setLoadingState: vi.fn(),
};
});
});

View File

@@ -17,7 +17,8 @@ import { usePinnedData } from '@/composables/usePinnedData';
import { MODAL_CONFIRM } from '@/constants';
import { useI18n } from '@n8n/i18n';
import type { IExecutionPushResponse, INodeUi } from '@/Interface';
import { extractBotResponse, getInputKey } from '@/features/logs/logs.utils';
import { extractBotResponse, getInputKey, processFiles } from '@/features/logs/logs.utils';
export type RunWorkflowChatPayload = {
triggerNode: string;
@@ -33,6 +34,7 @@ export interface ChatMessagingDependencies {
onRunChatWorkflow: (
payload: RunWorkflowChatPayload,
) => Promise<IExecutionPushResponse | undefined>;
ws: Ref<WebSocket | null>;
}
export function useChatMessaging({
@@ -41,12 +43,17 @@ export function useChatMessaging({
sessionId,
executionResultData,
onRunChatWorkflow,
ws,
}: ChatMessagingDependencies) {
const locale = useI18n();
const { showError } = useToast();
const previousMessageIndex = ref(0);
const isLoading = ref(false);
const setLoadingState = (loading: boolean) => {
isLoading.value = loading;
};
/** Converts a file to binary data */
async function convertFileToBinaryData(file: File): Promise<IBinaryData> {
const reader = new FileReader();
@@ -140,10 +147,16 @@ export function useChatMessaging({
message,
});
isLoading.value = false;
ws.value = null;
if (!response?.executionId) {
return;
}
// Response Node mode should not return last node result if responseMode is "responseNodes"
const responseMode = (triggerNode.parameters.options as { responseMode?: string })
?.responseMode;
if (responseMode === 'responseNodes') return;
const chatMessage = executionResultData.value
? extractBotResponse(
executionResultData.value,
@@ -193,12 +206,25 @@ export function useChatMessaging({
};
messages.value.push(newMessage);
await startWorkflowWithMessage(newMessage.text, files);
if (ws.value?.readyState === WebSocket.OPEN && !isLoading.value) {
ws.value.send(
JSON.stringify({
sessionId: sessionId.value,
action: 'sendMessage',
chatInput: message,
files: await processFiles(files),
}),
);
isLoading.value = true;
} else {
await startWorkflowWithMessage(newMessage.text, files);
}
}
return {
previousMessageIndex,
isLoading: computed(() => isLoading.value),
setLoadingState,
sendMessage,
};
}

View File

@@ -5,17 +5,25 @@ import { useNodeHelpers } from '@/composables/useNodeHelpers';
import { useRunWorkflow } from '@/composables/useRunWorkflow';
import { VIEWS } from '@/constants';
import { useWorkflowsStore } from '@/stores/workflows.store';
import { ChatOptionsSymbol, ChatSymbol } from '@n8n/chat/constants';
import { useRootStore } from '@n8n/stores/useRootStore';
import { ChatOptionsSymbol } from '@n8n/chat/constants';
import { chatEventBus } from '@n8n/chat/event-buses';
import type { Chat, ChatMessage, ChatOptions } from '@n8n/chat/types';
import { v4 as uuid } from 'uuid';
import type { Ref } from 'vue';
import type { InjectionKey, Ref } from 'vue';
import { computed, provide, ref, watch } from 'vue';
import { useRouter } from 'vue-router';
import { useLogsStore } from '@/stores/logs.store';
import { restoreChatHistory } from '@/features/logs/logs.utils';
import type { INodeParameters } from 'n8n-workflow';
import { isChatNode } from '@/utils/aiUtils';
import { constructChatWebsocketUrl } from '@n8n/chat/utils';
type IntegratedChat = Omit<Chat, 'sendMessage'> & {
sendMessage: (text: string, files: File[]) => Promise<void>;
};
const ChatSymbol = 'Chat' as unknown as InjectionKey<IntegratedChat>;
interface ChatState {
currentSessionId: Ref<string>;
@@ -29,11 +37,13 @@ interface ChatState {
export function useChatState(isReadOnly: boolean): ChatState {
const locale = useI18n();
const workflowsStore = useWorkflowsStore();
const rootStore = useRootStore();
const logsStore = useLogsStore();
const router = useRouter();
const nodeHelpers = useNodeHelpers();
const { runWorkflow } = useRunWorkflow({ router });
const ws = ref<WebSocket | null>(null);
const messages = ref<ChatMessage[]>([]);
const currentSessionId = ref<string>(uuid().replace(/-/g, ''));
@@ -52,25 +62,32 @@ export function useChatState(isReadOnly: boolean): ChatState {
)?.allowedFilesMimeTypes?.toString() ?? '',
);
const { sendMessage, isLoading } = useChatMessaging({
const respondNodesResponseMode = computed(
() =>
(chatTriggerNode.value?.parameters?.options as { responseMode?: string })?.responseMode ===
'responseNodes',
);
const { sendMessage, isLoading, setLoadingState } = useChatMessaging({
chatTrigger: chatTriggerNode,
messages,
sessionId: currentSessionId,
executionResultData: computed(() => workflowsStore.getWorkflowExecution?.data?.resultData),
onRunChatWorkflow,
ws,
});
// Extracted pure functions for better testability
function createChatConfig(params: {
messages: Chat['messages'];
sendMessage: Chat['sendMessage'];
sendMessage: IntegratedChat['sendMessage'];
currentSessionId: Chat['currentSessionId'];
isLoading: Ref<boolean>;
isDisabled: Ref<boolean>;
allowFileUploads: Ref<boolean>;
locale: ReturnType<typeof useI18n>;
}): { chatConfig: Chat; chatOptions: ChatOptions } {
const chatConfig: Chat = {
}): { chatConfig: IntegratedChat; chatOptions: ChatOptions } {
const chatConfig: IntegratedChat = {
messages: params.messages,
sendMessage: params.sendMessage,
initialMessages: ref([]),
@@ -154,6 +171,43 @@ export function useChatState(isReadOnly: boolean): ChatState {
const response = await runWorkflow(runWorkflowOptions);
if (response) {
if (respondNodesResponseMode.value) {
const wsUrl = constructChatWebsocketUrl(
rootStore.urlBaseEditor,
response.executionId as string,
currentSessionId.value,
false,
);
ws.value = new WebSocket(wsUrl);
ws.value.onmessage = (event) => {
if (event.data === 'n8n|heartbeat') {
ws.value?.send('n8n|heartbeat-ack');
return;
}
if (event.data === 'n8n|continue') {
setLoadingState(true);
return;
}
setLoadingState(false);
const newMessage: ChatMessage & { sessionId: string } = {
text: event.data,
sender: 'bot',
sessionId: currentSessionId.value,
id: uuid(),
};
messages.value.push(newMessage);
if (logsStore.isOpen) {
chatEventBus.emit('focusInput');
}
};
ws.value.onclose = () => {
setLoadingState(false);
ws.value = null;
};
}
await createExecutionPromise();
workflowsStore.appendChatMessage(payload.message);
return response;

View File

@@ -14,13 +14,11 @@ import {
getTreeNodeData,
mergeStartData,
restoreChatHistory,
processFiles,
extractBotResponse,
} from './logs.utils';
import {
AGENT_LANGCHAIN_NODE_TYPE,
NodeConnectionTypes,
type ExecutionError,
type ITaskStartedData,
} from 'n8n-workflow';
import { AGENT_LANGCHAIN_NODE_TYPE, NodeConnectionTypes } from 'n8n-workflow';
import type { ExecutionError, ITaskStartedData, IRunExecutionData } from 'n8n-workflow';
import {
aiAgentNode,
aiChatWorkflow,
@@ -1170,6 +1168,115 @@ describe(createLogTree, () => {
expect(logs[0].children).toHaveLength(1);
expect(logs[0].children[0].node.name).toBe(aiModelNode.name);
});
it('should process files correctly', async () => {
const mockFile = new File(['test content'], 'test.txt', { type: 'text/plain' });
const result = await processFiles([mockFile]);
expect(result).toEqual([
{
name: 'test.txt',
type: 'text/plain',
data: 'data:text/plain;base64,dGVzdCBjb250ZW50',
},
]);
});
it('should return an empty array if no files are provided', async () => {
expect(await processFiles(undefined)).toEqual([]);
expect(await processFiles([])).toEqual([]);
});
});
describe('extractBotResponse', () => {
it('should extract a successful bot response', () => {
const resultData: IRunExecutionData['resultData'] = {
lastNodeExecuted: 'nodeA',
runData: {
nodeA: [
{
executionTime: 1,
startTime: 1,
executionIndex: 1,
source: [],
data: {
main: [[{ json: { message: 'Test output' } }]],
},
},
],
},
};
const executionId = 'test-exec-id';
const result = extractBotResponse(resultData, executionId);
expect(result).toEqual({
text: 'Test output',
sender: 'bot',
id: executionId,
});
});
it('should extract an error bot response', () => {
const resultData: IRunExecutionData['resultData'] = {
lastNodeExecuted: 'nodeA',
runData: {
nodeA: [
{
executionTime: 1,
startTime: 1,
executionIndex: 1,
source: [],
error: {
message: 'Test error',
} as unknown as ExecutionError,
},
],
},
};
const executionId = 'test-exec-id';
const result = extractBotResponse(resultData, executionId);
expect(result).toEqual({
text: '[ERROR: Test error]',
sender: 'bot',
id: 'test-exec-id',
});
});
it('should return undefined if no response data is available', () => {
const resultData = {
lastNodeExecuted: 'nodeA',
runData: {
nodeA: [
{
executionTime: 1,
startTime: 1,
executionIndex: 1,
source: [],
},
],
},
};
const executionId = 'test-exec-id';
const result = extractBotResponse(resultData, executionId);
expect(result).toBeUndefined();
});
it('should return undefined if lastNodeExecuted is not available', () => {
const resultData = {
runData: {
nodeA: [
{
executionTime: 1,
startTime: 1,
executionIndex: 1,
source: [],
},
],
},
};
const executionId = 'test-exec-id';
const result = extractBotResponse(resultData, executionId);
expect(result).toBeUndefined();
});
});
describe(deepToRaw, () => {

View File

@@ -565,7 +565,7 @@ function extractResponseText(responseData?: IDataObject): string | undefined {
}
// Paths where the response message might be located
const paths = ['output', 'text', 'response.text'];
const paths = ['output', 'text', 'response.text', 'message'];
const matchedPath = paths.find((path) => get(responseData, path));
if (!matchedPath) return JSON.stringify(responseData, null, 2);
@@ -599,6 +599,32 @@ export function restoreChatHistory(
return [...(userMessage ? [userMessage] : []), ...(botMessage ? [botMessage] : [])];
}
export async function processFiles(data: File[] | undefined) {
if (!data || data.length === 0) return [];
const filePromises = data.map(async (file) => {
// We do not need to await here as it will be awaited on the return by Promise.all
// eslint-disable-next-line @typescript-eslint/return-await
return new Promise<{ name: string; type: string; data: string }>((resolve, reject) => {
const reader = new FileReader();
reader.onload = () =>
resolve({
name: file.name,
type: file.type,
data: reader.result as string,
});
reader.onerror = () =>
reject(new Error(`Error reading file: ${reader.error?.message ?? 'Unknown error'}`));
reader.readAsDataURL(file);
});
});
return await Promise.all(filePromises);
}
export function isSubNodeLog(logEntry: LogEntry): boolean {
return logEntry.parent !== undefined && logEntry.parent.executionId === logEntry.executionId;
}

View File

@@ -19,6 +19,7 @@ import {
FORM_TRIGGER_NODE_TYPE,
CHAT_TRIGGER_NODE_TYPE,
WAIT_NODE_TYPE,
WAIT_INDEFINITELY,
} from 'n8n-workflow';
import type { Readable } from 'stream';
@@ -334,6 +335,14 @@ export class RespondToWebhook implements INodeType {
],
};
async onMessage(
context: IExecuteFunctions,
_data: INodeExecutionData,
): Promise<INodeExecutionData[][]> {
const inputData = context.getInputData();
return [inputData];
}
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const items = this.getInputData();
const nodeVersion = this.getNode().typeVersion;
@@ -347,6 +356,10 @@ export class RespondToWebhook implements INodeType {
let response: IN8nHttpFullResponse;
const connectedNodes = this.getParentNodes(this.getNode().name, {
includeNodeParameters: true,
});
const options = this.getNodeParameter('options', 0, {});
const shouldStream =
@@ -354,7 +367,6 @@ export class RespondToWebhook implements INodeType {
try {
if (nodeVersion >= 1.1) {
const connectedNodes = this.getParentNodes(this.getNode().name);
if (!connectedNodes.some(({ type }) => WEBHOOK_NODE_TYPES.includes(type))) {
throw new NodeOperationError(
this.getNode(),
@@ -507,6 +519,40 @@ export class RespondToWebhook implements INodeType {
);
}
const chatTrigger = connectedNodes.find(
(node) => node.type === CHAT_TRIGGER_NODE_TYPE && !node.disabled,
);
const parameters = chatTrigger?.parameters as {
options: { responseMode: string };
};
// if workflow is started from chat trigger and responseMode is set to "responseNodes"
// response to chat will be send by ChatService
if (
chatTrigger &&
!chatTrigger.disabled &&
parameters.options.responseMode === 'responseNodes'
) {
let message = '';
if (responseBody && typeof responseBody === 'object' && !Array.isArray(responseBody)) {
message =
(((responseBody as IDataObject).output ??
(responseBody as IDataObject).text ??
(responseBody as IDataObject).message) as string) ?? '';
if (message === '' && Object.keys(responseBody).length > 0) {
try {
message = JSON.stringify(responseBody, null, 2);
} catch (e) {}
}
}
await this.putExecutionToWait(WAIT_INDEFINITELY);
return [[{ json: {}, sendMessage: message }]];
}
if (
hasHtmlContentType &&
respondWith !== 'text' &&

View File

@@ -8,6 +8,7 @@ import {
type INode,
type INodeExecutionData,
type NodeTypeAndVersion,
CHAT_TRIGGER_NODE_TYPE,
} from 'n8n-workflow';
import { RespondToWebhook } from '../RespondToWebhook.node';
@@ -23,6 +24,78 @@ describe('RespondToWebhook Node', () => {
});
});
describe('chatTrigger response', () => {
it('should handle chatTrigger correctly when enabled and responseBody is an object', async () => {
mockExecuteFunctions.getInputData.mockReturnValue([{ json: { input: true } }]);
mockExecuteFunctions.getNode.mockReturnValue(mock<INode>({ typeVersion: 1.4 }));
mockExecuteFunctions.getParentNodes.mockReturnValue([
mock<NodeTypeAndVersion>({
type: CHAT_TRIGGER_NODE_TYPE,
disabled: false,
parameters: { options: { responseMode: 'responseNodes' } },
}),
]);
mockExecuteFunctions.getNodeParameter.mockImplementation((paramName) => {
if (paramName === 'respondWith') return 'json';
if (paramName === 'responseBody') return { message: 'Hello World' };
if (paramName === 'options') return {};
});
mockExecuteFunctions.putExecutionToWait.mockResolvedValue();
const result = await respondToWebhook.execute.call(mockExecuteFunctions);
expect(result).toEqual([[{ json: {}, sendMessage: 'Hello World' }]]);
});
it('should handle chatTrigger correctly when enabled and responseBody is not an object', async () => {
mockExecuteFunctions.getInputData.mockReturnValue([{ json: { input: true } }]);
mockExecuteFunctions.getNode.mockReturnValue(mock<INode>({ typeVersion: 1.1 }));
mockExecuteFunctions.getParentNodes.mockReturnValue([
mock<NodeTypeAndVersion>({
type: CHAT_TRIGGER_NODE_TYPE,
disabled: false,
parameters: { options: { responseMode: 'responseNodes' } },
}),
]);
mockExecuteFunctions.getNodeParameter.mockImplementation((paramName) => {
if (paramName === 'respondWith') return 'text';
if (paramName === 'responseBody') return 'Just a string';
if (paramName === 'options') return {};
});
mockExecuteFunctions.putExecutionToWait.mockResolvedValue();
const result = await respondToWebhook.execute.call(mockExecuteFunctions);
expect(result).toEqual([[{ json: {}, sendMessage: '' }]]);
});
it('should not handle chatTrigger when disabled', async () => {
mockExecuteFunctions.getInputData.mockReturnValue([{ json: { input: true } }]);
mockExecuteFunctions.getNode.mockReturnValue(mock<INode>({ typeVersion: 1.1 }));
mockExecuteFunctions.getParentNodes.mockReturnValue([
mock<NodeTypeAndVersion>({ type: CHAT_TRIGGER_NODE_TYPE, disabled: true }),
]);
mockExecuteFunctions.getNodeParameter.mockImplementation((paramName) => {
if (paramName === 'respondWith') return 'json';
if (paramName === 'responseBody') return { message: 'Hello World' };
if (paramName === 'options') return {};
});
mockExecuteFunctions.sendResponse.mockReturnValue();
await expect(respondToWebhook.execute.call(mockExecuteFunctions)).resolves.not.toThrow();
expect(mockExecuteFunctions.sendResponse).toHaveBeenCalled();
});
it('should return input data onMessage call', async () => {
mockExecuteFunctions.getInputData.mockReturnValue([{ json: { input: true } }]);
const result = await respondToWebhook.onMessage(mockExecuteFunctions, {
json: { message: '' },
});
expect(result).toEqual([[{ json: { input: true } }]]);
});
});
describe('execute method', () => {
it('should throw an error if no WEBHOOK_NODE_TYPES in parents', async () => {
mockExecuteFunctions.getInputData.mockReturnValue([]);

View File

@@ -42,6 +42,7 @@ export const FORM_NODE_TYPE = 'n8n-nodes-base.form';
export const FORM_TRIGGER_NODE_TYPE = 'n8n-nodes-base.formTrigger';
export const CHAT_TRIGGER_NODE_TYPE = '@n8n/n8n-nodes-langchain.chatTrigger';
export const WAIT_NODE_TYPE = 'n8n-nodes-base.wait';
export const RESPOND_TO_WEBHOOK_NODE_TYPE = 'n8n-nodes-base.respondToWebhook';
export const HTML_NODE_TYPE = 'n8n-nodes-base.html';
export const MAILGUN_NODE_TYPE = 'n8n-nodes-base.mailgun';
export const POSTGRES_NODE_TYPE = 'n8n-nodes-base.postgres';
@@ -118,3 +119,5 @@ export const FROM_AI_AUTO_GENERATED_MARKER = '/*n8n-auto-generated-fromAI-overri
export const PROJECT_ROOT = '0';
export const WAITING_FORMS_EXECUTION_STATUS = 'n8n-execution-status';
export const CHAT_WAIT_USER_REPLY = 'waitUserReply';

View File

@@ -874,10 +874,14 @@ export interface FunctionsBase {
nodeName: string,
options?: { includeNodeParameters?: boolean },
): NodeTypeAndVersion[];
getParentNodes(nodeName: string): NodeTypeAndVersion[];
getParentNodes(
nodeName: string,
options?: { includeNodeParameters?: boolean },
): NodeTypeAndVersion[];
getKnownNodeTypes(): IDataObject;
getMode?: () => WorkflowExecuteMode;
getActivationMode?: () => WorkflowActivateMode;
getChatTrigger: () => INode | null;
/** @deprecated */
prepareOutputData(outputData: INodeExecutionData[]): Promise<INodeExecutionData[][]>;
@@ -1201,6 +1205,7 @@ export interface INodeExecutionData {
| NodeApiError
| NodeOperationError
| number
| string
| undefined;
json: IDataObject;
binary?: IBinaryKeyData;
@@ -1209,6 +1214,16 @@ export interface INodeExecutionData {
metadata?: {
subExecution: RelatedExecution;
};
/**
* Use this key to send a message to the chat.
*
* - Workflow has to be started by a chat node.
* - Put execution to wait after sending.
*
* See example in
* packages/@n8n/nodes-langchain/nodes/trigger/ChatTrigger/Chat.node.ts
*/
sendMessage?: string;
/**
* @deprecated This key was added by accident and should not be used as it
@@ -1624,6 +1639,11 @@ export interface INodeType {
description: INodeTypeDescription;
supplyData?(this: ISupplyDataFunctions, itemIndex: number): Promise<SupplyData>;
execute?(this: IExecuteFunctions): Promise<NodeOutput>;
/**
* A function called when a node receives a chat message. Allows it to react
* to the message before it gets executed.
*/
onMessage?(context: IExecuteFunctions, data: INodeExecutionData): Promise<NodeOutput>;
poll?(this: IPollFunctions): Promise<INodeExecutionData[][] | null>;
trigger?(this: ITriggerFunctions): Promise<ITriggerResponse | undefined>;
webhook?(this: IWebhookFunctions): Promise<IWebhookResponseData>;
@@ -2110,11 +2130,28 @@ export interface IWebhookResponseData {
}
export type WebhookResponseData = 'allEntries' | 'firstEntryJson' | 'firstEntryBinary' | 'noData';
/**
* Defines how and when response should be sent:
*
* onReceived: Response is sent immidiatly after node done executing
*
* lastNode: Response is sent after the last node finishes executing
*
* responseNode: Response is sent from the Responde to Webhook node
*
* formPage: Special response with executionId sent to the form trigger node
*
* hostedChat: Special response with executionId sent to the hosted chat trigger node
*
* streaming: Response added to runData to httpResponse and streamingEnabled set to true
*/
export type WebhookResponseMode =
| 'onReceived'
| 'lastNode'
| 'responseNode'
| 'formPage'
| 'hostedChat'
| 'streaming';
export interface INodeTypes {