diff --git a/packages/@n8n/nodes-langchain/nodes/trigger/ChatTrigger/ChatTrigger.node.ts b/packages/@n8n/nodes-langchain/nodes/trigger/ChatTrigger/ChatTrigger.node.ts index 948a6b714b..1c65225e24 100644 --- a/packages/@n8n/nodes-langchain/nodes/trigger/ChatTrigger/ChatTrigger.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/trigger/ChatTrigger/ChatTrigger.node.ts @@ -35,6 +35,173 @@ const allowedFileMimeTypeOption: INodeProperties = { 'Allowed file types for upload. Comma-separated list of MIME types.', }; +const responseModeOptions = [ + { + name: 'When Last Node Finishes', + value: 'lastNode', + description: 'Returns data of the last-executed node', + }, + { + name: "Using 'Respond to Webhook' Node", + value: 'responseNode', + description: 'Response defined in that node', + }, +]; + +const responseModeWithStreamingOptions = [ + ...responseModeOptions, + { + name: 'Streaming Response', + value: 'streaming', + description: 'Streaming response from specified nodes (e.g. Agents)', + }, +]; + +const commonOptionsFields: INodeProperties[] = [ + // CORS parameters are only valid for when chat is used in hosted or webhook mode + { + displayName: 'Allowed Origins (CORS)', + name: 'allowedOrigins', + type: 'string', + default: '*', + description: + 'Comma-separated list of URLs allowed for cross-origin non-preflight requests. Use * (default) to allow all origins.', + displayOptions: { + show: { + '/mode': ['hostedChat', 'webhook'], + }, + }, + }, + { + ...allowFileUploadsOption, + displayOptions: { + show: { + '/mode': ['hostedChat'], + }, + }, + }, + { + ...allowedFileMimeTypeOption, + displayOptions: { + show: { + '/mode': ['hostedChat'], + }, + }, + }, + { + displayName: 'Input Placeholder', + name: 'inputPlaceholder', + type: 'string', + displayOptions: { + show: { + '/mode': ['hostedChat'], + }, + }, + default: 'Type your question..', + placeholder: 'e.g. Type your message here', + description: 'Shown as placeholder text in the chat input field', + }, + { + displayName: 'Load Previous Session', + name: 'loadPreviousSession', + type: 'options', + options: [ + { + name: 'Off', + value: 'notSupported', + description: 'Loading messages of previous session is turned off', + }, + { + name: 'From Memory', + value: 'memory', + description: 'Load session messages from memory', + }, + { + name: 'Manually', + value: 'manually', + description: 'Manually return messages of session', + }, + ], + default: 'notSupported', + description: 'If loading messages of a previous session should be enabled', + }, + { + displayName: 'Require Button Click to Start Chat', + name: 'showWelcomeScreen', + type: 'boolean', + displayOptions: { + show: { + '/mode': ['hostedChat'], + }, + }, + default: false, + description: 'Whether to show the welcome screen at the start of the chat', + }, + { + displayName: 'Start Conversation Button Text', + name: 'getStarted', + type: 'string', + displayOptions: { + show: { + showWelcomeScreen: [true], + '/mode': ['hostedChat'], + }, + }, + default: 'New Conversation', + placeholder: 'e.g. New Conversation', + description: 'Shown as part of the welcome screen, in the middle of the chat window', + }, + { + displayName: 'Subtitle', + name: 'subtitle', + type: 'string', + displayOptions: { + show: { + '/mode': ['hostedChat'], + }, + }, + default: "Start a chat. We're here to help you 24/7.", + placeholder: "e.g. We're here for you", + description: 'Shown at the top of the chat, under the title', + }, + { + displayName: 'Title', + name: 'title', + type: 'string', + displayOptions: { + show: { + '/mode': ['hostedChat'], + }, + }, + default: 'Hi there! 👋', + placeholder: 'e.g. Welcome', + description: 'Shown at the top of the chat', + }, + { + displayName: 'Custom Chat Styling', + name: 'customCss', + type: 'string', + typeOptions: { + rows: 10, + editor: 'cssEditor', + }, + displayOptions: { + show: { + '/mode': ['hostedChat'], + }, + }, + default: ` +${cssVariables} + +/* You can override any class styles, too. Right-click inspect in Chat UI to find class to override. */ +.chat-message { + max-width: 50%; +} +`.trim(), + description: 'Override default styling of the public chat interface with CSS', + }, +]; + export class ChatTrigger extends Node { description: INodeTypeDescription = { displayName: 'Chat Trigger', @@ -42,7 +209,9 @@ export class ChatTrigger extends Node { icon: 'fa:comments', iconColor: 'black', group: ['trigger'], - version: [1, 1.1], + version: [1, 1.1, 1.2], + // Keep the default version as 1.1 to avoid releasing streaming in broken state + defaultVersion: 1.1, description: 'Runs the workflow when an n8n generated webchat is submitted', defaults: { name: 'When chat message received', @@ -228,6 +397,7 @@ export class ChatTrigger extends Node { default: {}, options: [allowFileUploadsOption, allowedFileMimeTypeOption], }, + // Options for versions 1.0 and 1.1 (without streaming) { displayName: 'Options', name: 'options', @@ -236,171 +406,46 @@ export class ChatTrigger extends Node { show: { mode: ['hostedChat', 'webhook'], public: [true], + '@version': [1, 1.1], }, }, placeholder: 'Add Field', default: {}, options: [ - // CORS parameters are only valid for when chat is used in hosted or webhook mode - { - displayName: 'Allowed Origins (CORS)', - name: 'allowedOrigins', - type: 'string', - default: '*', - description: - 'Comma-separated list of URLs allowed for cross-origin non-preflight requests. Use * (default) to allow all origins.', - displayOptions: { - show: { - '/mode': ['hostedChat', 'webhook'], - }, - }, - }, - { - ...allowFileUploadsOption, - displayOptions: { - show: { - '/mode': ['hostedChat'], - }, - }, - }, - { - ...allowedFileMimeTypeOption, - displayOptions: { - show: { - '/mode': ['hostedChat'], - }, - }, - }, - { - displayName: 'Input Placeholder', - name: 'inputPlaceholder', - type: 'string', - displayOptions: { - show: { - '/mode': ['hostedChat'], - }, - }, - default: 'Type your question..', - placeholder: 'e.g. Type your message here', - description: 'Shown as placeholder text in the chat input field', - }, - { - displayName: 'Load Previous Session', - name: 'loadPreviousSession', - type: 'options', - options: [ - { - name: 'Off', - value: 'notSupported', - description: 'Loading messages of previous session is turned off', - }, - { - name: 'From Memory', - value: 'memory', - description: 'Load session messages from memory', - }, - { - name: 'Manually', - value: 'manually', - description: 'Manually return messages of session', - }, - ], - default: 'notSupported', - description: 'If loading messages of a previous session should be enabled', - }, + ...commonOptionsFields, { displayName: 'Response Mode', name: 'responseMode', type: 'options', - options: [ - { - name: 'When Last Node Finishes', - value: 'lastNode', - description: 'Returns data of the last-executed node', - }, - { - name: "Using 'Respond to Webhook' Node", - value: 'responseNode', - description: 'Response defined in that node', - }, - ], + options: responseModeOptions, default: 'lastNode', description: 'When and how to respond to the webhook', }, - { - displayName: 'Require Button Click to Start Chat', - name: 'showWelcomeScreen', - type: 'boolean', - displayOptions: { - show: { - '/mode': ['hostedChat'], - }, - }, - default: false, - description: 'Whether to show the welcome screen at the start of the chat', + ], + }, + // Options for version 1.2+ (with streaming) + { + displayName: 'Options', + name: 'options', + type: 'collection', + displayOptions: { + show: { + mode: ['hostedChat', 'webhook'], + public: [true], + '@version': [{ _cnd: { gte: 1.2 } }], }, + }, + placeholder: 'Add Field', + default: {}, + options: [ + ...commonOptionsFields, { - displayName: 'Start Conversation Button Text', - name: 'getStarted', - type: 'string', - displayOptions: { - show: { - showWelcomeScreen: [true], - '/mode': ['hostedChat'], - }, - }, - default: 'New Conversation', - placeholder: 'e.g. New Conversation', - description: 'Shown as part of the welcome screen, in the middle of the chat window', - }, - { - displayName: 'Subtitle', - name: 'subtitle', - type: 'string', - displayOptions: { - show: { - '/mode': ['hostedChat'], - }, - }, - default: "Start a chat. We're here to help you 24/7.", - placeholder: "e.g. We're here for you", - description: 'Shown at the top of the chat, under the title', - }, - { - displayName: 'Title', - name: 'title', - type: 'string', - displayOptions: { - show: { - '/mode': ['hostedChat'], - }, - }, - default: 'Hi there! 👋', - placeholder: 'e.g. Welcome', - description: 'Shown at the top of the chat', - }, - { - displayName: 'Custom Chat Styling', - name: 'customCss', - type: 'string', - typeOptions: { - rows: 10, - editor: 'cssEditor', - }, - displayOptions: { - show: { - '/mode': ['hostedChat'], - }, - }, - default: ` -${cssVariables} - -/* You can override any class styles, too. Right-click inspect in Chat UI to find class to override. */ -.chat-message { - max-width: 50%; -} -`.trim(), - description: 'Override default styling of the public chat interface with CSS', + displayName: 'Response Mode', + name: 'responseMode', + type: 'options', + options: responseModeWithStreamingOptions, + default: 'lastNode', + description: 'When and how to respond to the webhook', }, ], }, @@ -493,6 +538,9 @@ ${cssVariables} customCss?: string; }; + const responseMode = ctx.getNodeParameter('options.responseMode', 'lastNode') as string; + const enableStreaming = responseMode === 'streaming'; + const req = ctx.getRequestObject(); const webhookName = ctx.getWebhookName(); const mode = ctx.getMode() === 'manual' ? 'test' : 'production'; @@ -573,6 +621,32 @@ ${cssVariables} let returnData: INodeExecutionData[]; const webhookResponse: IDataObject = { status: 200 }; + + // Handle streaming responses + if (enableStreaming) { + // Set up streaming response headers + res.writeHead(200, { + 'Content-Type': 'application/json; charset=utf-8', + 'Transfer-Encoding': 'chunked', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }); + + // Flush headers immediately + res.flushHeaders(); + + if (req.contentType === 'multipart/form-data') { + returnData = [await this.handleFormData(ctx)]; + } else { + returnData = [{ json: bodyData }]; + } + + return { + workflowData: [ctx.helpers.returnJsonArray(returnData)], + noWebhookResponse: true, + }; + } + if (req.contentType === 'multipart/form-data') { returnData = [await this.handleFormData(ctx)]; return { diff --git a/packages/@n8n/nodes-langchain/nodes/trigger/ChatTrigger/__test__/ChatTrigger.node.test.ts b/packages/@n8n/nodes-langchain/nodes/trigger/ChatTrigger/__test__/ChatTrigger.node.test.ts index 2f5510b4ed..1dd55ff0b5 100644 --- a/packages/@n8n/nodes-langchain/nodes/trigger/ChatTrigger/__test__/ChatTrigger.node.test.ts +++ b/packages/@n8n/nodes-langchain/nodes/trigger/ChatTrigger/__test__/ChatTrigger.node.test.ts @@ -130,4 +130,119 @@ describe('ChatTrigger Node', () => { }); }); }); + + describe('webhook method: streaming response mode', () => { + beforeEach(() => { + mockContext.getWebhookName.mockReturnValue('default'); + mockContext.getMode.mockReturnValue('production' as any); + mockContext.getBodyData.mockReturnValue({ message: 'Hello' }); + (mockContext.helpers.returnJsonArray as any) = jest.fn().mockReturnValue([]); + mockResponse.writeHead.mockImplementation(() => mockResponse); + mockResponse.flushHeaders.mockImplementation(() => undefined); + }); + + it('should enable streaming when responseMode is "streaming"', async () => { + // Mock options with streaming responseMode + mockContext.getNodeParameter.mockImplementation( + ( + paramName: string, + defaultValue?: boolean | string | object, + ): boolean | string | object | undefined => { + if (paramName === 'public') return true; + if (paramName === 'mode') return 'hostedChat'; + if (paramName === 'options') return {}; + if (paramName === 'options.responseMode') return 'streaming'; + return defaultValue; + }, + ); + + // Call the webhook method + const result = await chatTrigger.webhook(mockContext); + + // Verify streaming headers are set + expect(mockResponse.writeHead).toHaveBeenCalledWith(200, { + 'Content-Type': 'application/json; charset=utf-8', + 'Transfer-Encoding': 'chunked', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }); + expect(mockResponse.flushHeaders).toHaveBeenCalled(); + + // Verify response structure for streaming + expect(result).toEqual({ + workflowData: expect.any(Array), + noWebhookResponse: true, + }); + }); + + it('should not enable streaming when responseMode is not "streaming"', async () => { + // Mock options with lastNode responseMode + mockContext.getNodeParameter.mockImplementation( + ( + paramName: string, + defaultValue?: boolean | string | object, + ): boolean | string | object | undefined => { + if (paramName === 'public') return true; + if (paramName === 'mode') return 'hostedChat'; + if (paramName === 'options') return {}; + if (paramName === 'options.responseMode') return 'lastNode'; + return defaultValue; + }, + ); + + // Call the webhook method + const result = await chatTrigger.webhook(mockContext); + + // Verify streaming headers are NOT set + expect(mockResponse.writeHead).not.toHaveBeenCalled(); + expect(mockResponse.flushHeaders).not.toHaveBeenCalled(); + + // Verify normal response structure + expect(result).toEqual({ + webhookResponse: { status: 200 }, + workflowData: expect.any(Array), + }); + }); + + it('should handle multipart form data with streaming enabled', async () => { + // Mock multipart form data request + mockRequest.contentType = 'multipart/form-data'; + mockRequest.body = { + data: { message: 'Hello' }, + files: {}, + }; + + // Mock options with streaming responseMode + mockContext.getNodeParameter.mockImplementation( + ( + paramName: string, + defaultValue?: boolean | string | object, + ): boolean | string | object | undefined => { + if (paramName === 'public') return true; + if (paramName === 'mode') return 'hostedChat'; + if (paramName === 'options') return {}; + if (paramName === 'options.responseMode') return 'streaming'; + return defaultValue; + }, + ); + + // Call the webhook method + const result = await chatTrigger.webhook(mockContext); + + // Verify streaming headers are set + expect(mockResponse.writeHead).toHaveBeenCalledWith(200, { + 'Content-Type': 'application/json; charset=utf-8', + 'Transfer-Encoding': 'chunked', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }); + expect(mockResponse.flushHeaders).toHaveBeenCalled(); + + // Verify response structure for streaming + expect(result).toEqual({ + workflowData: expect.any(Array), + noWebhookResponse: true, + }); + }); + }); }); diff --git a/packages/nodes-base/nodes/Webhook/Webhook.node.ts b/packages/nodes-base/nodes/Webhook/Webhook.node.ts index 9255dab346..f14ff4ecaa 100644 --- a/packages/nodes-base/nodes/Webhook/Webhook.node.ts +++ b/packages/nodes-base/nodes/Webhook/Webhook.node.ts @@ -27,6 +27,7 @@ import { responseCodeProperty, responseDataProperty, responseModeProperty, + responseModePropertyStreaming, } from './description'; import { WebhookAuthorizationError } from './error'; import { @@ -45,7 +46,9 @@ export class Webhook extends Node { icon: { light: 'file:webhook.svg', dark: 'file:webhook.dark.svg' }, name: 'webhook', group: ['trigger'], - version: [1, 1.1, 2], + version: [1, 1.1, 2, 2.1], + // Keep the default version as 2 to avoid releasing streaming in broken state + defaultVersion: 2, description: 'Starts the workflow when a webhook is called', eventTriggerDescription: 'Waiting for you to call the Test URL', activationMessage: 'You can now make calls to your production webhook URL.', @@ -136,6 +139,7 @@ export class Webhook extends Node { }, authenticationProperty(this.authPropertyName), responseModeProperty, + responseModePropertyStreaming, { displayName: 'Insert a \'Respond to Webhook\' node to control when and how you respond. More details', @@ -148,6 +152,18 @@ export class Webhook extends Node { }, default: '', }, + { + displayName: + 'Insert a node that supports streaming (e.g. \'AI Agent\') and enable streaming to stream directly to the response while the workflow is executed. More details', + name: 'webhookStreamingNotice', + type: 'notice', + displayOptions: { + show: { + responseMode: ['streaming'], + }, + }, + default: '', + }, { ...responseCodeProperty, displayOptions: { @@ -179,6 +195,7 @@ export class Webhook extends Node { async webhook(context: IWebhookFunctions): Promise { const { typeVersion: nodeVersion, type: nodeType } = context.getNode(); + const responseMode = context.getNodeParameter('responseMode', 'onReceived') as string; if (nodeVersion >= 2 && nodeType === 'n8n-nodes-base.webhook') { checkResponseModeConfiguration(context); @@ -254,6 +271,26 @@ export class Webhook extends Node { : undefined, }; + if (responseMode === 'streaming') { + const res = context.getResponseObject(); + + // Set up streaming response headers + res.writeHead(200, { + 'Content-Type': 'application/json; charset=utf-8', + 'Transfer-Encoding': 'chunked', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }); + + // Flush headers immediately + res.flushHeaders(); + + return { + noWebhookResponse: true, + workflowData: prepareOutput(response), + }; + } + return { webhookResponse: options.responseData, workflowData: prepareOutput(response), diff --git a/packages/nodes-base/nodes/Webhook/description.ts b/packages/nodes-base/nodes/Webhook/description.ts index 42a81aab71..2eb314390c 100644 --- a/packages/nodes-base/nodes/Webhook/description.ts +++ b/packages/nodes-base/nodes/Webhook/description.ts @@ -125,29 +125,57 @@ export const responseCodeProperty: INodeProperties = { description: 'The HTTP Response code to return', }; +const responseModeOptions = [ + { + name: 'Immediately', + value: 'onReceived', + description: 'As soon as this node executes', + }, + { + name: 'When Last Node Finishes', + value: 'lastNode', + description: 'Returns data of the last-executed node', + }, + { + name: "Using 'Respond to Webhook' Node", + value: 'responseNode', + description: 'Response defined in that node', + }, +]; + export const responseModeProperty: INodeProperties = { + displayName: 'Respond', + name: 'responseMode', + type: 'options', + options: responseModeOptions, + default: 'onReceived', + description: 'When and how to respond to the webhook', + displayOptions: { + show: { + '@version': [1, 1.1, 2], + }, + }, +}; + +export const responseModePropertyStreaming: INodeProperties = { displayName: 'Respond', name: 'responseMode', type: 'options', options: [ + ...responseModeOptions, { - name: 'Immediately', - value: 'onReceived', - description: 'As soon as this node executes', - }, - { - name: 'When Last Node Finishes', - value: 'lastNode', - description: 'Returns data of the last-executed node', - }, - { - name: "Using 'Respond to Webhook' Node", - value: 'responseNode', - description: 'Response defined in that node', + name: 'Streaming Response', + value: 'streaming', + description: 'Returns data in real time from streaming enabled nodes', }, ], default: 'onReceived', description: 'When and how to respond to the webhook', + displayOptions: { + hide: { + '@version': [1, 1.1, 2], + }, + }, }; export const responseDataProperty: INodeProperties = { diff --git a/packages/nodes-base/nodes/Webhook/test/Webhook.test.ts b/packages/nodes-base/nodes/Webhook/test/Webhook.test.ts index 483e3077f4..b8c2d3d82d 100644 --- a/packages/nodes-base/nodes/Webhook/test/Webhook.test.ts +++ b/packages/nodes-base/nodes/Webhook/test/Webhook.test.ts @@ -1,5 +1,5 @@ import { NodeTestHarness } from '@nodes-testing/node-test-harness'; -import type { Request } from 'express'; +import type { Request, Response } from 'express'; import { mock } from 'jest-mock-extended'; import type { IWebhookFunctions } from 'n8n-workflow'; @@ -40,4 +40,93 @@ describe('Test Webhook Node', () => { expect(context.nodeHelpers.copyBinaryFile).toHaveBeenCalled(); }); }); + + describe('streaming response mode', () => { + const node = new Webhook(); + const context = mock({ + nodeHelpers: mock(), + }); + const req = mock(); + const res = mock(); + + beforeEach(() => { + jest.clearAllMocks(); + context.getRequestObject.mockReturnValue(req); + context.getResponseObject.mockReturnValue(res); + context.getChildNodes.mockReturnValue([]); + context.getNode.mockReturnValue({ + type: 'n8n-nodes-base.webhook', + typeVersion: 2, + name: 'Webhook', + } as any); + context.getNodeParameter.mockImplementation((paramName: string) => { + if (paramName === 'options') return {}; + if (paramName === 'responseMode') return 'streaming'; + return undefined; + }); + req.headers = {}; + req.params = {}; + req.query = {}; + req.body = { message: 'test' }; + Object.defineProperty(req, 'ips', { value: [], configurable: true }); + Object.defineProperty(req, 'ip', { value: '127.0.0.1', configurable: true }); + res.writeHead.mockImplementation(() => res); + res.flushHeaders.mockImplementation(() => undefined); + }); + + it('should enable streaming when responseMode is "streaming"', async () => { + const result = await node.webhook(context); + + // Verify streaming headers are set + expect(res.writeHead).toHaveBeenCalledWith(200, { + 'Content-Type': 'application/json; charset=utf-8', + 'Transfer-Encoding': 'chunked', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }); + expect(res.flushHeaders).toHaveBeenCalled(); + + // Verify response structure for streaming + expect(result).toEqual({ + noWebhookResponse: true, + workflowData: expect.any(Array), + }); + }); + + it('should not enable streaming when responseMode is not "streaming"', async () => { + context.getNodeParameter.mockImplementation((paramName: string) => { + if (paramName === 'options') return {}; + if (paramName === 'responseMode') return 'onReceived'; + return undefined; + }); + + const result = await node.webhook(context); + + // Verify streaming headers are NOT set + expect(res.writeHead).not.toHaveBeenCalled(); + expect(res.flushHeaders).not.toHaveBeenCalled(); + + // Verify normal response structure + expect(result).toEqual({ + webhookResponse: undefined, + workflowData: expect.any(Array), + }); + }); + + it('should handle multipart form data with streaming enabled', async () => { + req.contentType = 'multipart/form-data'; + req.body = { + data: { message: 'Hello' }, + files: {}, + }; + + const result = await node.webhook(context); + + // For multipart form data, streaming is handled in handleFormData method + // The current implementation returns normal workflowData for form data + expect(result).toEqual({ + workflowData: expect.any(Array), + }); + }); + }); });