mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-21 03:42:16 +00:00
refactor(core): Improve test-webhooks (no-changelog) (#8069)
Remove duplication, improve readability, and expand tests for `TestWebhooks.ts` - in anticipation for storing test webhooks in Redis. --------- Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
@@ -8,45 +8,38 @@ import {
|
||||
type Workflow,
|
||||
type WorkflowActivateMode,
|
||||
type WorkflowExecuteMode,
|
||||
ApplicationError,
|
||||
WebhookPathTakenError,
|
||||
} from 'n8n-workflow';
|
||||
|
||||
import { ActiveWebhooks } from '@/ActiveWebhooks';
|
||||
import type {
|
||||
IResponseCallbackData,
|
||||
IWebhookManager,
|
||||
IWorkflowDb,
|
||||
RegisteredWebhook,
|
||||
WebhookAccessControlOptions,
|
||||
WebhookRequest,
|
||||
} from '@/Interfaces';
|
||||
import { Push } from '@/push';
|
||||
import { NodeTypes } from '@/NodeTypes';
|
||||
import * as WebhookHelpers from '@/WebhookHelpers';
|
||||
import { webhookNotFoundErrorMessage } from './utils';
|
||||
import { NotFoundError } from './errors/response-errors/not-found.error';
|
||||
|
||||
const WEBHOOK_TEST_UNREGISTERED_HINT =
|
||||
"Click the 'Execute workflow' button on the canvas, then try again. (In test mode, the webhook only works for one call after you click this button)";
|
||||
import { TIME } from './constants';
|
||||
import { WorkflowMissingIdError } from './errors/workflow-missing-id.error';
|
||||
import { WebhookNotFoundError } from './errors/response-errors/webhook-not-found.error';
|
||||
import * as NodeExecuteFunctions from 'n8n-core';
|
||||
|
||||
@Service()
|
||||
export class TestWebhooks implements IWebhookManager {
|
||||
private testWebhookData: {
|
||||
[key: string]: {
|
||||
sessionId?: string;
|
||||
timeout: NodeJS.Timeout;
|
||||
workflowData: IWorkflowDb;
|
||||
workflow: Workflow;
|
||||
destinationNode?: string;
|
||||
};
|
||||
} = {};
|
||||
|
||||
constructor(
|
||||
private readonly activeWebhooks: ActiveWebhooks,
|
||||
private readonly push: Push,
|
||||
private readonly nodeTypes: NodeTypes,
|
||||
) {
|
||||
activeWebhooks.testWebhooks = true;
|
||||
}
|
||||
) {}
|
||||
|
||||
private registeredWebhooks: { [webhookKey: string]: RegisteredWebhook } = {};
|
||||
|
||||
private workflowWebhooks: { [workflowId: string]: IWebhookData[] } = {};
|
||||
|
||||
private webhookUrls: { [webhookUrl: string]: IWebhookData[] } = {};
|
||||
|
||||
/**
|
||||
* Executes a test-webhook and returns the data. It also makes sure that the
|
||||
@@ -58,69 +51,57 @@ export class TestWebhooks implements IWebhookManager {
|
||||
response: express.Response,
|
||||
): Promise<IResponseCallbackData> {
|
||||
const httpMethod = request.method;
|
||||
let path = request.params.path;
|
||||
|
||||
// Reset request parameters
|
||||
let path = request.params.path.endsWith('/')
|
||||
? request.params.path.slice(0, -1)
|
||||
: request.params.path;
|
||||
|
||||
request.params = {} as WebhookRequest['params'];
|
||||
|
||||
// Remove trailing slash
|
||||
if (path.endsWith('/')) {
|
||||
path = path.slice(0, -1);
|
||||
}
|
||||
let webhook = this.getActiveWebhook(httpMethod, path);
|
||||
|
||||
const { activeWebhooks, push, testWebhookData } = this;
|
||||
if (!webhook) {
|
||||
// no static webhook, so check if dynamic
|
||||
// e.g. `/webhook-test/<uuid>/user/:id/create`
|
||||
|
||||
let webhookData: IWebhookData | undefined = activeWebhooks.get(httpMethod, path);
|
||||
const [webhookId, ...segments] = path.split('/');
|
||||
|
||||
// check if path is dynamic
|
||||
if (webhookData === undefined) {
|
||||
const pathElements = path.split('/');
|
||||
const webhookId = pathElements.shift();
|
||||
webhook = this.getActiveWebhook(httpMethod, segments.join('/'), webhookId);
|
||||
|
||||
webhookData = activeWebhooks.get(httpMethod, pathElements.join('/'), webhookId);
|
||||
if (webhookData === undefined) {
|
||||
// The requested webhook is not registered
|
||||
const methods = await this.getWebhookMethods(path);
|
||||
throw new NotFoundError(
|
||||
webhookNotFoundErrorMessage(path, httpMethod, methods),
|
||||
WEBHOOK_TEST_UNREGISTERED_HINT,
|
||||
);
|
||||
}
|
||||
if (!webhook)
|
||||
throw new WebhookNotFoundError({
|
||||
path,
|
||||
httpMethod,
|
||||
webhookMethods: await this.getWebhookMethods(path),
|
||||
});
|
||||
|
||||
path = webhookData.path;
|
||||
// extracting params from path
|
||||
path.split('/').forEach((ele, index) => {
|
||||
if (ele.startsWith(':')) {
|
||||
// write params to req.params
|
||||
// @ts-ignore
|
||||
request.params[ele.slice(1)] = pathElements[index];
|
||||
path = webhook.path;
|
||||
|
||||
path.split('/').forEach((segment, index) => {
|
||||
if (segment.startsWith(':')) {
|
||||
request.params[segment.slice(1)] = segments[index];
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
const { workflowId } = webhookData;
|
||||
const webhookKey = `${activeWebhooks.getWebhookKey(
|
||||
webhookData.httpMethod,
|
||||
webhookData.path,
|
||||
webhookData.webhookId,
|
||||
)}|${workflowId}`;
|
||||
const key = [
|
||||
this.toWebhookKey(webhook.httpMethod, webhook.path, webhook.webhookId),
|
||||
webhook.workflowId,
|
||||
].join('|');
|
||||
|
||||
// TODO: Clean that duplication up one day and improve code generally
|
||||
if (testWebhookData[webhookKey] === undefined) {
|
||||
// The requested webhook is not registered
|
||||
const methods = await this.getWebhookMethods(path);
|
||||
throw new NotFoundError(
|
||||
webhookNotFoundErrorMessage(path, httpMethod, methods),
|
||||
WEBHOOK_TEST_UNREGISTERED_HINT,
|
||||
);
|
||||
}
|
||||
if (!(key in this.registeredWebhooks))
|
||||
throw new WebhookNotFoundError({
|
||||
path,
|
||||
httpMethod,
|
||||
webhookMethods: await this.getWebhookMethods(path),
|
||||
});
|
||||
|
||||
const { destinationNode, sessionId, workflow, workflowData, timeout } =
|
||||
testWebhookData[webhookKey];
|
||||
const { destinationNode, sessionId, workflow, workflowEntity, timeout } =
|
||||
this.registeredWebhooks[key];
|
||||
|
||||
// Get the node which has the webhook defined to know where to start from and to
|
||||
// get additional data
|
||||
const workflowStartNode = workflow.getNode(webhookData.node);
|
||||
const workflowStartNode = workflow.getNode(webhook.node);
|
||||
if (workflowStartNode === null) {
|
||||
throw new NotFoundError('Could not find node to process webhook.');
|
||||
}
|
||||
@@ -130,8 +111,8 @@ export class TestWebhooks implements IWebhookManager {
|
||||
const executionMode = 'manual';
|
||||
const executionId = await WebhookHelpers.executeWebhook(
|
||||
workflow,
|
||||
webhookData!,
|
||||
workflowData,
|
||||
webhook!,
|
||||
workflowEntity,
|
||||
workflowStartNode,
|
||||
executionMode,
|
||||
sessionId,
|
||||
@@ -153,107 +134,101 @@ export class TestWebhooks implements IWebhookManager {
|
||||
|
||||
// Inform editor-ui that webhook got received
|
||||
if (sessionId !== undefined) {
|
||||
push.send('testWebhookReceived', { workflowId, executionId }, sessionId);
|
||||
this.push.send(
|
||||
'testWebhookReceived',
|
||||
{ workflowId: webhook?.workflowId, executionId },
|
||||
sessionId,
|
||||
);
|
||||
}
|
||||
} catch {}
|
||||
|
||||
// Delete webhook also if an error is thrown
|
||||
if (timeout) clearTimeout(timeout);
|
||||
delete testWebhookData[webhookKey];
|
||||
delete this.registeredWebhooks[key];
|
||||
|
||||
await activeWebhooks.removeWorkflow(workflow);
|
||||
await this.deactivateWebhooksFor(workflow);
|
||||
});
|
||||
}
|
||||
|
||||
async getWebhookMethods(path: string): Promise<IHttpRequestMethods[]> {
|
||||
const webhookMethods = this.activeWebhooks.getWebhookMethods(path);
|
||||
if (!webhookMethods.length) {
|
||||
// The requested webhook is not registered
|
||||
throw new NotFoundError(webhookNotFoundErrorMessage(path), WEBHOOK_TEST_UNREGISTERED_HINT);
|
||||
}
|
||||
async getWebhookMethods(path: string) {
|
||||
const webhookMethods = Object.keys(this.webhookUrls)
|
||||
.filter((key) => key.includes(path))
|
||||
.map((key) => key.split('|')[0] as IHttpRequestMethods);
|
||||
|
||||
if (!webhookMethods.length) throw new WebhookNotFoundError({ path });
|
||||
|
||||
return webhookMethods;
|
||||
}
|
||||
|
||||
async findAccessControlOptions(path: string, httpMethod: IHttpRequestMethods) {
|
||||
const webhookKey = Object.keys(this.testWebhookData).find(
|
||||
const webhookKey = Object.keys(this.registeredWebhooks).find(
|
||||
(key) => key.includes(path) && key.startsWith(httpMethod),
|
||||
);
|
||||
|
||||
if (!webhookKey) return;
|
||||
|
||||
const { workflow } = this.testWebhookData[webhookKey];
|
||||
const { workflow } = this.registeredWebhooks[webhookKey];
|
||||
const webhookNode = Object.values(workflow.nodes).find(
|
||||
({ type, parameters, typeVersion }) =>
|
||||
parameters?.path === path &&
|
||||
(parameters?.httpMethod ?? 'GET') === httpMethod &&
|
||||
'webhook' in this.nodeTypes.getByNameAndVersion(type, typeVersion),
|
||||
);
|
||||
|
||||
return webhookNode?.parameters?.options as WebhookAccessControlOptions;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if it has to wait for webhook data to execute the workflow.
|
||||
* If yes it waits for it and resolves with the result of the workflow if not it simply resolves with undefined
|
||||
*/
|
||||
async needsWebhookData(
|
||||
workflowData: IWorkflowDb,
|
||||
async needsWebhook(
|
||||
workflowEntity: IWorkflowDb,
|
||||
workflow: Workflow,
|
||||
additionalData: IWorkflowExecuteAdditionalData,
|
||||
mode: WorkflowExecuteMode,
|
||||
activation: WorkflowActivateMode,
|
||||
executionMode: WorkflowExecuteMode,
|
||||
activationMode: WorkflowActivateMode,
|
||||
sessionId?: string,
|
||||
destinationNode?: string,
|
||||
): Promise<boolean> {
|
||||
) {
|
||||
if (!workflow.id) throw new WorkflowMissingIdError(workflow);
|
||||
|
||||
const webhooks = WebhookHelpers.getWorkflowWebhooks(
|
||||
workflow,
|
||||
additionalData,
|
||||
destinationNode,
|
||||
true,
|
||||
);
|
||||
if (!webhooks.find((webhook) => webhook.webhookDescription.restartWebhook !== true)) {
|
||||
// No webhooks found to start a workflow
|
||||
return false;
|
||||
|
||||
if (!webhooks.find((w) => w.webhookDescription.restartWebhook !== true)) {
|
||||
return false; // no webhooks found to start a workflow
|
||||
}
|
||||
|
||||
if (workflow.id === undefined) {
|
||||
throw new ApplicationError(
|
||||
'Webhooks can only be added for saved workflows as an ID is needed',
|
||||
);
|
||||
}
|
||||
|
||||
// Remove test-webhooks automatically if they do not get called (after 120 seconds)
|
||||
const timeout = setTimeout(() => {
|
||||
this.cancelTestWebhook(workflowData.id);
|
||||
}, 120000);
|
||||
this.cancelTestWebhook(workflowEntity.id);
|
||||
}, 2 * TIME.MINUTE);
|
||||
|
||||
const { activeWebhooks, testWebhookData } = this;
|
||||
const activatedKeys: string[] = [];
|
||||
|
||||
let key: string;
|
||||
const activatedKey: string[] = [];
|
||||
for (const webhook of webhooks) {
|
||||
const key = [
|
||||
this.toWebhookKey(webhook.httpMethod, webhook.path, webhook.webhookId),
|
||||
workflowEntity.id,
|
||||
].join('|');
|
||||
|
||||
for (const webhookData of webhooks) {
|
||||
key = `${activeWebhooks.getWebhookKey(
|
||||
webhookData.httpMethod,
|
||||
webhookData.path,
|
||||
webhookData.webhookId,
|
||||
)}|${workflowData.id}`;
|
||||
activatedKeys.push(key);
|
||||
|
||||
activatedKey.push(key);
|
||||
|
||||
testWebhookData[key] = {
|
||||
this.registeredWebhooks[key] = {
|
||||
sessionId,
|
||||
timeout,
|
||||
workflow,
|
||||
workflowData,
|
||||
workflowEntity,
|
||||
destinationNode,
|
||||
};
|
||||
|
||||
try {
|
||||
await activeWebhooks.add(workflow, webhookData, mode, activation);
|
||||
await this.activateWebhook(workflow, webhook, executionMode, activationMode);
|
||||
} catch (error) {
|
||||
activatedKey.forEach((deleteKey) => delete testWebhookData[deleteKey]);
|
||||
activatedKeys.forEach((ak) => delete this.registeredWebhooks[ak]);
|
||||
|
||||
await this.deactivateWebhooksFor(workflow);
|
||||
|
||||
await activeWebhooks.removeWorkflow(workflow);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
@@ -261,38 +236,29 @@ export class TestWebhooks implements IWebhookManager {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes a test webhook of the workflow with the given id
|
||||
*
|
||||
*/
|
||||
cancelTestWebhook(workflowId: string): boolean {
|
||||
cancelTestWebhook(workflowId: string) {
|
||||
let foundWebhook = false;
|
||||
const { activeWebhooks, push, testWebhookData } = this;
|
||||
|
||||
for (const webhookKey of Object.keys(testWebhookData)) {
|
||||
const { sessionId, timeout, workflow, workflowData } = testWebhookData[webhookKey];
|
||||
for (const key of Object.keys(this.registeredWebhooks)) {
|
||||
const { sessionId, timeout, workflow, workflowEntity } = this.registeredWebhooks[key];
|
||||
|
||||
if (workflowData.id !== workflowId) {
|
||||
continue;
|
||||
}
|
||||
if (workflowEntity.id !== workflowId) continue;
|
||||
|
||||
clearTimeout(timeout);
|
||||
|
||||
// Inform editor-ui that webhook got received
|
||||
if (sessionId !== undefined) {
|
||||
try {
|
||||
push.send('testWebhookDeleted', { workflowId }, sessionId);
|
||||
this.push.send('testWebhookDeleted', { workflowId }, sessionId);
|
||||
} catch {
|
||||
// Could not inform editor, probably is not connected anymore. So simply go on.
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the webhook
|
||||
delete testWebhookData[webhookKey];
|
||||
delete this.registeredWebhooks[key];
|
||||
|
||||
if (!foundWebhook) {
|
||||
// As it removes all webhooks of the workflow execute only once
|
||||
void activeWebhooks.removeWorkflow(workflow);
|
||||
void this.deactivateWebhooksFor(workflow);
|
||||
}
|
||||
|
||||
foundWebhook = true;
|
||||
@@ -300,4 +266,127 @@ export class TestWebhooks implements IWebhookManager {
|
||||
|
||||
return foundWebhook;
|
||||
}
|
||||
|
||||
async activateWebhook(
|
||||
workflow: Workflow,
|
||||
webhook: IWebhookData,
|
||||
executionMode: WorkflowExecuteMode,
|
||||
activationMode: WorkflowActivateMode,
|
||||
) {
|
||||
if (!workflow.id) throw new WorkflowMissingIdError(workflow);
|
||||
|
||||
if (webhook.path.endsWith('/')) {
|
||||
webhook.path = webhook.path.slice(0, -1);
|
||||
}
|
||||
|
||||
const key = this.toWebhookKey(webhook.httpMethod, webhook.path, webhook.webhookId);
|
||||
|
||||
// check that there is not a webhook already registered with that path/method
|
||||
if (this.webhookUrls[key] && !webhook.webhookId) {
|
||||
throw new WebhookPathTakenError(webhook.node);
|
||||
}
|
||||
|
||||
if (this.workflowWebhooks[webhook.workflowId] === undefined) {
|
||||
this.workflowWebhooks[webhook.workflowId] = [];
|
||||
}
|
||||
|
||||
// Make the webhook available directly because sometimes to create it successfully
|
||||
// it gets called
|
||||
if (!this.webhookUrls[key]) {
|
||||
this.webhookUrls[key] = [];
|
||||
}
|
||||
webhook.isTest = true;
|
||||
this.webhookUrls[key].push(webhook);
|
||||
|
||||
try {
|
||||
await workflow.createWebhookIfNotExists(
|
||||
webhook,
|
||||
NodeExecuteFunctions,
|
||||
executionMode,
|
||||
activationMode,
|
||||
);
|
||||
} catch (error) {
|
||||
// If there was a problem unregister the webhook again
|
||||
if (this.webhookUrls[key].length <= 1) {
|
||||
delete this.webhookUrls[key];
|
||||
} else {
|
||||
this.webhookUrls[key] = this.webhookUrls[key].filter((w) => w.path !== w.path);
|
||||
}
|
||||
|
||||
throw error;
|
||||
}
|
||||
this.workflowWebhooks[webhook.workflowId].push(webhook);
|
||||
}
|
||||
|
||||
getActiveWebhook(httpMethod: IHttpRequestMethods, path: string, webhookId?: string) {
|
||||
const webhookKey = this.toWebhookKey(httpMethod, path, webhookId);
|
||||
if (this.webhookUrls[webhookKey] === undefined) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
let webhook: IWebhookData | undefined;
|
||||
let maxMatches = 0;
|
||||
const pathElementsSet = new Set(path.split('/'));
|
||||
// check if static elements match in path
|
||||
// if more results have been returned choose the one with the most static-route matches
|
||||
this.webhookUrls[webhookKey].forEach((dynamicWebhook) => {
|
||||
const staticElements = dynamicWebhook.path.split('/').filter((ele) => !ele.startsWith(':'));
|
||||
const allStaticExist = staticElements.every((staticEle) => pathElementsSet.has(staticEle));
|
||||
|
||||
if (allStaticExist && staticElements.length > maxMatches) {
|
||||
maxMatches = staticElements.length;
|
||||
webhook = dynamicWebhook;
|
||||
}
|
||||
// handle routes with no static elements
|
||||
else if (staticElements.length === 0 && !webhook) {
|
||||
webhook = dynamicWebhook;
|
||||
}
|
||||
});
|
||||
|
||||
return webhook;
|
||||
}
|
||||
|
||||
toWebhookKey(httpMethod: IHttpRequestMethods, path: string, webhookId?: string) {
|
||||
if (!webhookId) return `${httpMethod}|${path}`;
|
||||
|
||||
if (path.startsWith(webhookId)) {
|
||||
const cutFromIndex = path.indexOf('/') + 1;
|
||||
|
||||
path = path.slice(cutFromIndex);
|
||||
}
|
||||
|
||||
return `${httpMethod}|${webhookId}|${path.split('/').length}`;
|
||||
}
|
||||
|
||||
async deactivateWebhooksFor(workflow: Workflow) {
|
||||
const workflowId = workflow.id;
|
||||
|
||||
if (this.workflowWebhooks[workflowId] === undefined) {
|
||||
// If it did not exist then there is nothing to remove
|
||||
return false;
|
||||
}
|
||||
|
||||
const webhooks = this.workflowWebhooks[workflowId];
|
||||
|
||||
const mode = 'internal';
|
||||
|
||||
// Go through all the registered webhooks of the workflow and remove them
|
||||
|
||||
for (const webhookData of webhooks) {
|
||||
await workflow.deleteWebhook(webhookData, NodeExecuteFunctions, mode, 'update');
|
||||
|
||||
const key = this.toWebhookKey(
|
||||
webhookData.httpMethod,
|
||||
webhookData.path,
|
||||
webhookData.webhookId,
|
||||
);
|
||||
|
||||
delete this.webhookUrls[key];
|
||||
}
|
||||
|
||||
// Remove also the workflow-webhook entry
|
||||
delete this.workflowWebhooks[workflowId];
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user