diff --git a/packages/nodes-base/nodes/RespondToWebhook/RespondToWebhook.node.ts b/packages/nodes-base/nodes/RespondToWebhook/RespondToWebhook.node.ts index 5618478d65..0b82eced72 100644 --- a/packages/nodes-base/nodes/RespondToWebhook/RespondToWebhook.node.ts +++ b/packages/nodes-base/nodes/RespondToWebhook/RespondToWebhook.node.ts @@ -82,7 +82,9 @@ export class RespondToWebhook implements INodeType { icon: { light: 'file:webhook.svg', dark: 'file:webhook.dark.svg' }, name: 'respondToWebhook', group: ['transform'], - version: [1, 1.1, 1.2, 1.3, 1.4], + version: [1, 1.1, 1.2, 1.3, 1.4, 1.5], + // Keep the default version at 1.4 until streaming is fully supported + defaultVersion: 1.4, description: 'Returns data for Webhook', defaults: { name: 'Respond to Webhook', @@ -314,6 +316,19 @@ export class RespondToWebhook implements INodeType { description: 'The name of the response field to put all items in', placeholder: 'e.g. data', }, + { + displayName: 'Enable Streaming', + name: 'enableStreaming', + type: 'boolean', + default: true, + description: 'Whether to enable streaming to the response', + displayOptions: { + show: { + ['/respondWith']: ['allIncomingItems', 'firstIncomingItem', 'text', 'json', 'jwt'], + '@version': [{ _cnd: { gte: 1.5 } }], + }, + }, + }, ], }, ], @@ -332,6 +347,11 @@ export class RespondToWebhook implements INodeType { let response: IN8nHttpFullResponse; + const options = this.getNodeParameter('options', 0, {}); + + const shouldStream = + nodeVersion >= 1.5 && this.isStreaming() && options.enableStreaming !== false; + try { if (nodeVersion >= 1.1) { const connectedNodes = this.getParentNodes(this.getNode().name); @@ -348,7 +368,6 @@ export class RespondToWebhook implements INodeType { } const respondWith = this.getNodeParameter('respondWith', 0) as string; - const options = this.getNodeParameter('options', 0, {}); const headers = {} as IDataObject; if (options.responseHeaders) { @@ -382,6 +401,12 @@ export class RespondToWebhook implements INodeType { } } } + + if (shouldStream) { + this.sendChunk('begin', 0); + this.sendChunk('item', 0, responseBody as IDataObject); + this.sendChunk('end', 0); + } } else if (respondWith === 'jwt') { try { const { keyType, secret, algorithm, privateKey } = await this.getCredentials<{ @@ -401,13 +426,24 @@ export class RespondToWebhook implements INodeType { const payload = this.getNodeParameter('payload', 0, {}) as IDataObject; const token = jwt.sign(payload, secretOrPrivateKey, { algorithm }); responseBody = { token }; + + if (shouldStream) { + this.sendChunk('begin', 0); + this.sendChunk('item', 0, responseBody as IDataObject); + this.sendChunk('end', 0); + } } catch (error) { throw new NodeOperationError(this.getNode(), error as Error, { message: 'Error signing JWT token', }); } } else if (respondWith === 'allIncomingItems') { - const respondItems = items.map((item) => item.json); + const respondItems = items.map((item, index) => { + this.sendChunk('begin', index); + this.sendChunk('item', index, item.json); + this.sendChunk('end', index); + return item.json; + }); responseBody = options.responseKey ? set({}, options.responseKey as string, respondItems) : respondItems; @@ -415,12 +451,24 @@ export class RespondToWebhook implements INodeType { responseBody = options.responseKey ? set({}, options.responseKey as string, items[0].json) : items[0].json; + if (shouldStream) { + this.sendChunk('begin', 0); + this.sendChunk('item', 0, items[0].json); + this.sendChunk('end', 0); + } } else if (respondWith === 'text') { // If a user doesn't set the content-type header and uses html, the html can still be rendered on the browser + const rawBody = this.getNodeParameter('responseBody', 0) as string; if (hasHtmlContentType || !headers['content-type']) { - responseBody = sandboxHtmlResponse(this.getNodeParameter('responseBody', 0) as string); + responseBody = sandboxHtmlResponse(rawBody); } else { - responseBody = this.getNodeParameter('responseBody', 0) as string; + responseBody = rawBody; + } + // Send the raw body to the stream + if (shouldStream) { + this.sendChunk('begin', 0); + this.sendChunk('item', 0, rawBody); + this.sendChunk('end', 0); } } else if (respondWith === 'binary') { const item = items[0]; @@ -474,7 +522,9 @@ export class RespondToWebhook implements INodeType { statusCode, }; - this.sendResponse(response); + if (!shouldStream || respondWith === 'binary') { + this.sendResponse(response); + } } catch (error) { if (this.continueOnFail()) { const itemData = generatePairedItemData(items.length); diff --git a/packages/nodes-base/nodes/RespondToWebhook/test/RespondToWebhook.test.ts b/packages/nodes-base/nodes/RespondToWebhook/test/RespondToWebhook.test.ts index f1380eb05d..fe532ef6da 100644 --- a/packages/nodes-base/nodes/RespondToWebhook/test/RespondToWebhook.test.ts +++ b/packages/nodes-base/nodes/RespondToWebhook/test/RespondToWebhook.test.ts @@ -378,4 +378,232 @@ describe('RespondToWebhook Node', () => { ]); }); }); + + describe('streaming functionality', () => { + it('should stream JSON response when streaming is enabled', async () => { + mockExecuteFunctions.getInputData.mockReturnValue([{ json: { input: true } }]); + mockExecuteFunctions.getNode.mockReturnValue(mock({ typeVersion: 1.5 })); + mockExecuteFunctions.getParentNodes.mockReturnValue([ + mock({ type: WAIT_NODE_TYPE }), + ]); + mockExecuteFunctions.isStreaming.mockReturnValue(true); + mockExecuteFunctions.sendChunk.mockImplementation(() => {}); + mockExecuteFunctions.getNodeParameter.mockImplementation((paramName) => { + if (paramName === 'respondWith') return 'json'; + if (paramName === 'options') return { enableStreaming: true }; + if (paramName === 'responseBody') return { response: true }; + }); + + await respondToWebhook.execute.call(mockExecuteFunctions); + + expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('begin', 0); + expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('item', 0, { response: true }); + expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('end', 0); + expect(mockExecuteFunctions.sendResponse).not.toHaveBeenCalled(); + }); + + it('should stream text response when streaming is enabled', async () => { + mockExecuteFunctions.getInputData.mockReturnValue([{ json: { input: true } }]); + mockExecuteFunctions.getNode.mockReturnValue(mock({ typeVersion: 1.5 })); + mockExecuteFunctions.getParentNodes.mockReturnValue([ + mock({ type: WAIT_NODE_TYPE }), + ]); + mockExecuteFunctions.isStreaming.mockReturnValue(true); + mockExecuteFunctions.sendChunk.mockImplementation(() => {}); + mockExecuteFunctions.getNodeParameter.mockImplementation((paramName) => { + if (paramName === 'respondWith') return 'text'; + if (paramName === 'options') return { enableStreaming: true }; + if (paramName === 'responseBody') return 'test response'; + }); + + await respondToWebhook.execute.call(mockExecuteFunctions); + + expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('begin', 0); + expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('item', 0, 'test response'); + expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('end', 0); + expect(mockExecuteFunctions.sendResponse).not.toHaveBeenCalled(); + }); + + it('should stream JWT response when streaming is enabled', async () => { + mockExecuteFunctions.getInputData.mockReturnValue([{ json: { input: true } }]); + mockExecuteFunctions.getNode.mockReturnValue(mock({ typeVersion: 1.5 })); + mockExecuteFunctions.getCredentials.mockResolvedValue( + mock({ + keyType: 'passphrase', + privateKey: 'privateKey', + secret: 'secret', + algorithm: 'HS256', + }), + ); + mockExecuteFunctions.getParentNodes.mockReturnValue([ + mock({ type: WAIT_NODE_TYPE }), + ]); + mockExecuteFunctions.isStreaming.mockReturnValue(true); + mockExecuteFunctions.sendChunk.mockImplementation(() => {}); + mockExecuteFunctions.getNodeParameter.mockImplementation((paramName) => { + if (paramName === 'respondWith') return 'jwt'; + if (paramName === 'options') return { enableStreaming: true }; + if (paramName === 'payload') return { test: 'payload' }; + }); + + await respondToWebhook.execute.call(mockExecuteFunctions); + + expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('begin', 0); + expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('item', 0, { + token: expect.any(String), + }); + expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('end', 0); + expect(mockExecuteFunctions.sendResponse).not.toHaveBeenCalled(); + }); + + it('should stream first incoming item when streaming is enabled', async () => { + const inputItems = [{ json: { test: 'data' } }]; + mockExecuteFunctions.getInputData.mockReturnValue(inputItems); + mockExecuteFunctions.getNode.mockReturnValue(mock({ typeVersion: 1.5 })); + mockExecuteFunctions.getParentNodes.mockReturnValue([ + mock({ type: WAIT_NODE_TYPE }), + ]); + mockExecuteFunctions.isStreaming.mockReturnValue(true); + mockExecuteFunctions.sendChunk.mockImplementation(() => {}); + mockExecuteFunctions.getNodeParameter.mockImplementation((paramName) => { + if (paramName === 'respondWith') return 'firstIncomingItem'; + if (paramName === 'options') return { enableStreaming: true }; + }); + + await respondToWebhook.execute.call(mockExecuteFunctions); + + expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('begin', 0); + expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('item', 0, { test: 'data' }); + expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('end', 0); + expect(mockExecuteFunctions.sendResponse).not.toHaveBeenCalled(); + }); + + it('should stream all incoming items when streaming is enabled', async () => { + const inputItems = [{ json: { item: 1 } }, { json: { item: 2 } }]; + mockExecuteFunctions.getInputData.mockReturnValue(inputItems); + mockExecuteFunctions.getNode.mockReturnValue(mock({ typeVersion: 1.5 })); + mockExecuteFunctions.getParentNodes.mockReturnValue([ + mock({ type: WAIT_NODE_TYPE }), + ]); + mockExecuteFunctions.isStreaming.mockReturnValue(true); + mockExecuteFunctions.sendChunk.mockImplementation(() => {}); + mockExecuteFunctions.getNodeParameter.mockImplementation((paramName) => { + if (paramName === 'respondWith') return 'allIncomingItems'; + if (paramName === 'options') return { enableStreaming: true }; + }); + + await respondToWebhook.execute.call(mockExecuteFunctions); + + expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('begin', 0); + expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('item', 0, { item: 1 }); + expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('item', 1, { item: 2 }); + expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('end', 0); + expect(mockExecuteFunctions.sendResponse).not.toHaveBeenCalled(); + }); + + it('should not stream when enableStreaming is false', async () => { + mockExecuteFunctions.getInputData.mockReturnValue([{ json: { input: true } }]); + mockExecuteFunctions.getNode.mockReturnValue(mock({ typeVersion: 1.5 })); + mockExecuteFunctions.getParentNodes.mockReturnValue([ + mock({ type: WAIT_NODE_TYPE }), + ]); + mockExecuteFunctions.isStreaming.mockReturnValue(true); + mockExecuteFunctions.sendChunk.mockImplementation(() => {}); + mockExecuteFunctions.sendResponse.mockImplementation(() => {}); + mockExecuteFunctions.getNodeParameter.mockImplementation((paramName) => { + if (paramName === 'respondWith') return 'json'; + if (paramName === 'options') return { enableStreaming: false }; + if (paramName === 'responseBody') return { response: true }; + }); + + await respondToWebhook.execute.call(mockExecuteFunctions); + + expect(mockExecuteFunctions.sendChunk).not.toHaveBeenCalled(); + expect(mockExecuteFunctions.sendResponse).toHaveBeenCalledWith({ + body: { response: true }, + headers: {}, + statusCode: 200, + }); + }); + + it('should not stream when context is not streaming', async () => { + mockExecuteFunctions.getInputData.mockReturnValue([{ json: { input: true } }]); + mockExecuteFunctions.getNode.mockReturnValue(mock({ typeVersion: 1.5 })); + mockExecuteFunctions.getParentNodes.mockReturnValue([ + mock({ type: WAIT_NODE_TYPE }), + ]); + mockExecuteFunctions.isStreaming.mockReturnValue(false); + mockExecuteFunctions.sendChunk.mockImplementation(() => {}); + mockExecuteFunctions.sendResponse.mockImplementation(() => {}); + mockExecuteFunctions.getNodeParameter.mockImplementation((paramName) => { + if (paramName === 'respondWith') return 'json'; + if (paramName === 'options') return { enableStreaming: true }; + if (paramName === 'responseBody') return { response: true }; + }); + + await respondToWebhook.execute.call(mockExecuteFunctions); + + expect(mockExecuteFunctions.sendChunk).not.toHaveBeenCalled(); + expect(mockExecuteFunctions.sendResponse).toHaveBeenCalledWith({ + body: { response: true }, + headers: {}, + statusCode: 200, + }); + }); + + it('should not stream binary responses', async () => { + const binary = { data: 'text', mimeType: 'text/plain' }; + const inputItems: INodeExecutionData[] = [{ binary: { data: binary }, json: {} }]; + mockExecuteFunctions.getInputData.mockReturnValue(inputItems); + mockExecuteFunctions.helpers.assertBinaryData.mockReturnValue(binary); + mockExecuteFunctions.getNode.mockReturnValue(mock({ typeVersion: 1.5 })); + mockExecuteFunctions.getParentNodes.mockReturnValue([ + mock({ type: WAIT_NODE_TYPE }), + ]); + mockExecuteFunctions.isStreaming.mockReturnValue(true); + mockExecuteFunctions.sendChunk.mockImplementation(() => {}); + mockExecuteFunctions.sendResponse.mockImplementation(() => {}); + mockExecuteFunctions.getNodeParameter.mockImplementation((paramName) => { + if (paramName === 'respondWith') return 'binary'; + if (paramName === 'options') return { enableStreaming: true }; + }); + + await respondToWebhook.execute.call(mockExecuteFunctions); + + expect(mockExecuteFunctions.sendChunk).not.toHaveBeenCalled(); + expect(mockExecuteFunctions.sendResponse).toHaveBeenCalledWith({ + body: Buffer.from('text', BINARY_ENCODING), + headers: { + 'content-length': 3, + 'content-type': 'text/plain', + }, + statusCode: 200, + }); + }); + + it('should use non-streaming mode for older versions', async () => { + mockExecuteFunctions.getInputData.mockReturnValue([{ json: { input: true } }]); + mockExecuteFunctions.getNode.mockReturnValue(mock({ typeVersion: 1.4 })); + mockExecuteFunctions.getParentNodes.mockReturnValue([ + mock({ type: WAIT_NODE_TYPE }), + ]); + mockExecuteFunctions.isStreaming.mockReturnValue(true); + mockExecuteFunctions.sendChunk.mockImplementation(() => {}); + mockExecuteFunctions.sendResponse.mockImplementation(() => {}); + mockExecuteFunctions.getNodeParameter.mockImplementation((paramName) => { + if (paramName === 'respondWith') return 'json'; + if (paramName === 'options') return {}; + if (paramName === 'responseBody') return { response: true }; + }); + + await respondToWebhook.execute.call(mockExecuteFunctions); + + expect(mockExecuteFunctions.sendChunk).not.toHaveBeenCalled(); + expect(mockExecuteFunctions.sendResponse).toHaveBeenCalledWith({ + body: { response: true }, + headers: {}, + statusCode: 200, + }); + }); + }); }); diff --git a/packages/nodes-base/nodes/Webhook/utils.ts b/packages/nodes-base/nodes/Webhook/utils.ts index bcd37d9cb5..6e68fd8a71 100644 --- a/packages/nodes-base/nodes/Webhook/utils.ts +++ b/packages/nodes-base/nodes/Webhook/utils.ts @@ -166,7 +166,7 @@ export const checkResponseModeConfiguration = (context: IWebhookFunctions) => { ); } - if (isRespondToWebhookConnected && responseMode !== 'responseNode') { + if (isRespondToWebhookConnected && !['responseNode', 'streaming'].includes(responseMode)) { throw new NodeOperationError( context.getNode(), new Error('Webhook node not correctly configured'),