feat(core): Cache test webhook registrations (#8176)

In a multi-main setup, we have the following issue. The user's client
connects to main A and runs a test webhook, so main A starts listening
for a webhook call. A third-party service sends a request to the test
webhook URL. The request is forwarded by the load balancer to main B,
who is not listening for this webhook call. Therefore, the webhook call
is unhandled.

To start addressing this, cache test webhook registrations, using Redis
for queue mode and in-memory for regular mode. When the third-party
service sends a request to the test webhook URL, the request is
forwarded by the load balancer to main B, who fetches test webhooks from
the cache and, if it finds a match, executes the test webhook. This
should be transparent - test webhook behavior should remain the same as
so far.

Notes:
- Test webhook timeouts are not cached. A timeout is only relevant to
the process it was created in, so another process retrieving from Redis
a "foreign" timeout will be unable to act on it. A timeout also has
circular references, so `cache-manager-ioredis-yet` is unable to
serialize it.
- In a single-main scenario, the timeout remains in the single process
and is cleared on test webhook expiration, successful execution, and
manual cancellation - all as usual.
- In a multi-main scenario, we will need to have the process who
received the webhook call send a message to the process who created the
webhook directing this originating process to clear the timeout. This
will likely be implemented via execution lifecycle hooks and Redis
channel messages checking session ID. This implementation is out of
scope for this PR and will come next.
- Additional data in test webhooks is not cached. From what I can tell,
additional data is not needed for test webhooks to be executed.
Additional data also has circular references, so
`cache-manager-ioredis-yet` is unable to serialize it.

Follow-up to: #8155
This commit is contained in:
Iván Ovejero
2024-01-03 16:58:33 +01:00
committed by GitHub
parent 053503531f
commit 22a5f5258d
8 changed files with 356 additions and 193 deletions

View File

@@ -1,53 +1,39 @@
import type express from 'express';
import { Service } from 'typedi';
import {
type IWebhookData,
type IWorkflowExecuteAdditionalData,
type IHttpRequestMethods,
type Workflow,
type WorkflowActivateMode,
type WorkflowExecuteMode,
WebhookPathTakenError,
Workflow,
} from 'n8n-workflow';
import type {
IResponseCallbackData,
IWebhookManager,
IWorkflowDb,
WebhookRegistration,
WebhookAccessControlOptions,
WebhookRequest,
} from '@/Interfaces';
import { Push } from '@/push';
import { NodeTypes } from '@/NodeTypes';
import * as WebhookHelpers from '@/WebhookHelpers';
import { NotFoundError } from './errors/response-errors/not-found.error';
import { TIME } from './constants';
import { WorkflowMissingIdError } from './errors/workflow-missing-id.error';
import { WebhookNotFoundError } from './errors/response-errors/webhook-not-found.error';
import { TIME } from '@/constants';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import { WorkflowMissingIdError } from '@/errors/workflow-missing-id.error';
import { WebhookNotFoundError } from '@/errors/response-errors/webhook-not-found.error';
import * as NodeExecuteFunctions from 'n8n-core';
import { removeTrailingSlash } from './utils';
import { TestWebhookRegistrationsService } from '@/services/test-webhook-registrations.service';
@Service()
export class TestWebhooks implements IWebhookManager {
constructor(
private readonly push: Push,
private readonly nodeTypes: NodeTypes,
private readonly registrations: TestWebhookRegistrationsService,
) {}
private registrations: { [webhookKey: string]: WebhookRegistration } = {};
private get webhooksByWorkflow() {
const result: { [workflowId: string]: IWebhookData[] } = {};
for (const registration of Object.values(this.registrations)) {
result[registration.webhook.workflowId] ||= [];
result[registration.webhook.workflowId].push(registration.webhook);
}
return result;
}
private timeouts: { [webhookKey: string]: NodeJS.Timeout } = {};
/**
* Return a promise that resolves when the test webhook is called.
@@ -63,7 +49,7 @@ export class TestWebhooks implements IWebhookManager {
request.params = {} as WebhookRequest['params'];
let webhook = this.getActiveWebhook(httpMethod, path);
let webhook = await this.getActiveWebhook(httpMethod, path);
if (!webhook) {
// no static webhook, so check if dynamic
@@ -71,7 +57,7 @@ export class TestWebhooks implements IWebhookManager {
const [webhookId, ...segments] = path.split('/');
webhook = this.getActiveWebhook(httpMethod, segments.join('/'), webhookId);
webhook = await this.getActiveWebhook(httpMethod, segments.join('/'), webhookId);
if (!webhook)
throw new WebhookNotFoundError({
@@ -89,17 +75,22 @@ export class TestWebhooks implements IWebhookManager {
});
}
const key = this.toWebhookKey(webhook);
const key = this.registrations.toKey(webhook);
if (!this.registrations[key])
const registration = await this.registrations.get(key);
if (!registration) {
throw new WebhookNotFoundError({
path,
httpMethod,
webhookMethods: await this.getWebhookMethods(path),
});
}
const { destinationNode, sessionId, workflow, workflowEntity, timeout } =
this.registrations[key];
const { destinationNode, sessionId, workflowEntity } = registration;
const timeout = this.timeouts[key];
const workflow = this.toWorkflow(workflowEntity);
const workflowStartNode = workflow.getNode(webhook.node);
@@ -117,8 +108,8 @@ export class TestWebhooks implements IWebhookManager {
workflowStartNode,
executionMode,
sessionId,
undefined,
undefined,
undefined, // IRunExecutionData
undefined, // executionId
request,
response,
(error: Error | null, data: IResponseCallbackData) => {
@@ -145,14 +136,17 @@ export class TestWebhooks implements IWebhookManager {
// Delete webhook also if an error is thrown
if (timeout) clearTimeout(timeout);
delete this.registrations[key];
await this.registrations.deregisterAll();
await this.deactivateWebhooks(workflow);
});
}
async getWebhookMethods(path: string) {
const webhookMethods = Object.keys(this.registrations)
const allKeys = await this.registrations.getAllKeys();
const webhookMethods = allKeys
.filter((key) => key.includes(path))
.map((key) => key.split('|')[0] as IHttpRequestMethods);
@@ -162,13 +156,20 @@ export class TestWebhooks implements IWebhookManager {
}
async findAccessControlOptions(path: string, httpMethod: IHttpRequestMethods) {
const webhookKey = Object.keys(this.registrations).find(
(key) => key.includes(path) && key.startsWith(httpMethod),
);
const allKeys = await this.registrations.getAllKeys();
const webhookKey = allKeys.find((key) => key.includes(path) && key.startsWith(httpMethod));
if (!webhookKey) return;
const { workflow } = this.registrations[webhookKey];
const registration = await this.registrations.get(webhookKey);
if (!registration) return;
const { workflowEntity } = registration;
const workflow = this.toWorkflow(workflowEntity);
const webhookNode = Object.values(workflow.nodes).find(
({ type, parameters, typeVersion }) =>
parameters?.path === path &&
@@ -185,14 +186,13 @@ export class TestWebhooks implements IWebhookManager {
*/
async needsWebhook(
workflowEntity: IWorkflowDb,
workflow: Workflow,
additionalData: IWorkflowExecuteAdditionalData,
executionMode: WorkflowExecuteMode,
activationMode: WorkflowActivateMode,
sessionId?: string,
destinationNode?: string,
) {
if (!workflow.id) throw new WorkflowMissingIdError(workflow);
if (!workflowEntity.id) throw new WorkflowMissingIdError(workflowEntity);
const workflow = this.toWorkflow(workflowEntity);
const webhooks = WebhookHelpers.getWorkflowWebhooks(
workflow,
@@ -205,37 +205,43 @@ export class TestWebhooks implements IWebhookManager {
return false; // no webhooks found to start a workflow
}
// 1+ webhook(s) required, so activate webhook(s)
const timeout = setTimeout(() => this.cancelWebhook(workflow.id), 2 * TIME.MINUTE);
const activatedKeys: string[] = [];
const timeout = setTimeout(async () => this.cancelWebhook(workflow.id), 2 * TIME.MINUTE);
for (const webhook of webhooks) {
const key = this.toWebhookKey(webhook);
const key = this.registrations.toKey(webhook);
const registration = await this.registrations.get(key);
if (this.registrations[key] && !webhook.webhookId) {
if (registration && !webhook.webhookId) {
throw new WebhookPathTakenError(webhook.node);
}
activatedKeys.push(key);
webhook.path = removeTrailingSlash(webhook.path);
webhook.isTest = true;
this.setRegistration({
sessionId,
timeout,
workflow,
workflowEntity,
destinationNode,
webhook,
});
/**
* Remove additional data from webhook because:
*
* - It is not needed for the test webhook to be executed.
* - It contains circular refs that cannot be cached.
*/
const { workflowExecuteAdditionalData: _, ...rest } = webhook;
try {
await this.activateWebhook(workflow, webhook, executionMode, activationMode);
} catch (error) {
activatedKeys.forEach((k) => delete this.registrations[k]);
await workflow.createWebhookIfNotExists(webhook, NodeExecuteFunctions, 'manual', 'manual');
await this.registrations.register({
sessionId,
workflowEntity,
destinationNode,
webhook: rest as IWebhookData,
});
this.timeouts[key] = timeout;
} catch (error) {
await this.deactivateWebhooks(workflow);
delete this.timeouts[key];
throw error;
}
}
@@ -243,11 +249,21 @@ export class TestWebhooks implements IWebhookManager {
return true;
}
cancelWebhook(workflowId: string) {
async cancelWebhook(workflowId: string) {
let foundWebhook = false;
for (const key of Object.keys(this.registrations)) {
const { sessionId, timeout, workflow, workflowEntity } = this.registrations[key];
const allWebhookKeys = await this.registrations.getAllKeys();
for (const key of allWebhookKeys) {
const registration = await this.registrations.get(key);
if (!registration) continue;
const { sessionId, workflowEntity } = registration;
const timeout = this.timeouts[key];
const workflow = this.toWorkflow(workflowEntity);
if (workflowEntity.id !== workflowId) continue;
@@ -261,8 +277,6 @@ export class TestWebhooks implements IWebhookManager {
}
}
delete this.registrations[key];
if (!foundWebhook) {
// As it removes all webhooks of the workflow execute only once
void this.deactivateWebhooks(workflow);
@@ -274,45 +288,19 @@ export class TestWebhooks implements IWebhookManager {
return foundWebhook;
}
async activateWebhook(
workflow: Workflow,
webhook: IWebhookData,
executionMode: WorkflowExecuteMode,
activationMode: WorkflowActivateMode,
) {
webhook.path = removeTrailingSlash(webhook.path);
const key = this.toWebhookKey(webhook);
webhook.isTest = true;
this.registrations[key].webhook = webhook;
try {
await workflow.createWebhookIfNotExists(
webhook,
NodeExecuteFunctions,
executionMode,
activationMode,
);
} catch (error) {
if (this.registrations[key]) delete this.registrations[key];
throw error;
}
}
getActiveWebhook(httpMethod: IHttpRequestMethods, path: string, webhookId?: string) {
const key = this.toWebhookKey({ httpMethod, path, webhookId });
if (this.registrations[key] === undefined) {
return undefined;
}
async getActiveWebhook(httpMethod: IHttpRequestMethods, path: string, webhookId?: string) {
const key = this.registrations.toKey({ httpMethod, path, webhookId });
let webhook: IWebhookData | undefined;
let maxMatches = 0;
const pathElementsSet = new Set(path.split('/'));
// check if static elements match in path
// if more results have been returned choose the one with the most static-route matches
const dynamicWebhook = this.registrations[key].webhook;
const registration = await this.registrations.get(key);
if (!registration) return;
const { webhook: dynamicWebhook } = registration;
const staticElements = dynamicWebhook.path.split('/').filter((ele) => !ele.startsWith(':'));
const allStaticExist = staticElements.every((staticEle) => pathElementsSet.has(staticEle));
@@ -329,48 +317,47 @@ export class TestWebhooks implements IWebhookManager {
return webhook;
}
private toWebhookKey(webhook: Pick<IWebhookData, 'webhookId' | 'httpMethod' | 'path'>) {
const { webhookId, httpMethod, path: webhookPath } = webhook;
if (!webhookId) return `${httpMethod}|${webhookPath}`;
let path = webhookPath;
if (path.startsWith(webhookId)) {
const cutFromIndex = path.indexOf('/') + 1;
path = path.slice(cutFromIndex);
}
return `${httpMethod}|${webhookId}|${path.split('/').length}`;
}
/**
* Deactivate all registered webhooks of a workflow.
* Deactivate all registered test webhooks of a workflow.
*/
async deactivateWebhooks(workflow: Workflow) {
const webhooks = this.webhooksByWorkflow[workflow.id];
const allRegistrations = await this.registrations.getAllRegistrations();
if (!webhooks) return false; // nothing to deactivate
type WebhooksByWorkflow = { [workflowId: string]: IWebhookData[] };
const webhooksByWorkflow = allRegistrations.reduce<WebhooksByWorkflow>((acc, cur) => {
const { workflowId } = cur.webhook;
acc[workflowId] ||= [];
acc[workflowId].push(cur.webhook);
return acc;
}, {});
const webhooks = webhooksByWorkflow[workflow.id];
if (!webhooks) return; // nothing to deactivate
for (const webhook of webhooks) {
await workflow.deleteWebhook(webhook, NodeExecuteFunctions, 'internal', 'update');
const key = this.toWebhookKey(webhook);
delete this.registrations[key];
await this.registrations.deregister(webhook);
}
return true;
}
clearRegistrations() {
this.registrations = {};
}
setRegistration(registration: WebhookRegistration) {
const key = this.toWebhookKey(registration.webhook);
this.registrations[key] = registration;
/**
* Convert a `WorkflowEntity` from `typeorm` to a `Workflow` from `n8n-workflow`.
*/
private toWorkflow(workflowEntity: IWorkflowDb) {
return new Workflow({
id: workflowEntity.id,
name: workflowEntity.name,
nodes: workflowEntity.nodes,
connections: workflowEntity.connections,
active: false,
nodeTypes: this.nodeTypes,
staticData: undefined,
settings: workflowEntity.settings,
});
}
}