refactor(core): Move ExecutionLifecycleHooks to core (#13042)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2025-02-07 18:16:37 +01:00
committed by GitHub
parent cae98e733d
commit d41ca832dc
24 changed files with 911 additions and 886 deletions

View File

@@ -6,10 +6,10 @@ import type {
IRequestOptions,
IWorkflowExecuteAdditionalData,
Workflow,
WorkflowHooks,
} from 'n8n-workflow';
import nock from 'nock';
import type { ExecutionLifecycleHooks } from '@/execution-engine/execution-lifecycle-hooks';
import {
copyInputItems,
invokeAxios,
@@ -21,12 +21,12 @@ describe('NodeExecuteFunctions', () => {
describe('proxyRequestToAxios', () => {
const baseUrl = 'http://example.de';
const workflow = mock<Workflow>();
const hooks = mock<WorkflowHooks>();
const hooks = mock<ExecutionLifecycleHooks>();
const additionalData = mock<IWorkflowExecuteAdditionalData>({ hooks });
const node = mock<INode>();
beforeEach(() => {
hooks.executeHookFunctions.mockClear();
hooks.runHook.mockClear();
});
test('should rethrow an error with `status` property', async () => {
@@ -42,10 +42,7 @@ describe('NodeExecuteFunctions', () => {
test('should not throw if the response status is 200', async () => {
nock(baseUrl).get('/test').reply(200);
await proxyRequestToAxios(workflow, additionalData, node, `${baseUrl}/test`);
expect(hooks.executeHookFunctions).toHaveBeenCalledWith('nodeFetchedData', [
workflow.id,
node,
]);
expect(hooks.runHook).toHaveBeenCalledWith('nodeFetchedData', [workflow.id, node]);
});
test('should throw if the response status is 403', async () => {
@@ -65,7 +62,7 @@ describe('NodeExecuteFunctions', () => {
expect(error.config).toBeUndefined();
expect(error.message).toEqual('403 - "Forbidden"');
}
expect(hooks.executeHookFunctions).not.toHaveBeenCalled();
expect(hooks.runHook).not.toHaveBeenCalled();
});
test('should not throw if the response status is 404, but `simple` option is set to `false`', async () => {
@@ -76,10 +73,7 @@ describe('NodeExecuteFunctions', () => {
});
expect(response).toEqual('Not Found');
expect(hooks.executeHookFunctions).toHaveBeenCalledWith('nodeFetchedData', [
workflow.id,
node,
]);
expect(hooks.runHook).toHaveBeenCalledWith('nodeFetchedData', [workflow.id, node]);
});
test('should return full response when `resolveWithFullResponse` is set to true', async () => {
@@ -96,10 +90,7 @@ describe('NodeExecuteFunctions', () => {
statusCode: 404,
statusMessage: 'Not Found',
});
expect(hooks.executeHookFunctions).toHaveBeenCalledWith('nodeFetchedData', [
workflow.id,
node,
]);
expect(hooks.runHook).toHaveBeenCalledWith('nodeFetchedData', [workflow.id, node]);
});
describe('redirects', () => {

View File

@@ -0,0 +1,113 @@
import { mock } from 'jest-mock-extended';
import type {
IDataObject,
IExecuteResponsePromiseData,
INode,
IRun,
IRunExecutionData,
ITaskData,
IWorkflowBase,
Workflow,
} from 'n8n-workflow';
import type {
ExecutionLifecycleHookName,
ExecutionLifecyleHookHandlers,
} from '../execution-lifecycle-hooks';
import { ExecutionLifecycleHooks } from '../execution-lifecycle-hooks';
describe('ExecutionLifecycleHooks', () => {
const executionId = '123';
const workflowData = mock<IWorkflowBase>();
let hooks: ExecutionLifecycleHooks;
beforeEach(() => {
jest.clearAllMocks();
hooks = new ExecutionLifecycleHooks('internal', executionId, workflowData);
});
describe('constructor()', () => {
it('should initialize with correct properties', () => {
expect(hooks.mode).toBe('internal');
expect(hooks.executionId).toBe(executionId);
expect(hooks.workflowData).toBe(workflowData);
expect(hooks.handlers).toEqual({
nodeExecuteAfter: [],
nodeExecuteBefore: [],
nodeFetchedData: [],
sendResponse: [],
workflowExecuteAfter: [],
workflowExecuteBefore: [],
});
});
});
describe('addHandler()', () => {
const hooksHandlers =
mock<{
[K in keyof ExecutionLifecyleHookHandlers]: ExecutionLifecyleHookHandlers[K][number];
}>();
const testCases: Array<{
hook: ExecutionLifecycleHookName;
args: Parameters<ExecutionLifecyleHookHandlers[keyof ExecutionLifecyleHookHandlers][number]>;
}> = [
{ hook: 'nodeExecuteBefore', args: ['testNode'] },
{
hook: 'nodeExecuteAfter',
args: ['testNode', mock<ITaskData>(), mock<IRunExecutionData>()],
},
{ hook: 'workflowExecuteBefore', args: [mock<Workflow>(), mock<IRunExecutionData>()] },
{ hook: 'workflowExecuteAfter', args: [mock<IRun>(), mock<IDataObject>()] },
{ hook: 'sendResponse', args: [mock<IExecuteResponsePromiseData>()] },
{ hook: 'nodeFetchedData', args: ['workflow123', mock<INode>()] },
];
test.each(testCases)(
'should add handlers to $hook hook and call them',
async ({ hook, args }) => {
hooks.addHandler(hook, hooksHandlers[hook]);
await hooks.runHook(hook, args);
expect(hooksHandlers[hook]).toHaveBeenCalledWith(...args);
},
);
});
describe('runHook()', () => {
it('should execute multiple hooks in order', async () => {
const executionOrder: string[] = [];
const hook1 = jest.fn().mockImplementation(async () => {
executionOrder.push('hook1');
});
const hook2 = jest.fn().mockImplementation(async () => {
executionOrder.push('hook2');
});
hooks.addHandler('nodeExecuteBefore', hook1, hook2);
await hooks.runHook('nodeExecuteBefore', ['testNode']);
expect(executionOrder).toEqual(['hook1', 'hook2']);
expect(hook1).toHaveBeenCalled();
expect(hook2).toHaveBeenCalled();
});
it('should maintain correct "this" context', async () => {
const hook = jest.fn().mockImplementation(async function (this: ExecutionLifecycleHooks) {
expect(this.executionId).toBe(executionId);
expect(this.mode).toBe('internal');
});
hooks.addHandler('nodeExecuteBefore', hook);
await hooks.runHook('nodeExecuteBefore', ['testNode']);
expect(hook).toHaveBeenCalled();
});
it('should handle errors in hooks', async () => {
const errorHook = jest.fn().mockRejectedValue(new Error('Hook failed'));
hooks.addHandler('nodeExecuteBefore', errorHook);
await expect(hooks.runHook('nodeExecuteBefore', ['testNode'])).rejects.toThrow('Hook failed');
});
});
});

View File

@@ -9,10 +9,10 @@ import type {
INodeType,
INodeTypes,
ITriggerFunctions,
WorkflowHooks,
IRun,
} from 'n8n-workflow';
import { ExecutionLifecycleHooks } from '../execution-lifecycle-hooks';
import { TriggersAndPollers } from '../triggers-and-pollers';
describe('TriggersAndPollers', () => {
@@ -23,15 +23,8 @@ describe('TriggersAndPollers', () => {
});
const nodeTypes = mock<INodeTypes>();
const workflow = mock<Workflow>({ nodeTypes });
const hookFunctions = mock<WorkflowHooks['hookFunctions']>({
sendResponse: [],
workflowExecuteAfter: [],
});
const additionalData = mock<IWorkflowExecuteAdditionalData>({
hooks: {
hookFunctions,
},
});
const hooks = new ExecutionLifecycleHooks('internal', '123', mock());
const additionalData = mock<IWorkflowExecuteAdditionalData>({ hooks });
const triggersAndPollers = new TriggersAndPollers();
beforeEach(() => {
@@ -98,8 +91,7 @@ describe('TriggersAndPollers', () => {
getMockTriggerFunctions()?.emit?.(mockEmitData, responsePromise);
expect(hookFunctions.sendResponse?.length).toBe(1);
await hookFunctions.sendResponse![0]?.({ testResponse: true });
await hooks.runHook('sendResponse', [{ testResponse: true }]);
expect(responsePromise.resolve).toHaveBeenCalledWith({ testResponse: true });
});
@@ -111,10 +103,10 @@ describe('TriggersAndPollers', () => {
await runTriggerHelper('manual');
getMockTriggerFunctions()?.emit?.(mockEmitData, responsePromise, donePromise);
await hookFunctions.sendResponse![0]?.({ testResponse: true });
await hooks.runHook('sendResponse', [{ testResponse: true }]);
expect(responsePromise.resolve).toHaveBeenCalledWith({ testResponse: true });
await hookFunctions.workflowExecuteAfter?.[0]?.(mockRunData, {});
await hooks.runHook('workflowExecuteAfter', [mockRunData, {}]);
expect(donePromise.resolve).toHaveBeenCalledWith(mockRunData);
});
});

View File

@@ -44,6 +44,7 @@ import {
import * as Helpers from '@test/helpers';
import { legacyWorkflowExecuteTests, v1WorkflowExecuteTests } from '@test/helpers/constants';
import type { ExecutionLifecycleHooks } from '../execution-lifecycle-hooks';
import { DirectedGraph } from '../partial-execution-utils';
import * as partialExecutionUtils from '../partial-execution-utils';
import { createNodeData, toITaskData } from '../partial-execution-utils/__tests__/helpers';
@@ -1211,6 +1212,7 @@ describe('WorkflowExecute', () => {
let runExecutionData: IRunExecutionData;
let workflowExecute: WorkflowExecute;
let additionalData: IWorkflowExecuteAdditionalData;
beforeEach(() => {
runExecutionData = {
@@ -1224,9 +1226,12 @@ describe('WorkflowExecute', () => {
waitingExecutionSource: null,
},
};
workflowExecute = new WorkflowExecute(mock(), 'manual', runExecutionData);
additionalData = mock();
additionalData.hooks = mock<ExecutionLifecycleHooks>();
jest.spyOn(workflowExecute, 'executeHook').mockResolvedValue(undefined);
workflowExecute = new WorkflowExecute(additionalData, 'manual', runExecutionData);
jest.spyOn(additionalData.hooks, 'runHook').mockResolvedValue(undefined);
jest.spyOn(workflowExecute, 'moveNodeMetadata').mockImplementation();
});
@@ -1294,7 +1299,7 @@ describe('WorkflowExecute', () => {
// Verify static data handling
expect(result).toBeDefined();
expect(workflowExecute.moveNodeMetadata).toHaveBeenCalled();
expect(workflowExecute.executeHook).toHaveBeenCalledWith('workflowExecuteAfter', [
expect(additionalData.hooks?.runHook).toHaveBeenCalledWith('workflowExecuteAfter', [
result,
workflow.staticData,
]);

View File

@@ -0,0 +1,119 @@
import type {
IDataObject,
IExecuteResponsePromiseData,
INode,
IRun,
IRunExecutionData,
ITaskData,
IWorkflowBase,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
export type ExecutionLifecyleHookHandlers = {
nodeExecuteBefore: Array<
(this: ExecutionLifecycleHooks, nodeName: string) => Promise<void> | void
>;
nodeExecuteAfter: Array<
(
this: ExecutionLifecycleHooks,
nodeName: string,
data: ITaskData,
executionData: IRunExecutionData,
) => Promise<void> | void
>;
workflowExecuteBefore: Array<
(
this: ExecutionLifecycleHooks,
workflow: Workflow,
data?: IRunExecutionData,
) => Promise<void> | void
>;
workflowExecuteAfter: Array<
(this: ExecutionLifecycleHooks, data: IRun, newStaticData: IDataObject) => Promise<void> | void
>;
/** Used by trigger and webhook nodes to respond back to the request */
sendResponse: Array<
(this: ExecutionLifecycleHooks, response: IExecuteResponsePromiseData) => Promise<void> | void
>;
/**
* Executed after a node fetches data
* - For a webhook node, after the node had been run.
* - For a http-request node, or any other node that makes http requests that still use the deprecated request* methods, after every successful http request
s */
nodeFetchedData: Array<
(this: ExecutionLifecycleHooks, workflowId: string, node: INode) => Promise<void> | void
>;
};
export type ExecutionLifecycleHookName = keyof ExecutionLifecyleHookHandlers;
/**
* Contains hooks that trigger at specific events in an execution's lifecycle. Every hook has an array of callbacks to run.
*
* Common use cases include:
* - Saving execution progress to database
* - Pushing execution status updates to the frontend
* - Recording workflow statistics
* - Running external hooks for execution events
* - Error and Cancellation handling and cleanup
*
* @example
* ```typescript
* const hooks = new ExecutionLifecycleHooks(mode, executionId, workflowData);
* hooks.add('workflowExecuteAfter, async function(fullRunData) {
* await saveToDatabase(executionId, fullRunData);
*});
* ```
*/
export class ExecutionLifecycleHooks {
readonly handlers: ExecutionLifecyleHookHandlers = {
nodeExecuteAfter: [],
nodeExecuteBefore: [],
nodeFetchedData: [],
sendResponse: [],
workflowExecuteAfter: [],
workflowExecuteBefore: [],
};
constructor(
readonly mode: WorkflowExecuteMode,
readonly executionId: string,
readonly workflowData: IWorkflowBase,
) {}
addHandler<Hook extends keyof ExecutionLifecyleHookHandlers>(
hookName: Hook,
...handlers: Array<ExecutionLifecyleHookHandlers[Hook][number]>
): void {
// @ts-expect-error FIX THIS
this.handlers[hookName].push(...handlers);
}
async runHook<
Hook extends keyof ExecutionLifecyleHookHandlers,
Params extends unknown[] = Parameters<
Exclude<ExecutionLifecyleHookHandlers[Hook], undefined>[number]
>,
>(hookName: Hook, parameters: Params) {
const hooks = this.handlers[hookName];
for (const hookFunction of hooks) {
const typedHookFunction = hookFunction as unknown as (
this: ExecutionLifecycleHooks,
...args: Params
) => Promise<void>;
await typedHookFunction.apply(this, parameters);
}
}
}
declare module 'n8n-workflow' {
interface IWorkflowExecuteAdditionalData {
hooks?: ExecutionLifecycleHooks;
}
}

View File

@@ -5,3 +5,4 @@ export * from './node-execution-context';
export * from './partial-execution-utils';
export * from './node-execution-context/utils/execution-metadata';
export * from './workflow-execute';
export { ExecutionLifecycleHooks } from './execution-lifecycle-hooks';

View File

@@ -194,7 +194,7 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti
}
async sendResponse(response: IExecuteResponsePromiseData): Promise<void> {
await this.additionalData.hooks?.executeHookFunctions('sendResponse', [response]);
await this.additionalData.hooks?.runHook('sendResponse', [response]);
}
/** @deprecated use ISupplyDataFunctions.addInputData */

View File

@@ -258,12 +258,12 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData
}
runExecutionData.resultData.runData[nodeName][currentNodeRunIndex] = taskData;
await additionalData.hooks?.executeHookFunctions('nodeExecuteBefore', [nodeName]);
await additionalData.hooks?.runHook('nodeExecuteBefore', [nodeName]);
} else {
// Outputs
taskData.executionTime = new Date().getTime() - taskData.startTime;
await additionalData.hooks?.executeHookFunctions('nodeExecuteAfter', [
await additionalData.hooks?.runHook('nodeExecuteAfter', [
nodeName,
taskData,
this.runExecutionData,

View File

@@ -13,6 +13,7 @@ import type {
IExecuteResponsePromiseData,
IRun,
} from 'n8n-workflow';
import assert from 'node:assert';
import type { IGetExecuteTriggerFunctions } from './interfaces';
@@ -47,46 +48,34 @@ export class TriggersAndPollers {
// Add the manual trigger response which resolves when the first time data got emitted
triggerResponse!.manualTriggerResponse = new Promise((resolve, reject) => {
const { hooks } = additionalData;
assert.ok(hooks, 'Execution lifecycle hooks are not defined');
triggerFunctions.emit = (
(resolveEmit) =>
(
data: INodeExecutionData[][],
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
donePromise?: IDeferredPromise<IRun>,
) => {
additionalData.hooks!.hookFunctions.sendResponse = [
async (response: IExecuteResponsePromiseData): Promise<void> => {
if (responsePromise) {
responsePromise.resolve(response);
}
},
];
if (donePromise) {
additionalData.hooks!.hookFunctions.workflowExecuteAfter?.unshift(
async (runData: IRun): Promise<void> => {
return donePromise.resolve(runData);
},
);
}
resolveEmit(data);
data: INodeExecutionData[][],
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
donePromise?: IDeferredPromise<IRun>,
) => {
if (responsePromise) {
hooks.addHandler('sendResponse', (response) => responsePromise.resolve(response));
}
)(resolve);
if (donePromise) {
hooks.addHandler('workflowExecuteAfter', (runData) => donePromise.resolve(runData));
}
resolve(data);
};
triggerFunctions.emitError = (
(rejectEmit) =>
(error: Error, responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>) => {
additionalData.hooks!.hookFunctions.sendResponse = [
async (): Promise<void> => {
if (responsePromise) {
responsePromise.reject(error);
}
},
];
rejectEmit(error);
error: Error,
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
) => {
if (responsePromise) {
hooks.addHandler('sendResponse', () => responsePromise.reject(error));
}
)(reject);
reject(error);
};
});
return triggerResponse;

View File

@@ -405,19 +405,6 @@ export class WorkflowExecute {
return this.processRunExecutionData(graph.toWorkflow({ ...workflow }));
}
/**
* Executes the hook with the given name
*
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
async executeHook(hookName: string, parameters: any[]): Promise<void> {
if (this.additionalData.hooks === undefined) {
return;
}
return await this.additionalData.hooks.executeHookFunctions(hookName, parameters);
}
/**
* Merges temporary execution metadata into the final runData structure.
* During workflow execution, metadata is collected in a temporary location
@@ -1207,11 +1194,14 @@ export class WorkflowExecute {
this.status = 'running';
const { hooks, executionId } = this.additionalData;
assert.ok(hooks, 'Failed to run workflow due to missing execution lifecycle hooks');
if (!this.runExecutionData.executionData) {
throw new ApplicationError('Failed to run workflow due to missing execution data', {
extra: {
workflowId: workflow.id,
executionid: this.additionalData.executionId,
executionId,
mode: this.mode,
},
});
@@ -1269,14 +1259,14 @@ export class WorkflowExecute {
this.status = 'canceled';
this.abortController.abort();
const fullRunData = this.getFullRunData(startedAt);
void this.executeHook('workflowExecuteAfter', [fullRunData]);
void hooks.runHook('workflowExecuteAfter', [fullRunData]);
});
// eslint-disable-next-line complexity
const returnPromise = (async () => {
try {
if (!this.additionalData.restartExecutionId) {
await this.executeHook('workflowExecuteBefore', [workflow, this.runExecutionData]);
await hooks.runHook('workflowExecuteBefore', [workflow, this.runExecutionData]);
}
} catch (error) {
const e = error as unknown as ExecutionBaseError;
@@ -1360,7 +1350,7 @@ export class WorkflowExecute {
node: executionNode.name,
workflowId: workflow.id,
});
await this.executeHook('nodeExecuteBefore', [executionNode.name]);
await hooks.runHook('nodeExecuteBefore', [executionNode.name]);
// Get the index of the current run
runIndex = 0;
@@ -1651,7 +1641,7 @@ export class WorkflowExecute {
this.runExecutionData.executionData!.nodeExecutionStack.unshift(executionData);
// Only execute the nodeExecuteAfter hook if the node did not get aborted
if (!this.isCancelled) {
await this.executeHook('nodeExecuteAfter', [
await hooks.runHook('nodeExecuteAfter', [
executionNode.name,
taskData,
this.runExecutionData,
@@ -1693,7 +1683,7 @@ export class WorkflowExecute {
this.runExecutionData.resultData.runData[executionNode.name].push(taskData);
if (this.runExecutionData.waitTill) {
await this.executeHook('nodeExecuteAfter', [
await hooks.runHook('nodeExecuteAfter', [
executionNode.name,
taskData,
this.runExecutionData,
@@ -1712,7 +1702,7 @@ export class WorkflowExecute {
) {
// Before stopping, make sure we are executing hooks so
// That frontend is notified for example for manual executions.
await this.executeHook('nodeExecuteAfter', [
await hooks.runHook('nodeExecuteAfter', [
executionNode.name,
taskData,
this.runExecutionData,
@@ -1822,7 +1812,7 @@ export class WorkflowExecute {
// Execute hooks now to make sure that all hooks are executed properly
// Await is needed to make sure that we don't fall into concurrency problems
// When saving node execution data
await this.executeHook('nodeExecuteAfter', [
await hooks.runHook('nodeExecuteAfter', [
executionNode.name,
taskData,
this.runExecutionData,
@@ -2025,7 +2015,7 @@ export class WorkflowExecute {
this.moveNodeMetadata();
await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]).catch(
await hooks.runHook('workflowExecuteAfter', [fullRunData, newStaticData]).catch(
// eslint-disable-next-line @typescript-eslint/no-shadow
(error) => {
console.error('There was a problem running hook "workflowExecuteAfter"', error);
@@ -2118,7 +2108,10 @@ export class WorkflowExecute {
this.moveNodeMetadata();
// Prevent from running the hook if the error is an abort error as it was already handled
if (!this.isCancelled) {
await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]);
await this.additionalData.hooks?.runHook('workflowExecuteAfter', [
fullRunData,
newStaticData,
]);
}
if (closeFunction) {

View File

@@ -263,7 +263,7 @@ export async function proxyRequestToAxios(
} else if (body === '') {
body = axiosConfig.responseType === 'arraybuffer' ? Buffer.alloc(0) : undefined;
}
await additionalData?.hooks?.executeHookFunctions('nodeFetchedData', [workflow?.id, node]);
await additionalData?.hooks?.runHook('nodeFetchedData', [workflow?.id, node]);
return configObject.resolveWithFullResponse
? {
body,

View File

@@ -6,7 +6,6 @@ import type {
INodeType,
INodeTypes,
IRun,
ITaskData,
IVersionedNodeType,
IWorkflowBase,
IWorkflowExecuteAdditionalData,
@@ -14,10 +13,11 @@ import type {
WorkflowTestData,
INodeTypeData,
} from 'n8n-workflow';
import { ApplicationError, NodeHelpers, WorkflowHooks } from 'n8n-workflow';
import { ApplicationError, NodeHelpers } from 'n8n-workflow';
import path from 'path';
import { UnrecognizedNodeTypeError } from '@/errors';
import { ExecutionLifecycleHooks } from '@/execution-engine/execution-lifecycle-hooks';
import { predefinedNodesTypes } from './constants';
@@ -53,22 +53,12 @@ export function WorkflowExecuteAdditionalData(
waitPromise: IDeferredPromise<IRun>,
nodeExecutionOrder: string[],
): IWorkflowExecuteAdditionalData {
const hookFunctions = {
nodeExecuteAfter: [
async (nodeName: string, _data: ITaskData): Promise<void> => {
nodeExecutionOrder.push(nodeName);
},
],
workflowExecuteAfter: [
async (fullRunData: IRun): Promise<void> => {
waitPromise.resolve(fullRunData);
},
],
};
return mock<IWorkflowExecuteAdditionalData>({
hooks: new WorkflowHooks(hookFunctions, 'trigger', '1', mock()),
const hooks = new ExecutionLifecycleHooks('trigger', '1', mock());
hooks.addHandler('nodeExecuteAfter', (nodeName) => {
nodeExecutionOrder.push(nodeName);
});
hooks.addHandler('workflowExecuteAfter', (fullRunData) => waitPromise.resolve(fullRunData));
return mock<IWorkflowExecuteAdditionalData>({ hooks });
}
const preparePinData = (pinData: IDataObject) => {