feat(Respond to Webhook Node): Implement streaming to response (no-changelog) (#17219)

This commit is contained in:
Benjamin Schroth
2025-07-11 17:17:54 +02:00
committed by GitHub
parent 2f7ed14a23
commit 6a2edf83ab
3 changed files with 285 additions and 7 deletions

View File

@@ -378,4 +378,232 @@ describe('RespondToWebhook Node', () => {
]);
});
});
describe('streaming functionality', () => {
it('should stream JSON response when streaming is enabled', async () => {
mockExecuteFunctions.getInputData.mockReturnValue([{ json: { input: true } }]);
mockExecuteFunctions.getNode.mockReturnValue(mock<INode>({ typeVersion: 1.5 }));
mockExecuteFunctions.getParentNodes.mockReturnValue([
mock<NodeTypeAndVersion>({ type: WAIT_NODE_TYPE }),
]);
mockExecuteFunctions.isStreaming.mockReturnValue(true);
mockExecuteFunctions.sendChunk.mockImplementation(() => {});
mockExecuteFunctions.getNodeParameter.mockImplementation((paramName) => {
if (paramName === 'respondWith') return 'json';
if (paramName === 'options') return { enableStreaming: true };
if (paramName === 'responseBody') return { response: true };
});
await respondToWebhook.execute.call(mockExecuteFunctions);
expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('begin', 0);
expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('item', 0, { response: true });
expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('end', 0);
expect(mockExecuteFunctions.sendResponse).not.toHaveBeenCalled();
});
it('should stream text response when streaming is enabled', async () => {
mockExecuteFunctions.getInputData.mockReturnValue([{ json: { input: true } }]);
mockExecuteFunctions.getNode.mockReturnValue(mock<INode>({ typeVersion: 1.5 }));
mockExecuteFunctions.getParentNodes.mockReturnValue([
mock<NodeTypeAndVersion>({ type: WAIT_NODE_TYPE }),
]);
mockExecuteFunctions.isStreaming.mockReturnValue(true);
mockExecuteFunctions.sendChunk.mockImplementation(() => {});
mockExecuteFunctions.getNodeParameter.mockImplementation((paramName) => {
if (paramName === 'respondWith') return 'text';
if (paramName === 'options') return { enableStreaming: true };
if (paramName === 'responseBody') return 'test response';
});
await respondToWebhook.execute.call(mockExecuteFunctions);
expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('begin', 0);
expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('item', 0, 'test response');
expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('end', 0);
expect(mockExecuteFunctions.sendResponse).not.toHaveBeenCalled();
});
it('should stream JWT response when streaming is enabled', async () => {
mockExecuteFunctions.getInputData.mockReturnValue([{ json: { input: true } }]);
mockExecuteFunctions.getNode.mockReturnValue(mock<INode>({ typeVersion: 1.5 }));
mockExecuteFunctions.getCredentials.mockResolvedValue(
mock({
keyType: 'passphrase',
privateKey: 'privateKey',
secret: 'secret',
algorithm: 'HS256',
}),
);
mockExecuteFunctions.getParentNodes.mockReturnValue([
mock<NodeTypeAndVersion>({ type: WAIT_NODE_TYPE }),
]);
mockExecuteFunctions.isStreaming.mockReturnValue(true);
mockExecuteFunctions.sendChunk.mockImplementation(() => {});
mockExecuteFunctions.getNodeParameter.mockImplementation((paramName) => {
if (paramName === 'respondWith') return 'jwt';
if (paramName === 'options') return { enableStreaming: true };
if (paramName === 'payload') return { test: 'payload' };
});
await respondToWebhook.execute.call(mockExecuteFunctions);
expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('begin', 0);
expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('item', 0, {
token: expect.any(String),
});
expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('end', 0);
expect(mockExecuteFunctions.sendResponse).not.toHaveBeenCalled();
});
it('should stream first incoming item when streaming is enabled', async () => {
const inputItems = [{ json: { test: 'data' } }];
mockExecuteFunctions.getInputData.mockReturnValue(inputItems);
mockExecuteFunctions.getNode.mockReturnValue(mock<INode>({ typeVersion: 1.5 }));
mockExecuteFunctions.getParentNodes.mockReturnValue([
mock<NodeTypeAndVersion>({ type: WAIT_NODE_TYPE }),
]);
mockExecuteFunctions.isStreaming.mockReturnValue(true);
mockExecuteFunctions.sendChunk.mockImplementation(() => {});
mockExecuteFunctions.getNodeParameter.mockImplementation((paramName) => {
if (paramName === 'respondWith') return 'firstIncomingItem';
if (paramName === 'options') return { enableStreaming: true };
});
await respondToWebhook.execute.call(mockExecuteFunctions);
expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('begin', 0);
expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('item', 0, { test: 'data' });
expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('end', 0);
expect(mockExecuteFunctions.sendResponse).not.toHaveBeenCalled();
});
it('should stream all incoming items when streaming is enabled', async () => {
const inputItems = [{ json: { item: 1 } }, { json: { item: 2 } }];
mockExecuteFunctions.getInputData.mockReturnValue(inputItems);
mockExecuteFunctions.getNode.mockReturnValue(mock<INode>({ typeVersion: 1.5 }));
mockExecuteFunctions.getParentNodes.mockReturnValue([
mock<NodeTypeAndVersion>({ type: WAIT_NODE_TYPE }),
]);
mockExecuteFunctions.isStreaming.mockReturnValue(true);
mockExecuteFunctions.sendChunk.mockImplementation(() => {});
mockExecuteFunctions.getNodeParameter.mockImplementation((paramName) => {
if (paramName === 'respondWith') return 'allIncomingItems';
if (paramName === 'options') return { enableStreaming: true };
});
await respondToWebhook.execute.call(mockExecuteFunctions);
expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('begin', 0);
expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('item', 0, { item: 1 });
expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('item', 1, { item: 2 });
expect(mockExecuteFunctions.sendChunk).toHaveBeenCalledWith('end', 0);
expect(mockExecuteFunctions.sendResponse).not.toHaveBeenCalled();
});
it('should not stream when enableStreaming is false', async () => {
mockExecuteFunctions.getInputData.mockReturnValue([{ json: { input: true } }]);
mockExecuteFunctions.getNode.mockReturnValue(mock<INode>({ typeVersion: 1.5 }));
mockExecuteFunctions.getParentNodes.mockReturnValue([
mock<NodeTypeAndVersion>({ type: WAIT_NODE_TYPE }),
]);
mockExecuteFunctions.isStreaming.mockReturnValue(true);
mockExecuteFunctions.sendChunk.mockImplementation(() => {});
mockExecuteFunctions.sendResponse.mockImplementation(() => {});
mockExecuteFunctions.getNodeParameter.mockImplementation((paramName) => {
if (paramName === 'respondWith') return 'json';
if (paramName === 'options') return { enableStreaming: false };
if (paramName === 'responseBody') return { response: true };
});
await respondToWebhook.execute.call(mockExecuteFunctions);
expect(mockExecuteFunctions.sendChunk).not.toHaveBeenCalled();
expect(mockExecuteFunctions.sendResponse).toHaveBeenCalledWith({
body: { response: true },
headers: {},
statusCode: 200,
});
});
it('should not stream when context is not streaming', async () => {
mockExecuteFunctions.getInputData.mockReturnValue([{ json: { input: true } }]);
mockExecuteFunctions.getNode.mockReturnValue(mock<INode>({ typeVersion: 1.5 }));
mockExecuteFunctions.getParentNodes.mockReturnValue([
mock<NodeTypeAndVersion>({ type: WAIT_NODE_TYPE }),
]);
mockExecuteFunctions.isStreaming.mockReturnValue(false);
mockExecuteFunctions.sendChunk.mockImplementation(() => {});
mockExecuteFunctions.sendResponse.mockImplementation(() => {});
mockExecuteFunctions.getNodeParameter.mockImplementation((paramName) => {
if (paramName === 'respondWith') return 'json';
if (paramName === 'options') return { enableStreaming: true };
if (paramName === 'responseBody') return { response: true };
});
await respondToWebhook.execute.call(mockExecuteFunctions);
expect(mockExecuteFunctions.sendChunk).not.toHaveBeenCalled();
expect(mockExecuteFunctions.sendResponse).toHaveBeenCalledWith({
body: { response: true },
headers: {},
statusCode: 200,
});
});
it('should not stream binary responses', async () => {
const binary = { data: 'text', mimeType: 'text/plain' };
const inputItems: INodeExecutionData[] = [{ binary: { data: binary }, json: {} }];
mockExecuteFunctions.getInputData.mockReturnValue(inputItems);
mockExecuteFunctions.helpers.assertBinaryData.mockReturnValue(binary);
mockExecuteFunctions.getNode.mockReturnValue(mock<INode>({ typeVersion: 1.5 }));
mockExecuteFunctions.getParentNodes.mockReturnValue([
mock<NodeTypeAndVersion>({ type: WAIT_NODE_TYPE }),
]);
mockExecuteFunctions.isStreaming.mockReturnValue(true);
mockExecuteFunctions.sendChunk.mockImplementation(() => {});
mockExecuteFunctions.sendResponse.mockImplementation(() => {});
mockExecuteFunctions.getNodeParameter.mockImplementation((paramName) => {
if (paramName === 'respondWith') return 'binary';
if (paramName === 'options') return { enableStreaming: true };
});
await respondToWebhook.execute.call(mockExecuteFunctions);
expect(mockExecuteFunctions.sendChunk).not.toHaveBeenCalled();
expect(mockExecuteFunctions.sendResponse).toHaveBeenCalledWith({
body: Buffer.from('text', BINARY_ENCODING),
headers: {
'content-length': 3,
'content-type': 'text/plain',
},
statusCode: 200,
});
});
it('should use non-streaming mode for older versions', async () => {
mockExecuteFunctions.getInputData.mockReturnValue([{ json: { input: true } }]);
mockExecuteFunctions.getNode.mockReturnValue(mock<INode>({ typeVersion: 1.4 }));
mockExecuteFunctions.getParentNodes.mockReturnValue([
mock<NodeTypeAndVersion>({ type: WAIT_NODE_TYPE }),
]);
mockExecuteFunctions.isStreaming.mockReturnValue(true);
mockExecuteFunctions.sendChunk.mockImplementation(() => {});
mockExecuteFunctions.sendResponse.mockImplementation(() => {});
mockExecuteFunctions.getNodeParameter.mockImplementation((paramName) => {
if (paramName === 'respondWith') return 'json';
if (paramName === 'options') return {};
if (paramName === 'responseBody') return { response: true };
});
await respondToWebhook.execute.call(mockExecuteFunctions);
expect(mockExecuteFunctions.sendChunk).not.toHaveBeenCalled();
expect(mockExecuteFunctions.sendResponse).toHaveBeenCalledWith({
body: { response: true },
headers: {},
statusCode: 200,
});
});
});
});