mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-18 02:21:13 +00:00
feat: Implement streaming response node on ChatTrigger and Webhook (no-changelog) (#16761)
This commit is contained in:
@@ -27,6 +27,7 @@ import {
|
||||
responseCodeProperty,
|
||||
responseDataProperty,
|
||||
responseModeProperty,
|
||||
responseModePropertyStreaming,
|
||||
} from './description';
|
||||
import { WebhookAuthorizationError } from './error';
|
||||
import {
|
||||
@@ -45,7 +46,9 @@ export class Webhook extends Node {
|
||||
icon: { light: 'file:webhook.svg', dark: 'file:webhook.dark.svg' },
|
||||
name: 'webhook',
|
||||
group: ['trigger'],
|
||||
version: [1, 1.1, 2],
|
||||
version: [1, 1.1, 2, 2.1],
|
||||
// Keep the default version as 2 to avoid releasing streaming in broken state
|
||||
defaultVersion: 2,
|
||||
description: 'Starts the workflow when a webhook is called',
|
||||
eventTriggerDescription: 'Waiting for you to call the Test URL',
|
||||
activationMessage: 'You can now make calls to your production webhook URL.',
|
||||
@@ -136,6 +139,7 @@ export class Webhook extends Node {
|
||||
},
|
||||
authenticationProperty(this.authPropertyName),
|
||||
responseModeProperty,
|
||||
responseModePropertyStreaming,
|
||||
{
|
||||
displayName:
|
||||
'Insert a \'Respond to Webhook\' node to control when and how you respond. <a href="https://docs.n8n.io/integrations/builtin/core-nodes/n8n-nodes-base.respondtowebhook/" target="_blank">More details</a>',
|
||||
@@ -148,6 +152,18 @@ export class Webhook extends Node {
|
||||
},
|
||||
default: '',
|
||||
},
|
||||
{
|
||||
displayName:
|
||||
'Insert a node that supports streaming (e.g. \'AI Agent\') and enable streaming to stream directly to the response while the workflow is executed. <a href="https://docs.n8n.io/integrations/builtin/core-nodes/n8n-nodes-base.respondtowebhook/" target="_blank">More details</a>',
|
||||
name: 'webhookStreamingNotice',
|
||||
type: 'notice',
|
||||
displayOptions: {
|
||||
show: {
|
||||
responseMode: ['streaming'],
|
||||
},
|
||||
},
|
||||
default: '',
|
||||
},
|
||||
{
|
||||
...responseCodeProperty,
|
||||
displayOptions: {
|
||||
@@ -179,6 +195,7 @@ export class Webhook extends Node {
|
||||
|
||||
async webhook(context: IWebhookFunctions): Promise<IWebhookResponseData> {
|
||||
const { typeVersion: nodeVersion, type: nodeType } = context.getNode();
|
||||
const responseMode = context.getNodeParameter('responseMode', 'onReceived') as string;
|
||||
|
||||
if (nodeVersion >= 2 && nodeType === 'n8n-nodes-base.webhook') {
|
||||
checkResponseModeConfiguration(context);
|
||||
@@ -254,6 +271,26 @@ export class Webhook extends Node {
|
||||
: undefined,
|
||||
};
|
||||
|
||||
if (responseMode === 'streaming') {
|
||||
const res = context.getResponseObject();
|
||||
|
||||
// Set up streaming response headers
|
||||
res.writeHead(200, {
|
||||
'Content-Type': 'application/json; charset=utf-8',
|
||||
'Transfer-Encoding': 'chunked',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive',
|
||||
});
|
||||
|
||||
// Flush headers immediately
|
||||
res.flushHeaders();
|
||||
|
||||
return {
|
||||
noWebhookResponse: true,
|
||||
workflowData: prepareOutput(response),
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
webhookResponse: options.responseData,
|
||||
workflowData: prepareOutput(response),
|
||||
|
||||
@@ -125,29 +125,57 @@ export const responseCodeProperty: INodeProperties = {
|
||||
description: 'The HTTP Response code to return',
|
||||
};
|
||||
|
||||
const responseModeOptions = [
|
||||
{
|
||||
name: 'Immediately',
|
||||
value: 'onReceived',
|
||||
description: 'As soon as this node executes',
|
||||
},
|
||||
{
|
||||
name: 'When Last Node Finishes',
|
||||
value: 'lastNode',
|
||||
description: 'Returns data of the last-executed node',
|
||||
},
|
||||
{
|
||||
name: "Using 'Respond to Webhook' Node",
|
||||
value: 'responseNode',
|
||||
description: 'Response defined in that node',
|
||||
},
|
||||
];
|
||||
|
||||
export const responseModeProperty: INodeProperties = {
|
||||
displayName: 'Respond',
|
||||
name: 'responseMode',
|
||||
type: 'options',
|
||||
options: responseModeOptions,
|
||||
default: 'onReceived',
|
||||
description: 'When and how to respond to the webhook',
|
||||
displayOptions: {
|
||||
show: {
|
||||
'@version': [1, 1.1, 2],
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
export const responseModePropertyStreaming: INodeProperties = {
|
||||
displayName: 'Respond',
|
||||
name: 'responseMode',
|
||||
type: 'options',
|
||||
options: [
|
||||
...responseModeOptions,
|
||||
{
|
||||
name: 'Immediately',
|
||||
value: 'onReceived',
|
||||
description: 'As soon as this node executes',
|
||||
},
|
||||
{
|
||||
name: 'When Last Node Finishes',
|
||||
value: 'lastNode',
|
||||
description: 'Returns data of the last-executed node',
|
||||
},
|
||||
{
|
||||
name: "Using 'Respond to Webhook' Node",
|
||||
value: 'responseNode',
|
||||
description: 'Response defined in that node',
|
||||
name: 'Streaming Response',
|
||||
value: 'streaming',
|
||||
description: 'Returns data in real time from streaming enabled nodes',
|
||||
},
|
||||
],
|
||||
default: 'onReceived',
|
||||
description: 'When and how to respond to the webhook',
|
||||
displayOptions: {
|
||||
hide: {
|
||||
'@version': [1, 1.1, 2],
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
export const responseDataProperty: INodeProperties = {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { NodeTestHarness } from '@nodes-testing/node-test-harness';
|
||||
import type { Request } from 'express';
|
||||
import type { Request, Response } from 'express';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import type { IWebhookFunctions } from 'n8n-workflow';
|
||||
|
||||
@@ -40,4 +40,93 @@ describe('Test Webhook Node', () => {
|
||||
expect(context.nodeHelpers.copyBinaryFile).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('streaming response mode', () => {
|
||||
const node = new Webhook();
|
||||
const context = mock<IWebhookFunctions>({
|
||||
nodeHelpers: mock(),
|
||||
});
|
||||
const req = mock<Request>();
|
||||
const res = mock<Response>();
|
||||
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks();
|
||||
context.getRequestObject.mockReturnValue(req);
|
||||
context.getResponseObject.mockReturnValue(res);
|
||||
context.getChildNodes.mockReturnValue([]);
|
||||
context.getNode.mockReturnValue({
|
||||
type: 'n8n-nodes-base.webhook',
|
||||
typeVersion: 2,
|
||||
name: 'Webhook',
|
||||
} as any);
|
||||
context.getNodeParameter.mockImplementation((paramName: string) => {
|
||||
if (paramName === 'options') return {};
|
||||
if (paramName === 'responseMode') return 'streaming';
|
||||
return undefined;
|
||||
});
|
||||
req.headers = {};
|
||||
req.params = {};
|
||||
req.query = {};
|
||||
req.body = { message: 'test' };
|
||||
Object.defineProperty(req, 'ips', { value: [], configurable: true });
|
||||
Object.defineProperty(req, 'ip', { value: '127.0.0.1', configurable: true });
|
||||
res.writeHead.mockImplementation(() => res);
|
||||
res.flushHeaders.mockImplementation(() => undefined);
|
||||
});
|
||||
|
||||
it('should enable streaming when responseMode is "streaming"', async () => {
|
||||
const result = await node.webhook(context);
|
||||
|
||||
// Verify streaming headers are set
|
||||
expect(res.writeHead).toHaveBeenCalledWith(200, {
|
||||
'Content-Type': 'application/json; charset=utf-8',
|
||||
'Transfer-Encoding': 'chunked',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive',
|
||||
});
|
||||
expect(res.flushHeaders).toHaveBeenCalled();
|
||||
|
||||
// Verify response structure for streaming
|
||||
expect(result).toEqual({
|
||||
noWebhookResponse: true,
|
||||
workflowData: expect.any(Array),
|
||||
});
|
||||
});
|
||||
|
||||
it('should not enable streaming when responseMode is not "streaming"', async () => {
|
||||
context.getNodeParameter.mockImplementation((paramName: string) => {
|
||||
if (paramName === 'options') return {};
|
||||
if (paramName === 'responseMode') return 'onReceived';
|
||||
return undefined;
|
||||
});
|
||||
|
||||
const result = await node.webhook(context);
|
||||
|
||||
// Verify streaming headers are NOT set
|
||||
expect(res.writeHead).not.toHaveBeenCalled();
|
||||
expect(res.flushHeaders).not.toHaveBeenCalled();
|
||||
|
||||
// Verify normal response structure
|
||||
expect(result).toEqual({
|
||||
webhookResponse: undefined,
|
||||
workflowData: expect.any(Array),
|
||||
});
|
||||
});
|
||||
|
||||
it('should handle multipart form data with streaming enabled', async () => {
|
||||
req.contentType = 'multipart/form-data';
|
||||
req.body = {
|
||||
data: { message: 'Hello' },
|
||||
files: {},
|
||||
};
|
||||
|
||||
const result = await node.webhook(context);
|
||||
|
||||
// For multipart form data, streaming is handled in handleFormData method
|
||||
// The current implementation returns normal workflowData for form data
|
||||
expect(result).toEqual({
|
||||
workflowData: expect.any(Array),
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user