mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-18 02:21:13 +00:00
refactor: Simplify webhook helpers (#17237)
Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com>
This commit is contained in:
@@ -0,0 +1,346 @@
|
||||
import { Container } from '@n8n/di';
|
||||
import { mock, type MockProxy } from 'jest-mock-extended';
|
||||
import { BinaryDataService } from 'n8n-core';
|
||||
import type { ITaskData, INodeExecutionData, IBinaryData } from 'n8n-workflow';
|
||||
import { BINARY_ENCODING, OperationalError } from 'n8n-workflow';
|
||||
import assert from 'node:assert';
|
||||
import { Readable } from 'node:stream';
|
||||
|
||||
import { extractWebhookLastNodeResponse } from '../webhook-last-node-response-extractor';
|
||||
|
||||
import type { WebhookExecutionContext } from '@/webhooks/webhook-execution-context';
|
||||
|
||||
describe('extractWebhookLastNodeResponse', () => {
|
||||
let context: MockProxy<WebhookExecutionContext>;
|
||||
let lastNodeTaskData: MockProxy<ITaskData>;
|
||||
let binaryDataService: MockProxy<BinaryDataService>;
|
||||
|
||||
beforeEach(() => {
|
||||
context = mock<WebhookExecutionContext>();
|
||||
lastNodeTaskData = mock<ITaskData>();
|
||||
binaryDataService = mock<BinaryDataService>();
|
||||
|
||||
Container.set(BinaryDataService, binaryDataService);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
jest.clearAllMocks();
|
||||
});
|
||||
|
||||
describe('responseDataType: firstEntryJson', () => {
|
||||
it('should return first entry JSON data', async () => {
|
||||
const jsonData = { foo: 'bar', test: 123 };
|
||||
lastNodeTaskData.data = {
|
||||
main: [[{ json: jsonData }]],
|
||||
};
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryJson',
|
||||
lastNodeTaskData,
|
||||
);
|
||||
|
||||
expect(result).toEqual({
|
||||
ok: true,
|
||||
result: {
|
||||
type: 'static',
|
||||
body: jsonData,
|
||||
contentType: undefined,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it('should return error when no item to return', async () => {
|
||||
lastNodeTaskData.data = {
|
||||
main: [[]],
|
||||
};
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryJson',
|
||||
lastNodeTaskData,
|
||||
);
|
||||
|
||||
assert(!result.ok);
|
||||
expect(result.error).toBeInstanceOf(OperationalError);
|
||||
});
|
||||
|
||||
it('should extract specific property when responsePropertyName is set', async () => {
|
||||
const jsonData = { foo: 'bar', nested: { value: 'test' } };
|
||||
lastNodeTaskData.data = {
|
||||
main: [[{ json: jsonData }]],
|
||||
};
|
||||
|
||||
context.evaluateSimpleWebhookDescriptionExpression
|
||||
.mockReturnValueOnce('nested.value')
|
||||
.mockReturnValueOnce(undefined);
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryJson',
|
||||
lastNodeTaskData,
|
||||
);
|
||||
|
||||
expect(result).toEqual({
|
||||
ok: true,
|
||||
result: {
|
||||
type: 'static',
|
||||
body: 'test',
|
||||
contentType: undefined,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it('should set content type when responseContentType is provided', async () => {
|
||||
const jsonData = { foo: 'bar' };
|
||||
lastNodeTaskData.data = {
|
||||
main: [[{ json: jsonData }]],
|
||||
};
|
||||
|
||||
context.evaluateSimpleWebhookDescriptionExpression
|
||||
.mockReturnValueOnce(undefined)
|
||||
.mockReturnValueOnce('application/xml');
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryJson',
|
||||
lastNodeTaskData,
|
||||
);
|
||||
|
||||
expect(result).toEqual({
|
||||
ok: true,
|
||||
result: {
|
||||
type: 'static',
|
||||
body: jsonData,
|
||||
contentType: 'application/xml',
|
||||
},
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('responseDataType: firstEntryBinary', () => {
|
||||
it('should return binary data as buffer when no ID is present', async () => {
|
||||
const binaryData: IBinaryData = {
|
||||
data: Buffer.from('test binary data').toString(BINARY_ENCODING),
|
||||
mimeType: 'text/plain',
|
||||
};
|
||||
const nodeExecutionData: INodeExecutionData = {
|
||||
json: {},
|
||||
binary: { data: binaryData },
|
||||
};
|
||||
lastNodeTaskData.data = {
|
||||
main: [[nodeExecutionData]],
|
||||
};
|
||||
|
||||
context.evaluateSimpleWebhookDescriptionExpression.mockReturnValue('data');
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryBinary',
|
||||
lastNodeTaskData,
|
||||
);
|
||||
|
||||
expect(result).toEqual({
|
||||
ok: true,
|
||||
result: {
|
||||
type: 'static',
|
||||
body: Buffer.from('test binary data'),
|
||||
contentType: 'text/plain',
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it('should return binary data as stream when ID is present', async () => {
|
||||
const mockStream = new Readable();
|
||||
binaryDataService.getAsStream.mockResolvedValue(mockStream);
|
||||
|
||||
const binaryData: IBinaryData = {
|
||||
id: 'binary-123',
|
||||
mimeType: 'image/jpeg',
|
||||
data: '',
|
||||
};
|
||||
const nodeExecutionData: INodeExecutionData = {
|
||||
json: {},
|
||||
binary: { data: binaryData },
|
||||
};
|
||||
lastNodeTaskData.data = {
|
||||
main: [[nodeExecutionData]],
|
||||
};
|
||||
|
||||
context.evaluateSimpleWebhookDescriptionExpression.mockReturnValue('data');
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryBinary',
|
||||
lastNodeTaskData,
|
||||
);
|
||||
|
||||
expect(result).toEqual({
|
||||
ok: true,
|
||||
result: {
|
||||
type: 'stream',
|
||||
stream: mockStream,
|
||||
contentType: 'image/jpeg',
|
||||
},
|
||||
});
|
||||
assert(binaryDataService.getAsStream.mock.calls[0][0] === 'binary-123');
|
||||
});
|
||||
|
||||
it('should return error when no item found', async () => {
|
||||
lastNodeTaskData.data = {
|
||||
main: [[]],
|
||||
};
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryBinary',
|
||||
lastNodeTaskData,
|
||||
);
|
||||
|
||||
assert(!result.ok);
|
||||
expect(result.error).toBeInstanceOf(OperationalError);
|
||||
expect(result.error.message).toBe('No item was found to return');
|
||||
});
|
||||
|
||||
it('should return error when no binary data found', async () => {
|
||||
const nodeExecutionData: INodeExecutionData = {
|
||||
json: { foo: 'bar' },
|
||||
};
|
||||
lastNodeTaskData.data = {
|
||||
main: [[nodeExecutionData]],
|
||||
};
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryBinary',
|
||||
lastNodeTaskData,
|
||||
);
|
||||
|
||||
assert(!result.ok);
|
||||
expect(result.error).toBeInstanceOf(OperationalError);
|
||||
expect(result.error.message).toBe('No binary data was found to return');
|
||||
});
|
||||
|
||||
it('should return error when responseBinaryPropertyName is undefined', async () => {
|
||||
const nodeExecutionData: INodeExecutionData = {
|
||||
json: {},
|
||||
binary: { data: { data: 'test', mimeType: 'text/plain' } },
|
||||
};
|
||||
lastNodeTaskData.data = {
|
||||
main: [[nodeExecutionData]],
|
||||
};
|
||||
|
||||
context.evaluateSimpleWebhookDescriptionExpression.mockReturnValue(undefined);
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryBinary',
|
||||
lastNodeTaskData,
|
||||
);
|
||||
|
||||
assert(!result.ok);
|
||||
expect(result.error).toBeInstanceOf(OperationalError);
|
||||
expect(result.error.message).toBe("No 'responseBinaryPropertyName' is set");
|
||||
});
|
||||
|
||||
it('should return error when responseBinaryPropertyName is not a string', async () => {
|
||||
const nodeExecutionData: INodeExecutionData = {
|
||||
json: {},
|
||||
binary: { data: { data: 'test', mimeType: 'text/plain' } },
|
||||
};
|
||||
lastNodeTaskData.data = {
|
||||
main: [[nodeExecutionData]],
|
||||
};
|
||||
|
||||
context.evaluateSimpleWebhookDescriptionExpression.mockReturnValue(123);
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryBinary',
|
||||
lastNodeTaskData,
|
||||
);
|
||||
|
||||
assert(!result.ok);
|
||||
expect(result.error).toBeInstanceOf(OperationalError);
|
||||
expect(result.error.message).toBe("'responseBinaryPropertyName' is not a string");
|
||||
});
|
||||
|
||||
it('should return error when specified binary property does not exist', async () => {
|
||||
const nodeExecutionData: INodeExecutionData = {
|
||||
json: {},
|
||||
binary: { otherProperty: { data: 'test', mimeType: 'text/plain' } },
|
||||
};
|
||||
lastNodeTaskData.data = {
|
||||
main: [[nodeExecutionData]],
|
||||
};
|
||||
|
||||
context.evaluateSimpleWebhookDescriptionExpression.mockReturnValue('nonExistentProperty');
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
'firstEntryBinary',
|
||||
lastNodeTaskData,
|
||||
);
|
||||
|
||||
assert(!result.ok);
|
||||
expect(result.error).toBeInstanceOf(OperationalError);
|
||||
expect(result.error.message).toBe(
|
||||
"The binary property 'nonExistentProperty' which should be returned does not exist",
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('responseDataType: noData', () => {
|
||||
it('should return undefined body and contentType', async () => {
|
||||
const result = await extractWebhookLastNodeResponse(context, 'noData', lastNodeTaskData);
|
||||
|
||||
expect(result).toEqual({
|
||||
ok: true,
|
||||
result: {
|
||||
type: 'static',
|
||||
body: undefined,
|
||||
contentType: undefined,
|
||||
},
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('responseDataType: default (allEntries)', () => {
|
||||
it('should return all entries as JSON array', async () => {
|
||||
const jsonData1 = { foo: 'bar' };
|
||||
const jsonData2 = { test: 123 };
|
||||
const jsonData3 = { nested: { value: 'test' } };
|
||||
lastNodeTaskData.data = {
|
||||
main: [[{ json: jsonData1 }, { json: jsonData2 }, { json: jsonData3 }]],
|
||||
};
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(context, 'allEntries', lastNodeTaskData);
|
||||
|
||||
expect(result).toEqual({
|
||||
ok: true,
|
||||
result: {
|
||||
type: 'static',
|
||||
body: [jsonData1, jsonData2, jsonData3],
|
||||
contentType: undefined,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it('should return empty array when no entries', async () => {
|
||||
lastNodeTaskData.data = {
|
||||
main: [[]],
|
||||
};
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(context, 'allEntries', lastNodeTaskData);
|
||||
|
||||
expect(result).toEqual({
|
||||
ok: true,
|
||||
result: {
|
||||
type: 'static',
|
||||
body: [],
|
||||
contentType: undefined,
|
||||
},
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
60
packages/cli/src/webhooks/webhook-execution-context.ts
Normal file
60
packages/cli/src/webhooks/webhook-execution-context.ts
Normal file
@@ -0,0 +1,60 @@
|
||||
import type {
|
||||
IWebhookData,
|
||||
INode,
|
||||
IWorkflowDataProxyAdditionalKeys,
|
||||
Workflow,
|
||||
WorkflowExecuteMode,
|
||||
IExecuteData,
|
||||
IWebhookDescription,
|
||||
NodeParameterValueType,
|
||||
} from 'n8n-workflow';
|
||||
|
||||
/**
|
||||
* A helper class that holds the context for the webhook execution.
|
||||
* Provides quality of life methods for evaluating expressions.
|
||||
*/
|
||||
export class WebhookExecutionContext {
|
||||
constructor(
|
||||
readonly workflow: Workflow,
|
||||
readonly workflowStartNode: INode,
|
||||
readonly webhookData: IWebhookData,
|
||||
readonly executionMode: WorkflowExecuteMode,
|
||||
readonly additionalKeys: IWorkflowDataProxyAdditionalKeys,
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Evaluates a simple expression from the webhook description.
|
||||
*/
|
||||
evaluateSimpleWebhookDescriptionExpression<T extends boolean | number | string | unknown[]>(
|
||||
propertyName: keyof IWebhookDescription,
|
||||
executeData?: IExecuteData,
|
||||
defaultValue?: T,
|
||||
): T | undefined {
|
||||
return this.workflow.expression.getSimpleParameterValue(
|
||||
this.workflowStartNode,
|
||||
this.webhookData.webhookDescription[propertyName],
|
||||
this.executionMode,
|
||||
this.additionalKeys,
|
||||
executeData,
|
||||
defaultValue,
|
||||
) as T | undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Evaluates a complex expression from the webhook description.
|
||||
*/
|
||||
evaluateComplexWebhookDescriptionExpression<T extends NodeParameterValueType>(
|
||||
propertyName: keyof IWebhookDescription,
|
||||
executeData?: IExecuteData,
|
||||
defaultValue?: T,
|
||||
): T | undefined {
|
||||
return this.workflow.expression.getComplexParameterValue(
|
||||
this.workflowStartNode,
|
||||
this.webhookData.webhookDescription[propertyName],
|
||||
this.executionMode,
|
||||
this.additionalKeys,
|
||||
executeData,
|
||||
defaultValue,
|
||||
) as T | undefined;
|
||||
}
|
||||
}
|
||||
@@ -1,21 +1,18 @@
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-argument */
|
||||
/* eslint-disable @typescript-eslint/prefer-optional-chain */
|
||||
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
|
||||
/* eslint-disable id-denylist */
|
||||
/* eslint-disable prefer-spread */
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||
/* eslint-disable @typescript-eslint/restrict-template-expressions */
|
||||
|
||||
import { Logger } from '@n8n/backend-common';
|
||||
import { GlobalConfig } from '@n8n/config';
|
||||
import type { Project } from '@n8n/db';
|
||||
import { Container } from '@n8n/di';
|
||||
import type express from 'express';
|
||||
import get from 'lodash/get';
|
||||
import { BinaryDataService, ErrorReporter } from 'n8n-core';
|
||||
import type {
|
||||
IBinaryData,
|
||||
IBinaryKeyData,
|
||||
IDataObject,
|
||||
IDeferredPromise,
|
||||
IExecuteData,
|
||||
@@ -35,7 +32,6 @@ import type {
|
||||
WebhookResponseData,
|
||||
} from 'n8n-workflow';
|
||||
import {
|
||||
BINARY_ENCODING,
|
||||
createDeferredPromise,
|
||||
ExecutionCancelledError,
|
||||
FORM_NODE_TYPE,
|
||||
@@ -47,6 +43,14 @@ import {
|
||||
} from 'n8n-workflow';
|
||||
import { finished } from 'stream/promises';
|
||||
|
||||
import { WebhookService } from './webhook.service';
|
||||
import type {
|
||||
IWebhookResponseCallbackData,
|
||||
WebhookRequest,
|
||||
WebhookNodeResponseHeaders,
|
||||
WebhookResponseHeaders,
|
||||
} from './webhook.types';
|
||||
|
||||
import { ActiveExecutions } from '@/active-executions';
|
||||
import { MCP_TRIGGER_NODE_TYPE } from '@/constants';
|
||||
import { InternalServerError } from '@/errors/response-errors/internal-server.error';
|
||||
@@ -56,14 +60,15 @@ import { parseBody } from '@/middlewares';
|
||||
import { OwnershipService } from '@/services/ownership.service';
|
||||
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
|
||||
import { WaitTracker } from '@/wait-tracker';
|
||||
import { WebhookExecutionContext } from '@/webhooks/webhook-execution-context';
|
||||
import { createMultiFormDataParser } from '@/webhooks/webhook-form-data';
|
||||
import { extractWebhookLastNodeResponse } from '@/webhooks/webhook-last-node-response-extractor';
|
||||
import type { WebhookResponse } from '@/webhooks/webhook-response';
|
||||
import { createStaticResponse, createStreamResponse } from '@/webhooks/webhook-response';
|
||||
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
|
||||
import * as WorkflowHelpers from '@/workflow-helpers';
|
||||
import { WorkflowRunner } from '@/workflow-runner';
|
||||
|
||||
import { WebhookService } from './webhook.service';
|
||||
import type { IWebhookResponseCallbackData, WebhookRequest } from './webhook.types';
|
||||
|
||||
/**
|
||||
* Returns all the webhooks which should be created for the given workflow
|
||||
*/
|
||||
@@ -344,7 +349,10 @@ export async function executeWebhook(
|
||||
executionId: string | undefined,
|
||||
req: WebhookRequest,
|
||||
res: express.Response,
|
||||
responseCallback: (error: Error | null, data: IWebhookResponseCallbackData) => void,
|
||||
responseCallback: (
|
||||
error: Error | null,
|
||||
data: IWebhookResponseCallbackData | WebhookResponse,
|
||||
) => void,
|
||||
destinationNode?: string,
|
||||
): Promise<string | undefined> {
|
||||
// Get the nodeType to know which responseMode is set
|
||||
@@ -357,6 +365,14 @@ export async function executeWebhook(
|
||||
$executionId: executionId,
|
||||
};
|
||||
|
||||
const context = new WebhookExecutionContext(
|
||||
workflow,
|
||||
workflowStartNode,
|
||||
webhookData,
|
||||
executionMode,
|
||||
additionalKeys,
|
||||
);
|
||||
|
||||
let project: Project | undefined = undefined;
|
||||
try {
|
||||
project = await Container.get(OwnershipService).getWorkflowProjectCached(workflowData.id);
|
||||
@@ -486,30 +502,12 @@ export async function executeWebhook(
|
||||
};
|
||||
}
|
||||
|
||||
if (webhookData.webhookDescription.responseHeaders !== undefined) {
|
||||
const responseHeaders = workflow.expression.getComplexParameterValue(
|
||||
workflowStartNode,
|
||||
webhookData.webhookDescription.responseHeaders,
|
||||
executionMode,
|
||||
additionalKeys,
|
||||
undefined,
|
||||
undefined,
|
||||
) as {
|
||||
entries?:
|
||||
| Array<{
|
||||
name: string;
|
||||
value: string;
|
||||
}>
|
||||
| undefined;
|
||||
};
|
||||
const responseHeaders = evaluateResponseHeaders(context);
|
||||
|
||||
if (!res.headersSent) {
|
||||
// Only set given headers if they haven't been sent yet, e.g. for streaming
|
||||
if (responseHeaders?.entries !== undefined) {
|
||||
for (const item of responseHeaders.entries) {
|
||||
res.setHeader(item.name, item.value);
|
||||
}
|
||||
}
|
||||
if (!res.headersSent && responseHeaders) {
|
||||
// Only set given headers if they haven't been sent yet, e.g. for streaming
|
||||
for (const [name, value] of responseHeaders.entries()) {
|
||||
res.setHeader(name, value);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -643,9 +641,8 @@ export async function executeWebhook(
|
||||
|
||||
if (!didSendResponse) {
|
||||
executePromise
|
||||
// eslint-disable-next-line complexity
|
||||
.then(async (data) => {
|
||||
if (data === undefined) {
|
||||
.then(async (runData) => {
|
||||
if (runData === undefined) {
|
||||
if (!didSendResponse) {
|
||||
responseCallback(null, {
|
||||
data: {
|
||||
@@ -659,11 +656,11 @@ export async function executeWebhook(
|
||||
}
|
||||
|
||||
if (pinData) {
|
||||
data.data.resultData.pinData = pinData;
|
||||
runData.data.resultData.pinData = pinData;
|
||||
}
|
||||
|
||||
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data);
|
||||
if (data.data.resultData.error || returnData?.error !== undefined) {
|
||||
const lastNodeTaskData = WorkflowHelpers.getDataLastExecutedNodeData(runData);
|
||||
if (runData.data.resultData.error || lastNodeTaskData?.error !== undefined) {
|
||||
if (!didSendResponse) {
|
||||
responseCallback(null, {
|
||||
data: {
|
||||
@@ -673,7 +670,7 @@ export async function executeWebhook(
|
||||
});
|
||||
}
|
||||
didSendResponse = true;
|
||||
return data;
|
||||
return runData;
|
||||
}
|
||||
|
||||
// in `responseNode` mode `responseCallback` is called by `responsePromise`
|
||||
@@ -682,7 +679,7 @@ export async function executeWebhook(
|
||||
return undefined;
|
||||
}
|
||||
|
||||
if (returnData === undefined) {
|
||||
if (lastNodeTaskData === undefined) {
|
||||
if (!didSendResponse) {
|
||||
responseCallback(null, {
|
||||
data: {
|
||||
@@ -693,148 +690,38 @@ export async function executeWebhook(
|
||||
});
|
||||
}
|
||||
didSendResponse = true;
|
||||
return data;
|
||||
return runData;
|
||||
}
|
||||
|
||||
if (!didSendResponse) {
|
||||
let data: IDataObject | IDataObject[] | undefined;
|
||||
|
||||
if (responseData === 'firstEntryJson') {
|
||||
// Return the JSON data of the first entry
|
||||
|
||||
if (returnData.data!.main[0]![0] === undefined) {
|
||||
responseCallback(new OperationalError('No item to return got found'), {});
|
||||
didSendResponse = true;
|
||||
return undefined;
|
||||
}
|
||||
|
||||
data = returnData.data!.main[0]![0].json;
|
||||
|
||||
const responsePropertyName = workflow.expression.getSimpleParameterValue(
|
||||
workflowStartNode,
|
||||
webhookData.webhookDescription.responsePropertyName,
|
||||
executionMode,
|
||||
additionalKeys,
|
||||
undefined,
|
||||
undefined,
|
||||
);
|
||||
|
||||
if (responsePropertyName !== undefined) {
|
||||
data = get(data, responsePropertyName as string) as IDataObject;
|
||||
}
|
||||
|
||||
const responseContentType = workflow.expression.getSimpleParameterValue(
|
||||
workflowStartNode,
|
||||
webhookData.webhookDescription.responseContentType,
|
||||
executionMode,
|
||||
additionalKeys,
|
||||
undefined,
|
||||
undefined,
|
||||
);
|
||||
|
||||
if (responseContentType !== undefined) {
|
||||
// Send the webhook response manually to be able to set the content-type
|
||||
res.setHeader('Content-Type', responseContentType as string);
|
||||
|
||||
// Returning an object, boolean, number, ... causes problems so make sure to stringify if needed
|
||||
if (
|
||||
data !== null &&
|
||||
data !== undefined &&
|
||||
['Buffer', 'String'].includes(data.constructor.name)
|
||||
) {
|
||||
res.end(data);
|
||||
} else {
|
||||
res.end(JSON.stringify(data));
|
||||
}
|
||||
|
||||
responseCallback(null, {
|
||||
noWebhookResponse: true,
|
||||
});
|
||||
didSendResponse = true;
|
||||
}
|
||||
} else if (responseData === 'firstEntryBinary') {
|
||||
// Return the binary data of the first entry
|
||||
data = returnData.data!.main[0]![0];
|
||||
|
||||
if (data === undefined) {
|
||||
responseCallback(new OperationalError('No item was found to return'), {});
|
||||
didSendResponse = true;
|
||||
return undefined;
|
||||
}
|
||||
|
||||
if (data.binary === undefined) {
|
||||
responseCallback(new OperationalError('No binary data was found to return'), {});
|
||||
didSendResponse = true;
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const responseBinaryPropertyName = workflow.expression.getSimpleParameterValue(
|
||||
workflowStartNode,
|
||||
webhookData.webhookDescription.responseBinaryPropertyName,
|
||||
executionMode,
|
||||
additionalKeys,
|
||||
undefined,
|
||||
'data',
|
||||
);
|
||||
|
||||
if (responseBinaryPropertyName === undefined && !didSendResponse) {
|
||||
responseCallback(
|
||||
new OperationalError("No 'responseBinaryPropertyName' is set"),
|
||||
{},
|
||||
);
|
||||
didSendResponse = true;
|
||||
}
|
||||
|
||||
const binaryData = (data.binary as IBinaryKeyData)[
|
||||
responseBinaryPropertyName as string
|
||||
];
|
||||
if (binaryData === undefined && !didSendResponse) {
|
||||
responseCallback(
|
||||
new OperationalError(
|
||||
`The binary property '${responseBinaryPropertyName}' which should be returned does not exist`,
|
||||
),
|
||||
{},
|
||||
);
|
||||
didSendResponse = true;
|
||||
}
|
||||
|
||||
if (!didSendResponse) {
|
||||
// Send the webhook response manually
|
||||
res.setHeader('Content-Type', binaryData.mimeType);
|
||||
if (binaryData.id) {
|
||||
const stream = await Container.get(BinaryDataService).getAsStream(binaryData.id);
|
||||
stream.pipe(res, { end: false });
|
||||
await finished(stream);
|
||||
} else {
|
||||
res.write(Buffer.from(binaryData.data, BINARY_ENCODING));
|
||||
}
|
||||
|
||||
responseCallback(null, {
|
||||
noWebhookResponse: true,
|
||||
});
|
||||
process.nextTick(() => res.end());
|
||||
}
|
||||
} else if (responseData === 'noData') {
|
||||
// Return without data
|
||||
data = undefined;
|
||||
} else {
|
||||
// Return the JSON data of all the entries
|
||||
data = [];
|
||||
for (const entry of returnData.data!.main[0]!) {
|
||||
data.push(entry.json);
|
||||
}
|
||||
}
|
||||
|
||||
if (!didSendResponse) {
|
||||
responseCallback(null, {
|
||||
data,
|
||||
responseCode,
|
||||
});
|
||||
}
|
||||
if (didSendResponse) {
|
||||
return runData;
|
||||
}
|
||||
|
||||
const result = await extractWebhookLastNodeResponse(
|
||||
context,
|
||||
responseData as WebhookResponseData,
|
||||
lastNodeTaskData,
|
||||
);
|
||||
if (!result.ok) {
|
||||
responseCallback(result.error, {});
|
||||
didSendResponse = true;
|
||||
return runData;
|
||||
}
|
||||
|
||||
const response = result.result;
|
||||
// Apply potential content-type override
|
||||
if (response.contentType) {
|
||||
responseHeaders.set('content-type', response.contentType);
|
||||
}
|
||||
|
||||
responseCallback(
|
||||
null,
|
||||
response.type === 'static'
|
||||
? createStaticResponse(response.body, responseCode, responseHeaders)
|
||||
: createStreamResponse(response.stream, responseCode, responseHeaders),
|
||||
);
|
||||
didSendResponse = true;
|
||||
|
||||
return data;
|
||||
return runData;
|
||||
})
|
||||
.catch((e) => {
|
||||
if (!didSendResponse) {
|
||||
@@ -963,3 +850,28 @@ async function parseRequestBody(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Evaluates the `responseHeaders` parameter of a webhook node
|
||||
*/
|
||||
function evaluateResponseHeaders(context: WebhookExecutionContext): WebhookResponseHeaders {
|
||||
const headers = new Map<string, string>();
|
||||
|
||||
if (context.webhookData.webhookDescription.responseHeaders === undefined) {
|
||||
return headers;
|
||||
}
|
||||
|
||||
const evaluatedHeaders =
|
||||
context.evaluateComplexWebhookDescriptionExpression<WebhookNodeResponseHeaders>(
|
||||
'responseHeaders',
|
||||
);
|
||||
if (evaluatedHeaders?.entries === undefined) {
|
||||
return headers;
|
||||
}
|
||||
|
||||
for (const entry of evaluatedHeaders.entries) {
|
||||
headers.set(entry.name.toLowerCase(), entry.value);
|
||||
}
|
||||
|
||||
return headers;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,160 @@
|
||||
import { Container } from '@n8n/di';
|
||||
import get from 'lodash/get';
|
||||
import { BinaryDataService } from 'n8n-core';
|
||||
import type { INodeExecutionData, ITaskData, Result, WebhookResponseData } from 'n8n-workflow';
|
||||
import { BINARY_ENCODING, createResultError, createResultOk, OperationalError } from 'n8n-workflow';
|
||||
import type { Readable } from 'node:stream';
|
||||
|
||||
import type { WebhookExecutionContext } from '@/webhooks/webhook-execution-context';
|
||||
|
||||
/** Response that is not a stream */
|
||||
type StaticResponse = {
|
||||
type: 'static';
|
||||
body: unknown;
|
||||
contentType: string | undefined;
|
||||
};
|
||||
|
||||
type StreamResponse = {
|
||||
type: 'stream';
|
||||
stream: Readable;
|
||||
contentType: string | undefined;
|
||||
};
|
||||
|
||||
/**
|
||||
+ * Extracts the response for a webhook when the response mode is set to
|
||||
* `lastNode`.
|
||||
*/
|
||||
export async function extractWebhookLastNodeResponse(
|
||||
context: WebhookExecutionContext,
|
||||
responseDataType: WebhookResponseData | undefined,
|
||||
lastNodeTaskData: ITaskData,
|
||||
): Promise<Result<StaticResponse | StreamResponse, OperationalError>> {
|
||||
if (responseDataType === 'firstEntryJson') {
|
||||
return extractFirstEntryJsonFromTaskData(context, lastNodeTaskData);
|
||||
}
|
||||
|
||||
if (responseDataType === 'firstEntryBinary') {
|
||||
return await extractFirstEntryBinaryFromTaskData(context, lastNodeTaskData);
|
||||
}
|
||||
|
||||
if (responseDataType === 'noData') {
|
||||
return createResultOk({
|
||||
type: 'static',
|
||||
body: undefined,
|
||||
contentType: undefined,
|
||||
});
|
||||
}
|
||||
|
||||
// Default to all entries JSON
|
||||
return extractAllEntriesJsonFromTaskData(lastNodeTaskData);
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts the JSON data of the first item of the last node
|
||||
*/
|
||||
function extractFirstEntryJsonFromTaskData(
|
||||
context: WebhookExecutionContext,
|
||||
lastNodeTaskData: ITaskData,
|
||||
): Result<StaticResponse, OperationalError> {
|
||||
if (lastNodeTaskData.data!.main[0]![0] === undefined) {
|
||||
return createResultError(new OperationalError('No item to return was found'));
|
||||
}
|
||||
|
||||
let lastNodeFirstJsonItem: unknown = lastNodeTaskData.data!.main[0]![0].json;
|
||||
|
||||
const responsePropertyName =
|
||||
context.evaluateSimpleWebhookDescriptionExpression<string>('responsePropertyName');
|
||||
|
||||
if (responsePropertyName !== undefined) {
|
||||
lastNodeFirstJsonItem = get(lastNodeFirstJsonItem, responsePropertyName);
|
||||
}
|
||||
|
||||
// User can set the content type of the response and also the headers.
|
||||
// The `responseContentType` only applies to `firstEntryJson` mode.
|
||||
const responseContentType =
|
||||
context.evaluateSimpleWebhookDescriptionExpression<string>('responseContentType');
|
||||
|
||||
return createResultOk({
|
||||
type: 'static',
|
||||
body: lastNodeFirstJsonItem,
|
||||
contentType: responseContentType,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts the binary data of the first item of the last node
|
||||
*/
|
||||
async function extractFirstEntryBinaryFromTaskData(
|
||||
context: WebhookExecutionContext,
|
||||
lastNodeTaskData: ITaskData,
|
||||
): Promise<Result<StaticResponse | StreamResponse, OperationalError>> {
|
||||
// Return the binary data of the first entry
|
||||
const lastNodeFirstJsonItem: INodeExecutionData = lastNodeTaskData.data!.main[0]![0];
|
||||
|
||||
if (lastNodeFirstJsonItem === undefined) {
|
||||
return createResultError(new OperationalError('No item was found to return'));
|
||||
}
|
||||
|
||||
if (lastNodeFirstJsonItem.binary === undefined) {
|
||||
return createResultError(new OperationalError('No binary data was found to return'));
|
||||
}
|
||||
|
||||
const responseBinaryPropertyName = context.evaluateSimpleWebhookDescriptionExpression<string>(
|
||||
'responseBinaryPropertyName',
|
||||
undefined,
|
||||
'data',
|
||||
);
|
||||
|
||||
if (responseBinaryPropertyName === undefined) {
|
||||
return createResultError(new OperationalError("No 'responseBinaryPropertyName' is set"));
|
||||
} else if (typeof responseBinaryPropertyName !== 'string') {
|
||||
return createResultError(new OperationalError("'responseBinaryPropertyName' is not a string"));
|
||||
}
|
||||
|
||||
const binaryData = lastNodeFirstJsonItem.binary[responseBinaryPropertyName];
|
||||
if (binaryData === undefined) {
|
||||
return createResultError(
|
||||
new OperationalError(
|
||||
`The binary property '${responseBinaryPropertyName}' which should be returned does not exist`,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
// In binary data's case, the mime type takes precedence over any manually
|
||||
// set content type header
|
||||
if (binaryData.id) {
|
||||
const stream = await Container.get(BinaryDataService).getAsStream(binaryData.id);
|
||||
return createResultOk({
|
||||
type: 'stream',
|
||||
stream,
|
||||
contentType: binaryData.mimeType,
|
||||
});
|
||||
} else {
|
||||
return createResultOk({
|
||||
type: 'static',
|
||||
body: Buffer.from(binaryData.data, BINARY_ENCODING),
|
||||
contentType: binaryData.mimeType,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts the JSON data of all the items of the last node
|
||||
*/
|
||||
function extractAllEntriesJsonFromTaskData(
|
||||
lastNodeTaskData: ITaskData,
|
||||
): Result<StaticResponse, OperationalError> {
|
||||
const data: unknown[] = [];
|
||||
for (const entry of lastNodeTaskData.data!.main[0]!) {
|
||||
data.push(entry.json);
|
||||
}
|
||||
|
||||
return createResultOk({
|
||||
type: 'static',
|
||||
body: data,
|
||||
// No content-type override in this case. User can set the content-type
|
||||
// header if they wish. We default to application/json later on when the
|
||||
// response is sent.
|
||||
contentType: undefined,
|
||||
});
|
||||
}
|
||||
@@ -2,17 +2,30 @@ import { Logger } from '@n8n/backend-common';
|
||||
import { Container } from '@n8n/di';
|
||||
import type express from 'express';
|
||||
import { ensureError, type IHttpRequestMethods } from 'n8n-workflow';
|
||||
import { finished } from 'stream/promises';
|
||||
|
||||
import { WebhookService } from './webhook.service';
|
||||
|
||||
import { WebhookNotFoundError } from '@/errors/response-errors/webhook-not-found.error';
|
||||
import * as ResponseHelper from '@/response-helper';
|
||||
import type {
|
||||
WebhookStaticResponse,
|
||||
WebhookResponse,
|
||||
WebhookResponseStream,
|
||||
} from '@/webhooks/webhook-response';
|
||||
import {
|
||||
isWebhookNoResponse,
|
||||
isWebhookStaticResponse,
|
||||
isWebhookResponse,
|
||||
isWebhookStreamResponse,
|
||||
} from '@/webhooks/webhook-response';
|
||||
import type {
|
||||
IWebhookManager,
|
||||
WebhookOptionsRequest,
|
||||
WebhookRequest,
|
||||
WebhookResponseHeaders,
|
||||
} from '@/webhooks/webhook.types';
|
||||
|
||||
import { WebhookService } from './webhook.service';
|
||||
|
||||
const WEBHOOK_METHODS: IHttpRequestMethods[] = ['DELETE', 'GET', 'HEAD', 'PATCH', 'POST', 'PUT'];
|
||||
|
||||
class WebhookRequestHandler {
|
||||
@@ -47,15 +60,23 @@ class WebhookRequestHandler {
|
||||
try {
|
||||
const response = await this.webhookManager.executeWebhook(req, res);
|
||||
|
||||
// Don't respond, if already responded
|
||||
if (response.noWebhookResponse !== true) {
|
||||
ResponseHelper.sendSuccessResponse(
|
||||
res,
|
||||
response.data,
|
||||
true,
|
||||
response.responseCode,
|
||||
response.headers,
|
||||
);
|
||||
// Modern way of responding to webhooks
|
||||
if (isWebhookResponse(response)) {
|
||||
await this.sendWebhookResponse(res, response);
|
||||
} else {
|
||||
// Legacy way of responding to webhooks. `WebhookResponse` should be used to
|
||||
// pass the response from the webhookManager. However, we still have code
|
||||
// that doesn't use that yet. We need to keep this here until all codepaths
|
||||
// return a `WebhookResponse` instead.
|
||||
if (response.noWebhookResponse !== true) {
|
||||
ResponseHelper.sendSuccessResponse(
|
||||
res,
|
||||
response.data,
|
||||
true,
|
||||
response.responseCode,
|
||||
response.headers,
|
||||
);
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
const error = ensureError(e);
|
||||
@@ -78,6 +99,60 @@ class WebhookRequestHandler {
|
||||
}
|
||||
}
|
||||
|
||||
private async sendWebhookResponse(res: express.Response, webhookResponse: WebhookResponse) {
|
||||
if (isWebhookNoResponse(webhookResponse)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (isWebhookStaticResponse(webhookResponse)) {
|
||||
this.sendStaticResponse(res, webhookResponse);
|
||||
return;
|
||||
}
|
||||
|
||||
if (isWebhookStreamResponse(webhookResponse)) {
|
||||
await this.sendStreamResponse(res, webhookResponse);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
private async sendStreamResponse(res: express.Response, webhookResponse: WebhookResponseStream) {
|
||||
const { stream, code, headers } = webhookResponse;
|
||||
|
||||
this.setResponseStatus(res, code);
|
||||
this.setResponseHeaders(res, headers);
|
||||
|
||||
stream.pipe(res, { end: false });
|
||||
await finished(stream);
|
||||
process.nextTick(() => res.end());
|
||||
}
|
||||
|
||||
private sendStaticResponse(res: express.Response, webhookResponse: WebhookStaticResponse) {
|
||||
const { body, code, headers } = webhookResponse;
|
||||
|
||||
this.setResponseStatus(res, code);
|
||||
this.setResponseHeaders(res, headers);
|
||||
|
||||
if (typeof body === 'string') {
|
||||
res.send(body);
|
||||
} else {
|
||||
res.json(body);
|
||||
}
|
||||
}
|
||||
|
||||
private setResponseStatus(res: express.Response, statusCode?: number) {
|
||||
if (statusCode !== undefined) {
|
||||
res.status(statusCode);
|
||||
}
|
||||
}
|
||||
|
||||
private setResponseHeaders(res: express.Response, headers?: WebhookResponseHeaders) {
|
||||
if (headers) {
|
||||
for (const [name, value] of headers.entries()) {
|
||||
res.setHeader(name, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async setupCorsHeaders(
|
||||
req: WebhookRequest | WebhookOptionsRequest,
|
||||
res: express.Response,
|
||||
|
||||
83
packages/cli/src/webhooks/webhook-response.ts
Normal file
83
packages/cli/src/webhooks/webhook-response.ts
Normal file
@@ -0,0 +1,83 @@
|
||||
import type { Readable } from 'stream';
|
||||
|
||||
import type { WebhookResponseHeaders } from './webhook.types';
|
||||
|
||||
export const WebhookResponseTag = Symbol('WebhookResponse');
|
||||
|
||||
/**
|
||||
* Result that indicates that no response needs to be sent. This is used
|
||||
* when the node or something else has already sent a response.
|
||||
*/
|
||||
export type WebhookNoResponse = {
|
||||
[WebhookResponseTag]: 'noResponse';
|
||||
};
|
||||
|
||||
/**
|
||||
* Result that indicates that a non-stream response needs to be sent.
|
||||
*/
|
||||
export type WebhookStaticResponse = {
|
||||
[WebhookResponseTag]: 'static';
|
||||
body: unknown;
|
||||
headers: WebhookResponseHeaders | undefined;
|
||||
code: number | undefined;
|
||||
};
|
||||
|
||||
/**
|
||||
* Result that indicates that a stream response needs to be sent.
|
||||
*/
|
||||
export type WebhookResponseStream = {
|
||||
[WebhookResponseTag]: 'stream';
|
||||
stream: Readable;
|
||||
code: number | undefined;
|
||||
headers: WebhookResponseHeaders | undefined;
|
||||
};
|
||||
|
||||
export type WebhookResponse = WebhookNoResponse | WebhookStaticResponse | WebhookResponseStream;
|
||||
|
||||
export const isWebhookResponse = (response: unknown): response is WebhookResponse => {
|
||||
return typeof response === 'object' && response !== null && WebhookResponseTag in response;
|
||||
};
|
||||
|
||||
export const isWebhookNoResponse = (response: unknown): response is WebhookNoResponse => {
|
||||
return isWebhookResponse(response) && response[WebhookResponseTag] === 'noResponse';
|
||||
};
|
||||
|
||||
export const isWebhookStaticResponse = (response: unknown): response is WebhookStaticResponse => {
|
||||
return isWebhookResponse(response) && response[WebhookResponseTag] === 'static';
|
||||
};
|
||||
|
||||
export const isWebhookStreamResponse = (response: unknown): response is WebhookResponseStream => {
|
||||
return isWebhookResponse(response) && response[WebhookResponseTag] === 'stream';
|
||||
};
|
||||
|
||||
export const createNoResponse = (): WebhookNoResponse => {
|
||||
return {
|
||||
[WebhookResponseTag]: 'noResponse',
|
||||
};
|
||||
};
|
||||
|
||||
export const createStaticResponse = (
|
||||
body: unknown,
|
||||
code: number | undefined,
|
||||
headers: WebhookResponseHeaders | undefined,
|
||||
): WebhookStaticResponse => {
|
||||
return {
|
||||
[WebhookResponseTag]: 'static',
|
||||
body,
|
||||
code,
|
||||
headers,
|
||||
};
|
||||
};
|
||||
|
||||
export const createStreamResponse = (
|
||||
stream: Readable,
|
||||
code: number | undefined,
|
||||
headers: WebhookResponseHeaders | undefined,
|
||||
): WebhookResponseStream => {
|
||||
return {
|
||||
[WebhookResponseTag]: 'stream',
|
||||
stream,
|
||||
code,
|
||||
headers,
|
||||
};
|
||||
};
|
||||
@@ -37,3 +37,16 @@ export interface IWebhookResponseCallbackData {
|
||||
}
|
||||
|
||||
export type Method = NonNullable<IHttpRequestMethods>;
|
||||
|
||||
/** Response headers. Keys are always lower-cased. */
|
||||
export type WebhookResponseHeaders = Map<string, string>;
|
||||
|
||||
/**
|
||||
* The headers object that node's `responseHeaders` property can return
|
||||
*/
|
||||
export type WebhookNodeResponseHeaders = {
|
||||
entries?: Array<{
|
||||
name: string;
|
||||
value: string;
|
||||
}>;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user