refactor(core): Extract code out of WorkflowHelpers.executeWebhook, and add tests - Part 1 (no-changelog) (#14331)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2025-04-03 14:16:02 +02:00
committed by GitHub
parent 178628a59b
commit cc465a5593
2 changed files with 486 additions and 156 deletions

View File

@@ -1,10 +1,36 @@
import type express from 'express';
import { mock, type MockProxy } from 'jest-mock-extended';
import type { Workflow, INode, IDataObject } from 'n8n-workflow';
import { FORM_NODE_TYPE, WAIT_NODE_TYPE } from 'n8n-workflow';
import { BinaryDataService, ErrorReporter, Logger } from 'n8n-core';
import type {
Workflow,
INode,
IDataObject,
IWebhookResponseData,
IDeferredPromise,
IN8nHttpFullResponse,
IWorkflowBase,
IRunExecutionData,
IExecuteData,
} from 'n8n-workflow';
import { createDeferredPromise, FORM_NODE_TYPE, WAIT_NODE_TYPE } from 'n8n-workflow';
import type { Readable } from 'stream';
import { finished } from 'stream/promises';
import { autoDetectResponseMode, handleFormRedirectionCase } from '../webhook-helpers';
import { mockInstance } from '@test/mocking';
import {
autoDetectResponseMode,
handleFormRedirectionCase,
getResponseOnReceived,
setupResponseNodePromise,
prepareExecutionData,
} from '../webhook-helpers';
import type { IWebhookResponseCallbackData } from '../webhook.types';
jest.mock('stream/promises', () => ({
finished: jest.fn(),
}));
describe('autoDetectResponseMode', () => {
let workflow: MockProxy<Workflow>;
@@ -95,3 +121,287 @@ describe('handleFormRedirectionCase', () => {
expect(result).toEqual(data);
});
});
describe('getResponseOnReceived', () => {
const responseCode = 200;
const webhookResultData = mock<IWebhookResponseData>();
beforeEach(() => {
jest.resetAllMocks();
});
test('should return response with no data when responseData is "noData"', () => {
const callbackData = getResponseOnReceived('noData', webhookResultData, responseCode);
expect(callbackData).toEqual({ responseCode });
});
test('should return response with responseData when it is defined', () => {
const responseData = JSON.stringify({ foo: 'bar' });
const callbackData = getResponseOnReceived(responseData, webhookResultData, responseCode);
expect(callbackData).toEqual({ data: responseData, responseCode });
});
test('should return response with webhookResponse when responseData is falsy but webhookResponse exists', () => {
const webhookResponse = { success: true };
webhookResultData.webhookResponse = webhookResponse;
const callbackData = getResponseOnReceived(undefined, webhookResultData, responseCode);
expect(callbackData).toEqual({ data: webhookResponse, responseCode });
});
test('should return default response message when responseData and webhookResponse are falsy', () => {
webhookResultData.webhookResponse = undefined;
const callbackData = getResponseOnReceived(undefined, webhookResultData, responseCode);
expect(callbackData).toEqual({
data: { message: 'Workflow was started' },
responseCode,
});
});
});
describe('setupResponseNodePromise', () => {
const workflowId = 'test-workflow-id';
const executionId = 'test-execution-id';
const res = mock<express.Response>();
const responseCallback = jest.fn();
const workflowStartNode = mock<INode>();
const workflow = mock<Workflow>({ id: workflowId });
const binaryDataService = mockInstance(BinaryDataService);
const errorReporter = mockInstance(ErrorReporter);
const logger = mockInstance(Logger);
let responsePromise: IDeferredPromise<IN8nHttpFullResponse>;
beforeEach(() => {
jest.resetAllMocks();
responsePromise = createDeferredPromise<IN8nHttpFullResponse>();
res.header.mockReturnValue(res);
res.end.mockReturnValue(res);
});
test('should handle regular response object', async () => {
setupResponseNodePromise(
responsePromise,
res,
responseCallback,
workflowStartNode,
executionId,
workflow,
);
responsePromise.resolve({
body: { data: 'test data' },
headers: { 'content-type': 'application/json' },
statusCode: 200,
});
await new Promise(process.nextTick);
expect(responseCallback).toHaveBeenCalledWith(null, {
data: { data: 'test data' },
headers: { 'content-type': 'application/json' },
responseCode: 200,
});
expect(res.end).toHaveBeenCalled();
});
test('should handle binary data with ID', async () => {
const mockStream = mock<Readable>();
binaryDataService.getAsStream.mockResolvedValue(mockStream);
setupResponseNodePromise(
responsePromise,
res,
responseCallback,
workflowStartNode,
executionId,
workflow,
);
responsePromise.resolve({
body: { binaryData: { id: 'binary-123' } },
headers: { 'content-type': 'image/jpeg' },
statusCode: 200,
});
await new Promise(process.nextTick);
expect(binaryDataService.getAsStream).toHaveBeenCalledWith('binary-123');
expect(res.header).toHaveBeenCalledWith({ 'content-type': 'image/jpeg' });
expect(mockStream.pipe).toHaveBeenCalledWith(res, { end: false });
expect(finished).toHaveBeenCalledWith(mockStream);
expect(responseCallback).toHaveBeenCalledWith(null, { noWebhookResponse: true });
});
test('should handle buffer response', async () => {
setupResponseNodePromise(
responsePromise,
res,
responseCallback,
workflowStartNode,
executionId,
workflow,
);
const buffer = Buffer.from('test buffer');
responsePromise.resolve({
body: buffer,
headers: { 'content-type': 'text/plain' },
statusCode: 200,
});
await new Promise(process.nextTick);
expect(res.header).toHaveBeenCalledWith({ 'content-type': 'text/plain' });
expect(res.end).toHaveBeenCalledWith(buffer);
expect(responseCallback).toHaveBeenCalledWith(null, { noWebhookResponse: true });
});
test('should handle errors properly', async () => {
setupResponseNodePromise(
responsePromise,
res,
responseCallback,
workflowStartNode,
executionId,
workflow,
);
const error = new Error('Test error');
responsePromise.reject(error);
await new Promise(process.nextTick);
expect(errorReporter.error).toHaveBeenCalledWith(error);
expect(logger.error).toHaveBeenCalledWith(
`Error with Webhook-Response for execution "${executionId}": "${error.message}"`,
{ executionId, workflowId },
);
expect(responseCallback).toHaveBeenCalledWith(error, {});
});
});
describe('prepareExecutionData', () => {
const workflowStartNode = mock<INode>({ name: 'Start' });
const webhookResultData: IWebhookResponseData = {
workflowData: [[{ json: { data: 'test' } }]],
};
const workflowData = mock<IWorkflowBase>({
id: 'workflow1',
pinData: { nodeA: [{ json: { pinned: true } }] },
});
test('should create new execution data when not provided', () => {
const { runExecutionData, pinData } = prepareExecutionData(
'manual',
workflowStartNode,
webhookResultData,
undefined,
);
const nodeExecuteData = runExecutionData.executionData?.nodeExecutionStack?.[0];
expect(nodeExecuteData).toBeDefined();
expect(nodeExecuteData?.node).toBe(workflowStartNode);
expect(nodeExecuteData?.data.main).toBe(webhookResultData.workflowData);
expect(pinData).toBeUndefined();
});
test('should update existing runExecutionData when executionId is defined', () => {
const executionId = 'test-execution-id';
const nodeExecutionStack: IExecuteData[] = [
{
node: workflowStartNode,
data: { main: [[{ json: { oldData: true } }]] },
source: null,
},
];
const existingRunExecutionData = {
startData: {},
resultData: { runData: {} },
executionData: {
contextData: {},
nodeExecutionStack,
waitingExecution: {},
},
} as IRunExecutionData;
prepareExecutionData(
'manual',
workflowStartNode,
webhookResultData,
existingRunExecutionData,
undefined,
undefined,
executionId,
);
expect(nodeExecutionStack[0]?.data.main).toBe(webhookResultData.workflowData);
});
test('should set destination node when provided', () => {
const { runExecutionData } = prepareExecutionData(
'manual',
workflowStartNode,
webhookResultData,
undefined,
{},
'targetNode',
);
expect(runExecutionData.startData?.destinationNode).toBe('targetNode');
});
test('should update execution data with execution data merge', () => {
const runExecutionDataMerge = {
resultData: {
error: { message: 'Test error' },
},
};
const { runExecutionData } = prepareExecutionData(
'manual',
workflowStartNode,
webhookResultData,
undefined,
runExecutionDataMerge,
);
expect(runExecutionData.resultData.error).toEqual({ message: 'Test error' });
});
test('should set pinData when execution mode is manual', () => {
const { runExecutionData, pinData } = prepareExecutionData(
'manual',
workflowStartNode,
webhookResultData,
undefined,
{},
undefined,
undefined,
workflowData,
);
expect(pinData).toBe(workflowData.pinData);
expect(runExecutionData.resultData.pinData).toBe(workflowData.pinData);
});
test('should not set pinData when execution mode is not manual or evaluation', () => {
const { runExecutionData, pinData } = prepareExecutionData(
'webhook',
workflowStartNode,
webhookResultData,
undefined,
{},
undefined,
undefined,
workflowData,
);
expect(pinData).toBeUndefined();
expect(runExecutionData.resultData.pinData).toBeUndefined();
});
});

View File

@@ -30,15 +30,17 @@ import type {
WorkflowExecuteMode,
IWorkflowExecutionDataProcess,
IWorkflowBase,
WebhookResponseData,
} from 'n8n-workflow';
import {
ApplicationError,
BINARY_ENCODING,
createDeferredPromise,
ExecutionCancelledError,
FORM_NODE_TYPE,
FORM_TRIGGER_NODE_TYPE,
NodeOperationError,
OperationalError,
UnexpectedError,
WAIT_NODE_TYPE,
} from 'n8n-workflow';
import assert from 'node:assert';
@@ -107,7 +109,7 @@ export function autoDetectResponseMode(
workflowStartNode: INode,
workflow: Workflow,
method: string,
) {
): WebhookResponseMode | undefined {
if (workflowStartNode.type === FORM_TRIGGER_NODE_TYPE && method === 'POST') {
const connectedNodes = workflow.getChildNodes(workflowStartNode.name);
@@ -182,6 +184,135 @@ export const handleFormRedirectionCase = (
const { formDataFileSizeMax } = Container.get(GlobalConfig).endpoints;
const parseFormData = createMultiFormDataParser(formDataFileSizeMax);
/** Return webhook response when responseMode is set to "onReceived" */
export function getResponseOnReceived(
responseData: WebhookResponseData | string | undefined,
webhookResultData: IWebhookResponseData,
responseCode: number,
): IWebhookResponseCallbackData {
const callbackData: IWebhookResponseCallbackData = { responseCode };
// Return response directly and do not wait for the workflow to finish
if (responseData === 'noData') {
// Return without data
} else if (responseData) {
// Return the data specified in the response data option
callbackData.data = responseData as unknown as IDataObject;
} else if (webhookResultData.webhookResponse !== undefined) {
// Data to respond with is given
callbackData.data = webhookResultData.webhookResponse;
} else {
callbackData.data = { message: 'Workflow was started' };
}
return callbackData;
}
export function setupResponseNodePromise(
responsePromise: IDeferredPromise<IN8nHttpFullResponse>,
res: express.Response,
responseCallback: (error: Error | null, data: IWebhookResponseCallbackData) => void,
workflowStartNode: INode,
executionId: string | undefined,
workflow: Workflow,
): void {
void responsePromise.promise
.then(async (response: IN8nHttpFullResponse) => {
const binaryData = (response.body as IDataObject)?.binaryData as IBinaryData;
if (binaryData?.id) {
res.header(response.headers);
const stream = await Container.get(BinaryDataService).getAsStream(binaryData.id);
stream.pipe(res, { end: false });
await finished(stream);
responseCallback(null, { noWebhookResponse: true });
} else if (Buffer.isBuffer(response.body)) {
res.header(response.headers);
res.end(response.body);
responseCallback(null, { noWebhookResponse: true });
} else {
// TODO: This probably needs some more changes depending on the options on the
// Webhook Response node
let data: IWebhookResponseCallbackData = {
data: response.body as IDataObject,
headers: response.headers,
responseCode: response.statusCode,
};
data = handleFormRedirectionCase(data, workflowStartNode);
responseCallback(null, data);
}
process.nextTick(() => res.end());
})
.catch(async (error) => {
Container.get(ErrorReporter).error(error);
Container.get(Logger).error(
`Error with Webhook-Response for execution "${executionId}": "${error.message}"`,
{ executionId, workflowId: workflow.id },
);
responseCallback(error, {});
});
}
export function prepareExecutionData(
executionMode: WorkflowExecuteMode,
workflowStartNode: INode,
webhookResultData: IWebhookResponseData,
runExecutionData: IRunExecutionData | undefined,
runExecutionDataMerge: object = {},
destinationNode?: string,
executionId?: string,
workflowData?: IWorkflowBase,
): { runExecutionData: IRunExecutionData; pinData: IPinData | undefined } {
// Initialize the data of the webhook node
const nodeExecutionStack: IExecuteData[] = [
{
node: workflowStartNode,
data: {
main: webhookResultData.workflowData ?? [],
},
source: null,
},
];
runExecutionData ??= {
startData: {},
resultData: {
runData: {},
},
executionData: {
contextData: {},
nodeExecutionStack,
waitingExecution: {},
},
} as IRunExecutionData;
if (destinationNode && runExecutionData.startData) {
runExecutionData.startData.destinationNode = destinationNode;
}
if (executionId !== undefined) {
// Set the data the webhook node did return on the waiting node if executionId
// already exists as it means that we are restarting an existing execution.
runExecutionData.executionData!.nodeExecutionStack[0].data.main =
webhookResultData.workflowData ?? [];
}
if (Object.keys(runExecutionDataMerge).length !== 0) {
// If data to merge got defined add it to the execution data
Object.assign(runExecutionData, runExecutionDataMerge);
}
let pinData: IPinData | undefined;
const usePinData = ['manual', 'evaluation'].includes(executionMode);
if (usePinData) {
pinData = workflowData?.pinData;
runExecutionData.resultData.pinData = pinData;
}
return { runExecutionData, pinData };
}
/**
* Executes a webhook
*/
@@ -205,11 +336,6 @@ export async function executeWebhook(
workflowStartNode.type,
workflowStartNode.typeVersion,
);
if (nodeType === undefined) {
const errorMessage = `The type of the webhook node "${workflowStartNode.name}" is not known`;
responseCallback(new ApplicationError(errorMessage), {});
throw new InternalServerError(errorMessage);
}
const additionalKeys: IWorkflowDataProxyAdditionalKeys = {
$executionId: executionId,
@@ -229,22 +355,17 @@ export async function executeWebhook(
additionalData.executionId = executionId;
}
// Get the responseMode
let responseMode;
//check if response mode should be set automatically, e.g. multipage form
responseMode = autoDetectResponseMode(workflowStartNode, workflow, req.method);
if (!responseMode) {
responseMode = workflow.expression.getSimpleParameterValue(
const responseMode =
autoDetectResponseMode(workflowStartNode, workflow, req.method) ??
(workflow.expression.getSimpleParameterValue(
workflowStartNode,
webhookData.webhookDescription.responseMode,
executionMode,
additionalKeys,
undefined,
'onReceived',
) as WebhookResponseMode;
}
) as WebhookResponseMode);
const responseCode = workflow.expression.getSimpleParameterValue(
workflowStartNode,
@@ -255,6 +376,9 @@ export async function executeWebhook(
200,
) as number;
// This parameter is used for two different purposes:
// 1. as arbitrary string input defined in the workflow in the "respond immediately" mode,
// 2. as well as WebhookResponseData config in all the other modes
const responseData = workflow.expression.getComplexParameterValue(
workflowStartNode,
webhookData.webhookDescription.responseData,
@@ -262,14 +386,14 @@ export async function executeWebhook(
additionalKeys,
undefined,
'firstEntryJson',
);
) as WebhookResponseData | string | undefined;
if (!['onReceived', 'lastNode', 'responseNode', 'formPage'].includes(responseMode)) {
// If the mode is not known we error. Is probably best like that instead of using
// the default that people know as early as possible (probably already testing phase)
// that something does not resolve properly.
const errorMessage = `The response mode '${responseMode}' is not valid!`;
responseCallback(new ApplicationError(errorMessage), {});
responseCallback(new UnexpectedError(errorMessage), {});
throw new InternalServerError(errorMessage);
}
@@ -356,7 +480,7 @@ export async function executeWebhook(
},
});
responseCallback(new ApplicationError(errorMessage), {});
responseCallback(new UnexpectedError(errorMessage), {});
didSendResponse = true;
// Add error to execution data that it can be logged and send to Editor-UI
@@ -380,10 +504,6 @@ export async function executeWebhook(
};
}
const additionalKeys: IWorkflowDataProxyAdditionalKeys = {
$executionId: executionId,
};
if (webhookData.webhookDescription.responseHeaders !== undefined) {
const responseHeaders = workflow.expression.getComplexParameterValue(
workflowStartNode,
@@ -446,82 +566,23 @@ export async function executeWebhook(
// Now that we know that the workflow should run we can return the default response
// directly if responseMode it set to "onReceived" and a response should be sent
if (responseMode === 'onReceived' && !didSendResponse) {
// Return response directly and do not wait for the workflow to finish
if (responseData === 'noData') {
// Return without data
responseCallback(null, {
responseCode,
});
} else if (responseData) {
// Return the data specified in the response data option
responseCallback(null, {
data: responseData as IDataObject,
responseCode,
});
} else if (webhookResultData.webhookResponse !== undefined) {
// Data to respond with is given
responseCallback(null, {
data: webhookResultData.webhookResponse,
responseCode,
});
} else {
responseCallback(null, {
data: {
message: 'Workflow was started',
},
responseCode,
});
}
const callbackData = getResponseOnReceived(responseData, webhookResultData, responseCode);
responseCallback(null, callbackData);
didSendResponse = true;
}
// Initialize the data of the webhook node
const nodeExecutionStack: IExecuteData[] = [];
nodeExecutionStack.push({
node: workflowStartNode,
data: {
main: webhookResultData.workflowData,
},
source: null,
});
runExecutionData =
runExecutionData ||
({
startData: {},
resultData: {
runData: {},
},
executionData: {
contextData: {},
nodeExecutionStack,
waitingExecution: {},
},
} as IRunExecutionData);
if (destinationNode && runExecutionData.startData) {
runExecutionData.startData.destinationNode = destinationNode;
}
if (executionId !== undefined) {
// Set the data the webhook node did return on the waiting node if executionId
// already exists as it means that we are restarting an existing execution.
runExecutionData.executionData!.nodeExecutionStack[0].data.main =
webhookResultData.workflowData;
}
if (Object.keys(runExecutionDataMerge).length !== 0) {
// If data to merge got defined add it to the execution data
Object.assign(runExecutionData, runExecutionDataMerge);
}
let pinData: IPinData | undefined;
const usePinData = ['manual', 'evaluation'].includes(executionMode);
if (usePinData) {
pinData = workflowData.pinData;
runExecutionData.resultData.pinData = pinData;
}
// Prepare execution data
const { runExecutionData: preparedRunExecutionData, pinData } = prepareExecutionData(
executionMode,
workflowStartNode,
webhookResultData,
runExecutionData,
runExecutionDataMerge,
destinationNode,
executionId,
workflowData,
);
runExecutionData = preparedRunExecutionData;
const runData: IWorkflowExecutionDataProcess = {
executionMode,
@@ -540,49 +601,14 @@ export async function executeWebhook(
let responsePromise: IDeferredPromise<IN8nHttpFullResponse> | undefined;
if (responseMode === 'responseNode') {
responsePromise = createDeferredPromise<IN8nHttpFullResponse>();
responsePromise.promise
.then(async (response: IN8nHttpFullResponse) => {
if (didSendResponse) {
return;
}
const binaryData = (response.body as IDataObject)?.binaryData as IBinaryData;
if (binaryData?.id) {
res.header(response.headers);
const stream = await Container.get(BinaryDataService).getAsStream(binaryData.id);
stream.pipe(res, { end: false });
await finished(stream);
responseCallback(null, { noWebhookResponse: true });
} else if (Buffer.isBuffer(response.body)) {
res.header(response.headers);
res.end(response.body);
responseCallback(null, { noWebhookResponse: true });
} else {
// TODO: This probably needs some more changes depending on the options on the
// Webhook Response node
let data: IWebhookResponseCallbackData = {
data: response.body as IDataObject,
headers: response.headers,
responseCode: response.statusCode,
};
data = handleFormRedirectionCase(data, workflowStartNode);
responseCallback(null, data);
}
process.nextTick(() => res.end());
didSendResponse = true;
})
.catch(async (error) => {
Container.get(ErrorReporter).error(error);
Container.get(Logger).error(
`Error with Webhook-Response for execution "${executionId}": "${error.message}"`,
{ executionId, workflowId: workflow.id },
setupResponseNodePromise(
responsePromise,
res,
responseCallback,
workflowStartNode,
executionId,
workflow,
);
responseCallback(error, {});
});
}
if (
@@ -645,7 +671,7 @@ export async function executeWebhook(
return undefined;
}
if (usePinData) {
if (pinData) {
data.data.resultData.pinData = pinData;
}
@@ -683,10 +709,6 @@ export async function executeWebhook(
return data;
}
const additionalKeys: IWorkflowDataProxyAdditionalKeys = {
$executionId: executionId,
};
if (!didSendResponse) {
let data: IDataObject | IDataObject[] | undefined;
@@ -694,7 +716,7 @@ export async function executeWebhook(
// Return the JSON data of the first entry
if (returnData.data!.main[0]![0] === undefined) {
responseCallback(new ApplicationError('No item to return got found'), {});
responseCallback(new OperationalError('No item to return got found'), {});
didSendResponse = true;
return undefined;
}
@@ -748,13 +770,13 @@ export async function executeWebhook(
data = returnData.data!.main[0]![0];
if (data === undefined) {
responseCallback(new ApplicationError('No item was found to return'), {});
responseCallback(new OperationalError('No item was found to return'), {});
didSendResponse = true;
return undefined;
}
if (data.binary === undefined) {
responseCallback(new ApplicationError('No binary data was found to return'), {});
responseCallback(new OperationalError('No binary data was found to return'), {});
didSendResponse = true;
return undefined;
}
@@ -770,7 +792,7 @@ export async function executeWebhook(
if (responseBinaryPropertyName === undefined && !didSendResponse) {
responseCallback(
new ApplicationError("No 'responseBinaryPropertyName' is set"),
new OperationalError("No 'responseBinaryPropertyName' is set"),
{},
);
didSendResponse = true;
@@ -781,7 +803,7 @@ export async function executeWebhook(
];
if (binaryData === undefined && !didSendResponse) {
responseCallback(
new ApplicationError(
new OperationalError(
`The binary property '${responseBinaryPropertyName}' which should be returned does not exist`,
),
{},
@@ -830,8 +852,7 @@ export async function executeWebhook(
.catch((e) => {
if (!didSendResponse) {
responseCallback(
new ApplicationError('There was a problem executing the workflow', {
level: 'warning',
new OperationalError('There was a problem executing the workflow', {
cause: e,
}),
{},
@@ -848,8 +869,7 @@ export async function executeWebhook(
const error =
e instanceof UnprocessableRequestError
? e
: new ApplicationError('There was a problem executing the workflow', {
level: 'warning',
: new OperationalError('There was a problem executing the workflow', {
cause: e,
});
if (didSendResponse) throw error;