refactor(core): Introduce @OnLifecycleEvent decorator (#14987)

This commit is contained in:
Iván Ovejero
2025-04-30 10:15:56 +02:00
committed by GitHub
parent cf0008500c
commit a88b889309
17 changed files with 458 additions and 143 deletions

View File

@@ -1,32 +0,0 @@
import { Container } from '@n8n/di';
import { mock } from 'jest-mock-extended';
import type { BaseN8nModule, ExecutionLifecycleHooks } from '../module';
import { ModuleRegistry, N8nModule } from '../module';
let moduleRegistry: ModuleRegistry;
beforeEach(() => {
moduleRegistry = new ModuleRegistry();
});
describe('registerLifecycleHooks', () => {
@N8nModule()
class TestModule implements BaseN8nModule {
registerLifecycleHooks() {}
}
test('is called when ModuleRegistry.registerLifecycleHooks is called', () => {
// ARRANGE
const hooks = mock<ExecutionLifecycleHooks>();
const instance = Container.get(TestModule);
jest.spyOn(instance, 'registerLifecycleHooks');
// ACT
moduleRegistry.registerLifecycleHooks(hooks);
// ASSERT
expect(instance.registerLifecycleHooks).toHaveBeenCalledTimes(1);
expect(instance.registerLifecycleHooks).toHaveBeenCalledWith(hooks);
});
});

View File

@@ -0,0 +1,126 @@
import { Container, Service } from '@n8n/di';
import { NonMethodError } from '../errors';
import { LifecycleMetadata } from '../lifecycle-metadata';
import { OnLifecycleEvent } from '../on-lifecycle-event';
describe('OnLifecycleEvent', () => {
let lifecycleMetadata: LifecycleMetadata;
beforeEach(() => {
lifecycleMetadata = new LifecycleMetadata();
Container.set(LifecycleMetadata, lifecycleMetadata);
jest.spyOn(lifecycleMetadata, 'register');
});
it('should register a method decorated with OnLifecycleEvent', () => {
@Service()
class TestService {
@OnLifecycleEvent('nodeExecuteBefore')
async handleNodeExecuteBefore() {}
}
expect(lifecycleMetadata.register).toHaveBeenCalledTimes(1);
expect(lifecycleMetadata.register).toHaveBeenCalledWith({
handlerClass: TestService,
methodName: 'handleNodeExecuteBefore',
eventName: 'nodeExecuteBefore',
});
});
it('should register methods for all lifecycle event types', () => {
@Service()
// @ts-expect-error Testing
class TestService {
@OnLifecycleEvent('nodeExecuteBefore')
async handleNodeExecuteBefore() {}
@OnLifecycleEvent('nodeExecuteAfter')
async handleNodeExecuteAfter() {}
@OnLifecycleEvent('workflowExecuteBefore')
async handleWorkflowExecuteBefore() {}
@OnLifecycleEvent('workflowExecuteAfter')
async handleWorkflowExecuteAfter() {}
}
expect(lifecycleMetadata.register).toHaveBeenCalledTimes(4);
expect(lifecycleMetadata.register).toHaveBeenCalledWith(
expect.objectContaining({ eventName: 'nodeExecuteBefore' }),
);
expect(lifecycleMetadata.register).toHaveBeenCalledWith(
expect.objectContaining({ eventName: 'nodeExecuteAfter' }),
);
expect(lifecycleMetadata.register).toHaveBeenCalledWith(
expect.objectContaining({ eventName: 'workflowExecuteBefore' }),
);
expect(lifecycleMetadata.register).toHaveBeenCalledWith(
expect.objectContaining({ eventName: 'workflowExecuteAfter' }),
);
});
it('should register multiple handlers in the same class', () => {
@Service()
class TestService {
@OnLifecycleEvent('nodeExecuteBefore')
async handleNodeExecuteBefore1() {}
@OnLifecycleEvent('nodeExecuteBefore')
async handleNodeExecuteBefore2() {}
}
expect(lifecycleMetadata.register).toHaveBeenCalledTimes(2);
expect(lifecycleMetadata.register).toHaveBeenCalledWith({
handlerClass: TestService,
methodName: 'handleNodeExecuteBefore1',
eventName: 'nodeExecuteBefore',
});
expect(lifecycleMetadata.register).toHaveBeenCalledWith({
handlerClass: TestService,
methodName: 'handleNodeExecuteBefore2',
eventName: 'nodeExecuteBefore',
});
});
it('should throw an error if the decorated target is not a method', () => {
expect(() => {
@Service()
class TestService {
// @ts-expect-error Testing invalid code
@OnLifecycleEvent('nodeExecuteBefore')
notAFunction = 'string';
}
new TestService();
}).toThrow(NonMethodError);
});
it('should register handlers from multiple service classes', () => {
@Service()
class FirstService {
@OnLifecycleEvent('nodeExecuteBefore')
async handleNodeExecuteBefore() {}
}
@Service()
class SecondService {
@OnLifecycleEvent('workflowExecuteAfter')
async handleWorkflowExecuteAfter() {}
}
expect(lifecycleMetadata.register).toHaveBeenCalledTimes(2);
expect(lifecycleMetadata.register).toHaveBeenCalledWith(
expect.objectContaining({
handlerClass: FirstService,
eventName: 'nodeExecuteBefore',
}),
);
expect(lifecycleMetadata.register).toHaveBeenCalledWith(
expect.objectContaining({
handlerClass: SecondService,
eventName: 'workflowExecuteAfter',
}),
);
});
});

View File

@@ -2,9 +2,10 @@ import { Container } from '@n8n/di';
import { Service } from '@n8n/di';
import { EventEmitter } from 'node:events';
import { NonMethodError } from '../errors';
import { MultiMainMetadata } from '../multi-main-metadata';
import { LEADER_TAKEOVER_EVENT_NAME, LEADER_STEPDOWN_EVENT_NAME } from '../multi-main-metadata';
import { NonMethodError, OnLeaderStepdown, OnLeaderTakeover } from '../on-multi-main-event';
import { OnLeaderStepdown, OnLeaderTakeover } from '../on-multi-main-event';
class MockMultiMainSetup extends EventEmitter {
registerEventHandlers() {

View File

@@ -0,0 +1,7 @@
import { UnexpectedError } from 'n8n-workflow';
export class NonMethodError extends UnexpectedError {
constructor(name: string) {
super(`${name} must be a method on a class to use this decorator`);
}
}

View File

@@ -11,13 +11,22 @@ export {
LOWEST_SHUTDOWN_PRIORITY,
} from './shutdown/constants';
export { ShutdownRegistryMetadata } from './shutdown-registry-metadata';
export { ModuleRegistry } from './module';
export { OnShutdown } from './on-shutdown';
export { Redactable } from './redactable';
export { BaseN8nModule, N8nModule } from './module';
export { ModuleMetadata } from './module-metadata';
export { Debounce } from './debounce';
export type { AccessScope, Controller, RateLimit } from './types';
export type { ShutdownHandler } from './types';
export { MultiMainMetadata } from './multi-main-metadata';
export { OnLeaderTakeover, OnLeaderStepdown } from './on-multi-main-event';
export { Memoized } from './memoized';
export { OnLifecycleEvent } from './on-lifecycle-event';
export type {
LifecycleContext,
NodeExecuteBeforeContext,
NodeExecuteAfterContext,
WorkflowExecuteBeforeContext,
WorkflowExecuteAfterContext,
} from './lifecycle-metadata';
export { LifecycleMetadata } from './lifecycle-metadata';

View File

@@ -0,0 +1,78 @@
import { Service } from '@n8n/di';
import type {
IDataObject,
IRun,
IRunExecutionData,
ITaskData,
ITaskStartedData,
IWorkflowBase,
Workflow,
} from 'n8n-workflow';
import type { Class } from './types';
export type LifecycleHandlerClass = Class<
Record<string, (ctx: LifecycleContext) => Promise<void> | void>
>;
export type NodeExecuteBeforeContext = {
type: 'nodeExecuteBefore';
workflow: IWorkflowBase;
nodeName: string;
taskData: ITaskStartedData;
};
export type NodeExecuteAfterContext = {
type: 'nodeExecuteAfter';
workflow: IWorkflowBase;
nodeName: string;
taskData: ITaskData;
executionData: IRunExecutionData;
};
export type WorkflowExecuteBeforeContext = {
type: 'workflowExecuteBefore';
workflow: IWorkflowBase;
workflowInstance: Workflow;
executionData?: IRunExecutionData;
};
export type WorkflowExecuteAfterContext = {
type: 'workflowExecuteAfter';
workflow: IWorkflowBase;
runData: IRun;
newStaticData: IDataObject;
};
/** Context arg passed to a lifecycle event handler method. */
export type LifecycleContext =
| NodeExecuteBeforeContext
| NodeExecuteAfterContext
| WorkflowExecuteBeforeContext
| WorkflowExecuteAfterContext;
type LifecycleHandler = {
/** Class holding the method to call on a lifecycle event. */
handlerClass: LifecycleHandlerClass;
/** Name of the method to call on a lifecycle event. */
methodName: string;
/** Name of the lifecycle event to listen to. */
eventName: LifecycleEvent;
};
export type LifecycleEvent = LifecycleContext['type'];
@Service()
export class LifecycleMetadata {
private readonly handlers: LifecycleHandler[] = [];
register(handler: LifecycleHandler) {
this.handlers.push(handler);
}
getHandlers(): LifecycleHandler[] {
return this.handlers;
}
}

View File

@@ -0,0 +1,16 @@
import { Service } from '@n8n/di';
import type { Module } from './module';
@Service()
export class ModuleMetadata {
private readonly modules: Set<Module> = new Set();
register(module: Module) {
this.modules.add(module);
}
getModules() {
return this.modules.keys();
}
}

View File

@@ -1,43 +1,16 @@
import { Container, Service, type Constructable } from '@n8n/di';
/**
* @TODO Temporary dummy type until `ExecutionLifecycleHooks` registers hooks via decorators.
*/
export type ExecutionLifecycleHooks = object;
import { ModuleMetadata } from './module-metadata';
export interface BaseN8nModule {
initialize?(): void;
registerLifecycleHooks?(hooks: ExecutionLifecycleHooks): void;
}
type Module = Constructable<BaseN8nModule>;
export const registry = new Set<Module>();
export type Module = Constructable<BaseN8nModule>;
export const N8nModule = (): ClassDecorator => (target) => {
registry.add(target as unknown as Module);
Container.get(ModuleMetadata).register(target as unknown as Module);
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return Service()(target);
};
@Service()
export class ModuleRegistry {
initializeModules() {
for (const ModuleClass of registry.keys()) {
const instance = Container.get(ModuleClass);
if (instance.initialize) {
instance.initialize();
}
}
}
registerLifecycleHooks(hooks: ExecutionLifecycleHooks) {
for (const ModuleClass of registry.keys()) {
const instance = Container.get(ModuleClass);
if (instance.registerLifecycleHooks) {
instance.registerLifecycleHooks(hooks);
}
}
}
}

View File

@@ -0,0 +1,38 @@
import { Container } from '@n8n/di';
import { NonMethodError } from './errors';
import type { LifecycleEvent, LifecycleHandlerClass } from './lifecycle-metadata';
import { LifecycleMetadata } from './lifecycle-metadata';
/**
* Decorator that registers a method to be called when a specific lifecycle event occurs.
* For more information, see `execution-lifecyle-hooks.ts` in `cli` and `core`.
*
* @example
*
* ```ts
* @Service()
* class MyService {
* @OnLifecycleEvent('workflowExecuteAfter')
* async handleEvent(ctx: WorkflowExecuteAfterContext) {
* // ...
* }
* }
* ```
*/
export const OnLifecycleEvent =
(eventName: LifecycleEvent): MethodDecorator =>
(prototype, propertyKey, descriptor) => {
const handlerClass = prototype.constructor as LifecycleHandlerClass;
const methodName = String(propertyKey);
if (typeof descriptor?.value !== 'function') {
throw new NonMethodError(`${handlerClass.name}.${methodName}()`);
}
Container.get(LifecycleMetadata).register({
handlerClass,
methodName,
eventName,
});
};

View File

@@ -1,6 +1,6 @@
import { Container } from '@n8n/di';
import { UnexpectedError } from 'n8n-workflow';
import { NonMethodError } from './errors';
import type { EventHandlerClass, MultiMainEvent } from './multi-main-metadata';
import {
LEADER_TAKEOVER_EVENT_NAME,
@@ -8,12 +8,6 @@ import {
MultiMainMetadata,
} from './multi-main-metadata';
export class NonMethodError extends UnexpectedError {
constructor(name: string) {
super(`${name} must be a method on a class to use this decorator`);
}
}
const OnMultiMainEvent =
(eventName: MultiMainEvent): MethodDecorator =>
(prototype, propertyKey, descriptor) => {

View File

@@ -1,7 +1,6 @@
import 'reflect-metadata';
import { GlobalConfig } from '@n8n/config';
import { LICENSE_FEATURES } from '@n8n/constants';
import { ModuleRegistry } from '@n8n/decorators';
import { Container } from '@n8n/di';
import { Command, Errors } from '@oclif/core';
import {
@@ -30,6 +29,7 @@ import { ExternalHooks } from '@/external-hooks';
import { ExternalSecretsManager } from '@/external-secrets.ee/external-secrets-manager.ee';
import { License } from '@/license';
import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials';
import { ModuleRegistry } from '@/modules/module-registry';
import type { ModulePreInit } from '@/modules/modules.config';
import { ModulesConfig } from '@/modules/modules.config';
import { NodeTypes } from '@/node-types';

View File

@@ -1,4 +1,3 @@
import { ModuleRegistry } from '@n8n/decorators';
import { Container } from '@n8n/di';
import { stringify } from 'flatted';
import { ErrorReporter, Logger, InstanceSettings, ExecutionLifecycleHooks } from 'n8n-core';
@@ -11,6 +10,7 @@ import type {
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { EventService } from '@/events/event.service';
import { ExternalHooks } from '@/external-hooks';
import { ModuleRegistry } from '@/modules/module-registry';
import { Push } from '@/push';
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
import { isWorkflowIdValid } from '@/utils';

View File

@@ -1,9 +1,9 @@
import type { WorkflowExecuteAfterContext } from '@n8n/decorators';
import { Container } from '@n8n/di';
import { In, type EntityManager } from '@n8n/typeorm';
import { mock } from 'jest-mock-extended';
import { DateTime } from 'luxon';
import type { Logger } from 'n8n-core';
import { type ExecutionLifecycleHooks } from 'n8n-core';
import {
createDeferredPromise,
type ExecutionStatus,
@@ -76,10 +76,10 @@ describe('workflowExecuteAfterHandler', () => {
{ status: 'crashed', type: 'failure' },
])('stores events for executions with the status `$status`', async ({ status, type }) => {
// ARRANGE
const ctx = mock<ExecutionLifecycleHooks>({ workflowData: workflow });
const ctx = mock<WorkflowExecuteAfterContext>({ workflow });
const startedAt = DateTime.utc();
const stoppedAt = startedAt.plus({ seconds: 5 });
const run = mock<IRun>({
ctx.runData = mock<IRun>({
mode: 'webhook',
status,
startedAt: startedAt.toJSDate(),
@@ -88,7 +88,7 @@ describe('workflowExecuteAfterHandler', () => {
// ACT
const now = DateTime.utc().toJSDate();
await insightsCollectionService.workflowExecuteAfterHandler(ctx, run);
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
await insightsCollectionService.flushEvents();
// ASSERT
@@ -143,10 +143,10 @@ describe('workflowExecuteAfterHandler', () => {
{ status: 'running' },
])('does not store events for executions with the status `$status`', async ({ status }) => {
// ARRANGE
const ctx = mock<ExecutionLifecycleHooks>({ workflowData: workflow });
const ctx = mock<WorkflowExecuteAfterContext>({ workflow });
const startedAt = DateTime.utc();
const stoppedAt = startedAt.plus({ seconds: 5 });
const run = mock<IRun>({
ctx.runData = mock<IRun>({
mode: 'webhook',
status,
startedAt: startedAt.toJSDate(),
@@ -154,7 +154,7 @@ describe('workflowExecuteAfterHandler', () => {
});
// ACT
await insightsCollectionService.workflowExecuteAfterHandler(ctx, run);
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
await insightsCollectionService.flushEvents();
// ASSERT
@@ -170,10 +170,10 @@ describe('workflowExecuteAfterHandler', () => {
{ mode: 'integrated' },
])('does not store events for executions with the mode `$mode`', async ({ mode }) => {
// ARRANGE
const ctx = mock<ExecutionLifecycleHooks>({ workflowData: workflow });
const ctx = mock<WorkflowExecuteAfterContext>({ workflow });
const startedAt = DateTime.utc();
const stoppedAt = startedAt.plus({ seconds: 5 });
const run = mock<IRun>({
ctx.runData = mock<IRun>({
mode,
status: 'success',
startedAt: startedAt.toJSDate(),
@@ -181,7 +181,7 @@ describe('workflowExecuteAfterHandler', () => {
});
// ACT
await insightsCollectionService.workflowExecuteAfterHandler(ctx, run);
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
await insightsCollectionService.flushEvents();
// ASSERT
@@ -200,10 +200,10 @@ describe('workflowExecuteAfterHandler', () => {
{ mode: 'webhook' },
])('stores events for executions with the mode `$mode`', async ({ mode }) => {
// ARRANGE
const ctx = mock<ExecutionLifecycleHooks>({ workflowData: workflow });
const ctx = mock<WorkflowExecuteAfterContext>({ workflow });
const startedAt = DateTime.utc();
const stoppedAt = startedAt.plus({ seconds: 5 });
const run = mock<IRun>({
ctx.runData = mock<IRun>({
mode,
status: 'success',
startedAt: startedAt.toJSDate(),
@@ -211,7 +211,7 @@ describe('workflowExecuteAfterHandler', () => {
});
// ACT
await insightsCollectionService.workflowExecuteAfterHandler(ctx, run);
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
await insightsCollectionService.flushEvents();
// ASSERT
@@ -259,7 +259,7 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => {
const startedAt = DateTime.utc();
const stoppedAt = startedAt.plus({ seconds: 5 });
const run = mock<IRun>({
const runData = mock<IRun>({
mode: 'webhook',
status: 'success',
startedAt: startedAt.toJSDate(),
@@ -316,12 +316,13 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => {
test('reuses cached metadata for subsequent executions of the same workflow', async () => {
// ARRANGE
const ctx = mock<ExecutionLifecycleHooks>({
workflowData: { ...workflow, settings: undefined },
const ctx = mock<WorkflowExecuteAfterContext>({
workflow: { ...workflow, settings: undefined },
runData,
});
// ACT
await insightsCollectionService.workflowExecuteAfterHandler(ctx, run);
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
await insightsCollectionService.flushEvents();
// ASSERT
@@ -343,7 +344,7 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => {
);
// ACT AGAIN with the same workflow
await insightsCollectionService.workflowExecuteAfterHandler(ctx, run);
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
await insightsCollectionService.flushEvents();
// ASSERT AGAIN
@@ -355,10 +356,10 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => {
test('updates cached metadata if workflow details change', async () => {
// ARRANGE
const ctx = mock<ExecutionLifecycleHooks>({ workflowData: workflow });
const ctx = mock<WorkflowExecuteAfterContext>({ workflow });
// ACT
await insightsCollectionService.workflowExecuteAfterHandler(ctx, run);
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
await insightsCollectionService.flushEvents();
// ASSERT
@@ -369,7 +370,7 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => {
workflow.name = 'new-workflow-name';
// ACT AGAIN with the same workflow
await insightsCollectionService.workflowExecuteAfterHandler(ctx, run);
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
await insightsCollectionService.flushEvents();
// ASSERT AGAIN
@@ -410,7 +411,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
const startedAt = DateTime.utc();
const stoppedAt = startedAt.plus({ seconds: 5 });
const run = mock<IRun>({
const runData = mock<IRun>({
mode: 'trigger',
status: 'success',
startedAt: startedAt.toJSDate(),
@@ -463,13 +464,13 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
test('flushes events to the database once buffer is full', async () => {
// ARRANGE
const ctx = mock<ExecutionLifecycleHooks>({ workflowData: workflow });
const ctx = mock<WorkflowExecuteAfterContext>({ workflow, runData });
// ACT
// each `workflowExecuteAfterHandler` adds 3 insights (status, runtime, time saved);
// we call it 333 times be 1 away from the flushBatchSize (1000)
for (let i = 0; i < 333; i++) {
await insightsCollectionService.workflowExecuteAfterHandler(ctx, run);
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
}
// await for the next tick to ensure the flush is called
await new Promise(process.nextTick);
@@ -478,7 +479,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
expect(trxMock.insert).not.toHaveBeenCalled();
// ACT
await insightsCollectionService.workflowExecuteAfterHandler(ctx, run);
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
// ASSERT
// await for the next tick to ensure the flush is called
@@ -491,12 +492,12 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
jest.useFakeTimers();
trxMock.insert.mockClear();
insightsCollectionService.startFlushingTimer();
const ctx = mock<ExecutionLifecycleHooks>({ workflowData: workflow });
const ctx = mock<WorkflowExecuteAfterContext>({ workflow, runData });
try {
// ACT
for (let i = 0; i < 33; i++) {
await insightsCollectionService.workflowExecuteAfterHandler(ctx, run);
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
}
// ASSERT
expect(trxMock.insert).not.toHaveBeenCalled();
@@ -516,18 +517,18 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
jest.useFakeTimers();
trxMock.insert.mockClear();
insightsCollectionService.startFlushingTimer();
const ctx = mock<ExecutionLifecycleHooks>({ workflowData: workflow });
const ctx = mock<WorkflowExecuteAfterContext>({ workflow });
try {
// ACT
await insightsCollectionService.workflowExecuteAfterHandler(ctx, run);
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
await jest.advanceTimersByTimeAsync(31 * 1000);
// ASSERT
expect(trxMock.insert).toHaveBeenCalledTimes(1);
// // ACT
await insightsCollectionService.workflowExecuteAfterHandler(ctx, run);
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
await jest.advanceTimersByTimeAsync(31 * 1000);
expect(trxMock.insert).toHaveBeenCalledTimes(2);
@@ -562,11 +563,11 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
test('flushes events to the database on shutdown', async () => {
// ARRANGE
trxMock.insert.mockClear();
const ctx = mock<ExecutionLifecycleHooks>({ workflowData: workflow });
const ctx = mock<WorkflowExecuteAfterContext>({ workflow, runData });
// ACT
for (let i = 0; i < 10; i++) {
await insightsCollectionService.workflowExecuteAfterHandler(ctx, run);
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
}
await insightsCollectionService.shutdown();
@@ -583,16 +584,16 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
// reset insights async flushing
insightsCollectionService.startFlushingTimer();
trxMock.insert.mockClear();
const ctx = mock<ExecutionLifecycleHooks>({ workflowData: workflow });
const ctx = mock<WorkflowExecuteAfterContext>({ workflow, runData });
// ACT
for (let i = 0; i < 10; i++) {
await insightsCollectionService.workflowExecuteAfterHandler(ctx, run);
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
}
void insightsCollectionService.shutdown();
// trigger a workflow after shutdown
await insightsCollectionService.workflowExecuteAfterHandler(ctx, run);
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
// ASSERT
expect(trxMock.insert).toHaveBeenCalledTimes(2);
@@ -615,11 +616,11 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
trxMock.insert.mockClear();
trxMock.insert.mockRejectedValueOnce(new Error('Test error'));
insightsCollectionService.startFlushingTimer();
const ctx = mock<ExecutionLifecycleHooks>({ workflowData: workflow });
const ctx = mock<WorkflowExecuteAfterContext>({ workflow, runData });
try {
// ACT
await insightsCollectionService.workflowExecuteAfterHandler(ctx, run);
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
await jest.advanceTimersByTimeAsync(31 * 1000);
// ASSERT
@@ -646,7 +647,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
insightsCollectionService.startFlushingTimer();
trxMock.insert.mockClear();
const ctx = mock<ExecutionLifecycleHooks>({ workflowData: workflow });
const ctx = mock<WorkflowExecuteAfterContext>({ workflow, runData });
// Flush will hang until we manually resolve it
const { resolve: flushResolve, promise: flushPromise } = createDeferredPromise();
@@ -659,7 +660,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
// Each `workflowExecuteAfterHandler` adds 3 insights;
// we call it 4 times to exceed the flushBatchSize (10)
for (let i = 0; i < config.flushBatchSize / 3; i++) {
await insightsCollectionService.workflowExecuteAfterHandler(ctx, run);
await insightsCollectionService.handleWorkflowExecuteAfter(ctx);
}
// ACT

View File

@@ -1,14 +1,9 @@
import { OnLifecycleEvent, type WorkflowExecuteAfterContext } from '@n8n/decorators';
import { Service } from '@n8n/di';
import { In } from '@n8n/typeorm';
import { DateTime } from 'luxon';
import { Logger } from 'n8n-core';
import type { ExecutionLifecycleHooks } from 'n8n-core';
import {
UnexpectedError,
type ExecutionStatus,
type IRun,
type WorkflowExecuteMode,
} from 'n8n-workflow';
import { UnexpectedError, type ExecutionStatus, type WorkflowExecuteMode } from 'n8n-workflow';
import { SharedWorkflow } from '@/databases/entities/shared-workflow';
import { SharedWorkflowRepository } from '@/databases/repositories/shared-workflow.repository';
@@ -103,16 +98,17 @@ export class InsightsCollectionService {
await Promise.all([...this.flushesInProgress, this.flushEvents()]);
}
async workflowExecuteAfterHandler(ctx: ExecutionLifecycleHooks, fullRunData: IRun) {
if (shouldSkipStatus[fullRunData.status] || shouldSkipMode[fullRunData.mode]) {
@OnLifecycleEvent('workflowExecuteAfter')
async handleWorkflowExecuteAfter(ctx: WorkflowExecuteAfterContext) {
if (shouldSkipStatus[ctx.runData.status] || shouldSkipMode[ctx.runData.mode]) {
return;
}
const status = fullRunData.status === 'success' ? 'success' : 'failure';
const status = ctx.runData.status === 'success' ? 'success' : 'failure';
const commonWorkflowData = {
workflowId: ctx.workflowData.id,
workflowName: ctx.workflowData.name,
workflowId: ctx.workflow.id,
workflowName: ctx.workflow.name,
timestamp: DateTime.utc().toJSDate(),
};
@@ -124,8 +120,8 @@ export class InsightsCollectionService {
});
// run time event
if (fullRunData.stoppedAt) {
const value = fullRunData.stoppedAt.getTime() - fullRunData.startedAt.getTime();
if (ctx.runData.stoppedAt) {
const value = ctx.runData.stoppedAt.getTime() - ctx.runData.startedAt.getTime();
this.bufferedInsights.add({
...commonWorkflowData,
type: 'runtime_ms',
@@ -134,11 +130,11 @@ export class InsightsCollectionService {
}
// time saved event
if (status === 'success' && ctx.workflowData.settings?.timeSavedPerExecution) {
if (status === 'success' && ctx.workflow.settings?.timeSavedPerExecution) {
this.bufferedInsights.add({
...commonWorkflowData,
type: 'time_saved_min',
value: ctx.workflowData.settings.timeSavedPerExecution,
value: ctx.workflow.settings.timeSavedPerExecution,
});
}

View File

@@ -1,6 +1,5 @@
import type { BaseN8nModule } from '@n8n/decorators';
import { N8nModule, OnLeaderStepdown, OnLeaderTakeover } from '@n8n/decorators';
import type { ExecutionLifecycleHooks } from 'n8n-core';
import { InstanceSettings, Logger } from 'n8n-core';
import { InsightsService } from './insights.service';
@@ -25,14 +24,6 @@ export class InsightsModule implements BaseN8nModule {
}
}
registerLifecycleHooks(hooks: ExecutionLifecycleHooks) {
const insightsService = this.insightsService;
hooks.addHandler('workflowExecuteAfter', async function (fullRunData) {
await insightsService.workflowExecuteAfterHandler(this, fullRunData);
});
}
@OnLeaderTakeover()
startBackgroundProcess() {
this.insightsService.startBackgroundProcess();

View File

@@ -6,8 +6,7 @@ import {
import { OnShutdown } from '@n8n/decorators';
import { Service } from '@n8n/di';
import { Logger } from 'n8n-core';
import type { ExecutionLifecycleHooks } from 'n8n-core';
import { UserError, type IRun } from 'n8n-workflow';
import { UserError } from 'n8n-workflow';
import { License } from '@/license';
@@ -55,10 +54,6 @@ export class InsightsService {
this.compactionService.stopCompactionTimer();
}
async workflowExecuteAfterHandler(ctx: ExecutionLifecycleHooks, fullRunData: IRun) {
await this.collectionService.workflowExecuteAfterHandler(ctx, fullRunData);
}
async getInsightsSummary({
periodLengthInDays,
}: { periodLengthInDays: number }): Promise<InsightsSummary> {

View File

@@ -0,0 +1,122 @@
import type { LifecycleContext } from '@n8n/decorators';
import { LifecycleMetadata, ModuleMetadata } from '@n8n/decorators';
import { Container, Service } from '@n8n/di';
import type { ExecutionLifecycleHooks } from 'n8n-core';
import type {
IDataObject,
IRun,
IRunExecutionData,
ITaskData,
ITaskStartedData,
IWorkflowBase,
Workflow,
} from 'n8n-workflow';
@Service()
export class ModuleRegistry {
constructor(
private readonly moduleMetadata: ModuleMetadata,
private readonly lifecycleMetadata: LifecycleMetadata,
) {}
initializeModules() {
for (const ModuleClass of this.moduleMetadata.getModules()) {
Container.get(ModuleClass).initialize?.();
}
}
registerLifecycleHooks(hooks: ExecutionLifecycleHooks) {
const handlers = this.lifecycleMetadata.getHandlers();
for (const { handlerClass, methodName, eventName } of handlers) {
const instance = Container.get(handlerClass);
switch (eventName) {
case 'workflowExecuteAfter':
hooks.addHandler(
eventName,
async function (
this: { workflowData: IWorkflowBase },
runData: IRun,
newStaticData: IDataObject,
) {
const context: LifecycleContext = {
type: 'workflowExecuteAfter',
workflow: this.workflowData,
runData,
newStaticData,
};
// eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/return-await
return await instance[methodName].call(instance, context);
},
);
break;
case 'nodeExecuteBefore':
hooks.addHandler(
eventName,
async function (
this: { workflowData: IWorkflowBase },
nodeName: string,
taskData: ITaskStartedData,
) {
const context: LifecycleContext = {
type: 'nodeExecuteBefore',
workflow: this.workflowData,
nodeName,
taskData,
};
// eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/return-await
return await instance[methodName].call(instance, context);
},
);
break;
case 'nodeExecuteAfter':
hooks.addHandler(
eventName,
async function (
this: { workflowData: IWorkflowBase },
nodeName: string,
taskData: ITaskData,
executionData: IRunExecutionData,
) {
const context: LifecycleContext = {
type: 'nodeExecuteAfter',
workflow: this.workflowData,
nodeName,
taskData,
executionData,
};
// eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/return-await
return await instance[methodName].call(instance, context);
},
);
break;
case 'workflowExecuteBefore':
hooks.addHandler(
eventName,
async function (
this: { workflowData: IWorkflowBase },
workflowInstance: Workflow,
executionData?: IRunExecutionData,
) {
const context: LifecycleContext = {
type: 'workflowExecuteBefore',
workflow: this.workflowData,
workflowInstance,
executionData,
};
// eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/return-await
return await instance[methodName].call(instance, context);
},
);
break;
}
}
}
}