diff --git a/packages/@n8n/decorators/src/__tests__/module.test.ts b/packages/@n8n/decorators/src/__tests__/module.test.ts deleted file mode 100644 index 540690f44a..0000000000 --- a/packages/@n8n/decorators/src/__tests__/module.test.ts +++ /dev/null @@ -1,32 +0,0 @@ -import { Container } from '@n8n/di'; -import { mock } from 'jest-mock-extended'; - -import type { BaseN8nModule, ExecutionLifecycleHooks } from '../module'; -import { ModuleRegistry, N8nModule } from '../module'; - -let moduleRegistry: ModuleRegistry; - -beforeEach(() => { - moduleRegistry = new ModuleRegistry(); -}); - -describe('registerLifecycleHooks', () => { - @N8nModule() - class TestModule implements BaseN8nModule { - registerLifecycleHooks() {} - } - - test('is called when ModuleRegistry.registerLifecycleHooks is called', () => { - // ARRANGE - const hooks = mock(); - const instance = Container.get(TestModule); - jest.spyOn(instance, 'registerLifecycleHooks'); - - // ACT - moduleRegistry.registerLifecycleHooks(hooks); - - // ASSERT - expect(instance.registerLifecycleHooks).toHaveBeenCalledTimes(1); - expect(instance.registerLifecycleHooks).toHaveBeenCalledWith(hooks); - }); -}); diff --git a/packages/@n8n/decorators/src/__tests__/on-lifecycle-event.test.ts b/packages/@n8n/decorators/src/__tests__/on-lifecycle-event.test.ts new file mode 100644 index 0000000000..be7258f5b3 --- /dev/null +++ b/packages/@n8n/decorators/src/__tests__/on-lifecycle-event.test.ts @@ -0,0 +1,126 @@ +import { Container, Service } from '@n8n/di'; + +import { NonMethodError } from '../errors'; +import { LifecycleMetadata } from '../lifecycle-metadata'; +import { OnLifecycleEvent } from '../on-lifecycle-event'; + +describe('OnLifecycleEvent', () => { + let lifecycleMetadata: LifecycleMetadata; + + beforeEach(() => { + lifecycleMetadata = new LifecycleMetadata(); + Container.set(LifecycleMetadata, lifecycleMetadata); + jest.spyOn(lifecycleMetadata, 'register'); + }); + + it('should register a method decorated with OnLifecycleEvent', () => { + @Service() + class TestService { + @OnLifecycleEvent('nodeExecuteBefore') + async handleNodeExecuteBefore() {} + } + + expect(lifecycleMetadata.register).toHaveBeenCalledTimes(1); + expect(lifecycleMetadata.register).toHaveBeenCalledWith({ + handlerClass: TestService, + methodName: 'handleNodeExecuteBefore', + eventName: 'nodeExecuteBefore', + }); + }); + + it('should register methods for all lifecycle event types', () => { + @Service() + // @ts-expect-error Testing + class TestService { + @OnLifecycleEvent('nodeExecuteBefore') + async handleNodeExecuteBefore() {} + + @OnLifecycleEvent('nodeExecuteAfter') + async handleNodeExecuteAfter() {} + + @OnLifecycleEvent('workflowExecuteBefore') + async handleWorkflowExecuteBefore() {} + + @OnLifecycleEvent('workflowExecuteAfter') + async handleWorkflowExecuteAfter() {} + } + + expect(lifecycleMetadata.register).toHaveBeenCalledTimes(4); + expect(lifecycleMetadata.register).toHaveBeenCalledWith( + expect.objectContaining({ eventName: 'nodeExecuteBefore' }), + ); + expect(lifecycleMetadata.register).toHaveBeenCalledWith( + expect.objectContaining({ eventName: 'nodeExecuteAfter' }), + ); + expect(lifecycleMetadata.register).toHaveBeenCalledWith( + expect.objectContaining({ eventName: 'workflowExecuteBefore' }), + ); + expect(lifecycleMetadata.register).toHaveBeenCalledWith( + expect.objectContaining({ eventName: 'workflowExecuteAfter' }), + ); + }); + + it('should register multiple handlers in the same class', () => { + @Service() + class TestService { + @OnLifecycleEvent('nodeExecuteBefore') + async handleNodeExecuteBefore1() {} + + @OnLifecycleEvent('nodeExecuteBefore') + async handleNodeExecuteBefore2() {} + } + + expect(lifecycleMetadata.register).toHaveBeenCalledTimes(2); + expect(lifecycleMetadata.register).toHaveBeenCalledWith({ + handlerClass: TestService, + methodName: 'handleNodeExecuteBefore1', + eventName: 'nodeExecuteBefore', + }); + expect(lifecycleMetadata.register).toHaveBeenCalledWith({ + handlerClass: TestService, + methodName: 'handleNodeExecuteBefore2', + eventName: 'nodeExecuteBefore', + }); + }); + + it('should throw an error if the decorated target is not a method', () => { + expect(() => { + @Service() + class TestService { + // @ts-expect-error Testing invalid code + @OnLifecycleEvent('nodeExecuteBefore') + notAFunction = 'string'; + } + + new TestService(); + }).toThrow(NonMethodError); + }); + + it('should register handlers from multiple service classes', () => { + @Service() + class FirstService { + @OnLifecycleEvent('nodeExecuteBefore') + async handleNodeExecuteBefore() {} + } + + @Service() + class SecondService { + @OnLifecycleEvent('workflowExecuteAfter') + async handleWorkflowExecuteAfter() {} + } + + expect(lifecycleMetadata.register).toHaveBeenCalledTimes(2); + expect(lifecycleMetadata.register).toHaveBeenCalledWith( + expect.objectContaining({ + handlerClass: FirstService, + eventName: 'nodeExecuteBefore', + }), + ); + expect(lifecycleMetadata.register).toHaveBeenCalledWith( + expect.objectContaining({ + handlerClass: SecondService, + eventName: 'workflowExecuteAfter', + }), + ); + }); +}); diff --git a/packages/@n8n/decorators/src/__tests__/on-multi-main-event.test.ts b/packages/@n8n/decorators/src/__tests__/on-multi-main-event.test.ts index 96a36c53e7..efa5a65abf 100644 --- a/packages/@n8n/decorators/src/__tests__/on-multi-main-event.test.ts +++ b/packages/@n8n/decorators/src/__tests__/on-multi-main-event.test.ts @@ -2,9 +2,10 @@ import { Container } from '@n8n/di'; import { Service } from '@n8n/di'; import { EventEmitter } from 'node:events'; +import { NonMethodError } from '../errors'; import { MultiMainMetadata } from '../multi-main-metadata'; import { LEADER_TAKEOVER_EVENT_NAME, LEADER_STEPDOWN_EVENT_NAME } from '../multi-main-metadata'; -import { NonMethodError, OnLeaderStepdown, OnLeaderTakeover } from '../on-multi-main-event'; +import { OnLeaderStepdown, OnLeaderTakeover } from '../on-multi-main-event'; class MockMultiMainSetup extends EventEmitter { registerEventHandlers() { diff --git a/packages/@n8n/decorators/src/errors.ts b/packages/@n8n/decorators/src/errors.ts new file mode 100644 index 0000000000..ebe714c632 --- /dev/null +++ b/packages/@n8n/decorators/src/errors.ts @@ -0,0 +1,7 @@ +import { UnexpectedError } from 'n8n-workflow'; + +export class NonMethodError extends UnexpectedError { + constructor(name: string) { + super(`${name} must be a method on a class to use this decorator`); + } +} diff --git a/packages/@n8n/decorators/src/index.ts b/packages/@n8n/decorators/src/index.ts index e1e2f7eea7..1a3f08a6ce 100644 --- a/packages/@n8n/decorators/src/index.ts +++ b/packages/@n8n/decorators/src/index.ts @@ -11,13 +11,22 @@ export { LOWEST_SHUTDOWN_PRIORITY, } from './shutdown/constants'; export { ShutdownRegistryMetadata } from './shutdown-registry-metadata'; -export { ModuleRegistry } from './module'; export { OnShutdown } from './on-shutdown'; export { Redactable } from './redactable'; export { BaseN8nModule, N8nModule } from './module'; +export { ModuleMetadata } from './module-metadata'; export { Debounce } from './debounce'; export type { AccessScope, Controller, RateLimit } from './types'; export type { ShutdownHandler } from './types'; export { MultiMainMetadata } from './multi-main-metadata'; export { OnLeaderTakeover, OnLeaderStepdown } from './on-multi-main-event'; export { Memoized } from './memoized'; +export { OnLifecycleEvent } from './on-lifecycle-event'; +export type { + LifecycleContext, + NodeExecuteBeforeContext, + NodeExecuteAfterContext, + WorkflowExecuteBeforeContext, + WorkflowExecuteAfterContext, +} from './lifecycle-metadata'; +export { LifecycleMetadata } from './lifecycle-metadata'; diff --git a/packages/@n8n/decorators/src/lifecycle-metadata.ts b/packages/@n8n/decorators/src/lifecycle-metadata.ts new file mode 100644 index 0000000000..90aaa5d3ad --- /dev/null +++ b/packages/@n8n/decorators/src/lifecycle-metadata.ts @@ -0,0 +1,78 @@ +import { Service } from '@n8n/di'; +import type { + IDataObject, + IRun, + IRunExecutionData, + ITaskData, + ITaskStartedData, + IWorkflowBase, + Workflow, +} from 'n8n-workflow'; + +import type { Class } from './types'; + +export type LifecycleHandlerClass = Class< + Record Promise | void> +>; + +export type NodeExecuteBeforeContext = { + type: 'nodeExecuteBefore'; + workflow: IWorkflowBase; + nodeName: string; + taskData: ITaskStartedData; +}; + +export type NodeExecuteAfterContext = { + type: 'nodeExecuteAfter'; + workflow: IWorkflowBase; + nodeName: string; + taskData: ITaskData; + executionData: IRunExecutionData; +}; + +export type WorkflowExecuteBeforeContext = { + type: 'workflowExecuteBefore'; + workflow: IWorkflowBase; + workflowInstance: Workflow; + executionData?: IRunExecutionData; +}; + +export type WorkflowExecuteAfterContext = { + type: 'workflowExecuteAfter'; + workflow: IWorkflowBase; + runData: IRun; + newStaticData: IDataObject; +}; + +/** Context arg passed to a lifecycle event handler method. */ +export type LifecycleContext = + | NodeExecuteBeforeContext + | NodeExecuteAfterContext + | WorkflowExecuteBeforeContext + | WorkflowExecuteAfterContext; + +type LifecycleHandler = { + /** Class holding the method to call on a lifecycle event. */ + handlerClass: LifecycleHandlerClass; + + /** Name of the method to call on a lifecycle event. */ + methodName: string; + + /** Name of the lifecycle event to listen to. */ + eventName: LifecycleEvent; +}; + +export type LifecycleEvent = LifecycleContext['type']; + +@Service() +export class LifecycleMetadata { + private readonly handlers: LifecycleHandler[] = []; + + register(handler: LifecycleHandler) { + this.handlers.push(handler); + } + + getHandlers(): LifecycleHandler[] { + return this.handlers; + } +} diff --git a/packages/@n8n/decorators/src/module-metadata.ts b/packages/@n8n/decorators/src/module-metadata.ts new file mode 100644 index 0000000000..891dedfebe --- /dev/null +++ b/packages/@n8n/decorators/src/module-metadata.ts @@ -0,0 +1,16 @@ +import { Service } from '@n8n/di'; + +import type { Module } from './module'; + +@Service() +export class ModuleMetadata { + private readonly modules: Set = new Set(); + + register(module: Module) { + this.modules.add(module); + } + + getModules() { + return this.modules.keys(); + } +} diff --git a/packages/@n8n/decorators/src/module.ts b/packages/@n8n/decorators/src/module.ts index 0969114e7f..a5a30b6e39 100644 --- a/packages/@n8n/decorators/src/module.ts +++ b/packages/@n8n/decorators/src/module.ts @@ -1,43 +1,16 @@ import { Container, Service, type Constructable } from '@n8n/di'; -/** - * @TODO Temporary dummy type until `ExecutionLifecycleHooks` registers hooks via decorators. - */ -export type ExecutionLifecycleHooks = object; +import { ModuleMetadata } from './module-metadata'; export interface BaseN8nModule { initialize?(): void; - registerLifecycleHooks?(hooks: ExecutionLifecycleHooks): void; } -type Module = Constructable; - -export const registry = new Set(); +export type Module = Constructable; export const N8nModule = (): ClassDecorator => (target) => { - registry.add(target as unknown as Module); + Container.get(ModuleMetadata).register(target as unknown as Module); // eslint-disable-next-line @typescript-eslint/no-unsafe-return return Service()(target); }; - -@Service() -export class ModuleRegistry { - initializeModules() { - for (const ModuleClass of registry.keys()) { - const instance = Container.get(ModuleClass); - if (instance.initialize) { - instance.initialize(); - } - } - } - - registerLifecycleHooks(hooks: ExecutionLifecycleHooks) { - for (const ModuleClass of registry.keys()) { - const instance = Container.get(ModuleClass); - if (instance.registerLifecycleHooks) { - instance.registerLifecycleHooks(hooks); - } - } - } -} diff --git a/packages/@n8n/decorators/src/on-lifecycle-event.ts b/packages/@n8n/decorators/src/on-lifecycle-event.ts new file mode 100644 index 0000000000..8d31a5c16e --- /dev/null +++ b/packages/@n8n/decorators/src/on-lifecycle-event.ts @@ -0,0 +1,38 @@ +import { Container } from '@n8n/di'; + +import { NonMethodError } from './errors'; +import type { LifecycleEvent, LifecycleHandlerClass } from './lifecycle-metadata'; +import { LifecycleMetadata } from './lifecycle-metadata'; + +/** + * Decorator that registers a method to be called when a specific lifecycle event occurs. + * For more information, see `execution-lifecyle-hooks.ts` in `cli` and `core`. + * + * @example + * + * ```ts + * @Service() + * class MyService { + * @OnLifecycleEvent('workflowExecuteAfter') + * async handleEvent(ctx: WorkflowExecuteAfterContext) { + * // ... + * } + * } + * ``` + */ +export const OnLifecycleEvent = + (eventName: LifecycleEvent): MethodDecorator => + (prototype, propertyKey, descriptor) => { + const handlerClass = prototype.constructor as LifecycleHandlerClass; + const methodName = String(propertyKey); + + if (typeof descriptor?.value !== 'function') { + throw new NonMethodError(`${handlerClass.name}.${methodName}()`); + } + + Container.get(LifecycleMetadata).register({ + handlerClass, + methodName, + eventName, + }); + }; diff --git a/packages/@n8n/decorators/src/on-multi-main-event.ts b/packages/@n8n/decorators/src/on-multi-main-event.ts index d8e4118d53..7891380563 100644 --- a/packages/@n8n/decorators/src/on-multi-main-event.ts +++ b/packages/@n8n/decorators/src/on-multi-main-event.ts @@ -1,6 +1,6 @@ import { Container } from '@n8n/di'; -import { UnexpectedError } from 'n8n-workflow'; +import { NonMethodError } from './errors'; import type { EventHandlerClass, MultiMainEvent } from './multi-main-metadata'; import { LEADER_TAKEOVER_EVENT_NAME, @@ -8,12 +8,6 @@ import { MultiMainMetadata, } from './multi-main-metadata'; -export class NonMethodError extends UnexpectedError { - constructor(name: string) { - super(`${name} must be a method on a class to use this decorator`); - } -} - const OnMultiMainEvent = (eventName: MultiMainEvent): MethodDecorator => (prototype, propertyKey, descriptor) => { diff --git a/packages/cli/src/commands/base-command.ts b/packages/cli/src/commands/base-command.ts index 14f722793a..6bb1055d70 100644 --- a/packages/cli/src/commands/base-command.ts +++ b/packages/cli/src/commands/base-command.ts @@ -1,7 +1,6 @@ import 'reflect-metadata'; import { GlobalConfig } from '@n8n/config'; import { LICENSE_FEATURES } from '@n8n/constants'; -import { ModuleRegistry } from '@n8n/decorators'; import { Container } from '@n8n/di'; import { Command, Errors } from '@oclif/core'; import { @@ -30,6 +29,7 @@ import { ExternalHooks } from '@/external-hooks'; import { ExternalSecretsManager } from '@/external-secrets.ee/external-secrets-manager.ee'; import { License } from '@/license'; import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials'; +import { ModuleRegistry } from '@/modules/module-registry'; import type { ModulePreInit } from '@/modules/modules.config'; import { ModulesConfig } from '@/modules/modules.config'; import { NodeTypes } from '@/node-types'; diff --git a/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts b/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts index f588da4d7c..b2630d6c33 100644 --- a/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts +++ b/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts @@ -1,4 +1,3 @@ -import { ModuleRegistry } from '@n8n/decorators'; import { Container } from '@n8n/di'; import { stringify } from 'flatted'; import { ErrorReporter, Logger, InstanceSettings, ExecutionLifecycleHooks } from 'n8n-core'; @@ -11,6 +10,7 @@ import type { import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { EventService } from '@/events/event.service'; import { ExternalHooks } from '@/external-hooks'; +import { ModuleRegistry } from '@/modules/module-registry'; import { Push } from '@/push'; import { WorkflowStatisticsService } from '@/services/workflow-statistics.service'; import { isWorkflowIdValid } from '@/utils'; diff --git a/packages/cli/src/modules/insights/__tests__/insights-collection.service.test.ts b/packages/cli/src/modules/insights/__tests__/insights-collection.service.test.ts index 8004fd632e..e20e5ecb7e 100644 --- a/packages/cli/src/modules/insights/__tests__/insights-collection.service.test.ts +++ b/packages/cli/src/modules/insights/__tests__/insights-collection.service.test.ts @@ -1,9 +1,9 @@ +import type { WorkflowExecuteAfterContext } from '@n8n/decorators'; import { Container } from '@n8n/di'; import { In, type EntityManager } from '@n8n/typeorm'; import { mock } from 'jest-mock-extended'; import { DateTime } from 'luxon'; import type { Logger } from 'n8n-core'; -import { type ExecutionLifecycleHooks } from 'n8n-core'; import { createDeferredPromise, type ExecutionStatus, @@ -76,10 +76,10 @@ describe('workflowExecuteAfterHandler', () => { { status: 'crashed', type: 'failure' }, ])('stores events for executions with the status `$status`', async ({ status, type }) => { // ARRANGE - const ctx = mock({ workflowData: workflow }); + const ctx = mock({ workflow }); const startedAt = DateTime.utc(); const stoppedAt = startedAt.plus({ seconds: 5 }); - const run = mock({ + ctx.runData = mock({ mode: 'webhook', status, startedAt: startedAt.toJSDate(), @@ -88,7 +88,7 @@ describe('workflowExecuteAfterHandler', () => { // ACT const now = DateTime.utc().toJSDate(); - await insightsCollectionService.workflowExecuteAfterHandler(ctx, run); + await insightsCollectionService.handleWorkflowExecuteAfter(ctx); await insightsCollectionService.flushEvents(); // ASSERT @@ -143,10 +143,10 @@ describe('workflowExecuteAfterHandler', () => { { status: 'running' }, ])('does not store events for executions with the status `$status`', async ({ status }) => { // ARRANGE - const ctx = mock({ workflowData: workflow }); + const ctx = mock({ workflow }); const startedAt = DateTime.utc(); const stoppedAt = startedAt.plus({ seconds: 5 }); - const run = mock({ + ctx.runData = mock({ mode: 'webhook', status, startedAt: startedAt.toJSDate(), @@ -154,7 +154,7 @@ describe('workflowExecuteAfterHandler', () => { }); // ACT - await insightsCollectionService.workflowExecuteAfterHandler(ctx, run); + await insightsCollectionService.handleWorkflowExecuteAfter(ctx); await insightsCollectionService.flushEvents(); // ASSERT @@ -170,10 +170,10 @@ describe('workflowExecuteAfterHandler', () => { { mode: 'integrated' }, ])('does not store events for executions with the mode `$mode`', async ({ mode }) => { // ARRANGE - const ctx = mock({ workflowData: workflow }); + const ctx = mock({ workflow }); const startedAt = DateTime.utc(); const stoppedAt = startedAt.plus({ seconds: 5 }); - const run = mock({ + ctx.runData = mock({ mode, status: 'success', startedAt: startedAt.toJSDate(), @@ -181,7 +181,7 @@ describe('workflowExecuteAfterHandler', () => { }); // ACT - await insightsCollectionService.workflowExecuteAfterHandler(ctx, run); + await insightsCollectionService.handleWorkflowExecuteAfter(ctx); await insightsCollectionService.flushEvents(); // ASSERT @@ -200,10 +200,10 @@ describe('workflowExecuteAfterHandler', () => { { mode: 'webhook' }, ])('stores events for executions with the mode `$mode`', async ({ mode }) => { // ARRANGE - const ctx = mock({ workflowData: workflow }); + const ctx = mock({ workflow }); const startedAt = DateTime.utc(); const stoppedAt = startedAt.plus({ seconds: 5 }); - const run = mock({ + ctx.runData = mock({ mode, status: 'success', startedAt: startedAt.toJSDate(), @@ -211,7 +211,7 @@ describe('workflowExecuteAfterHandler', () => { }); // ACT - await insightsCollectionService.workflowExecuteAfterHandler(ctx, run); + await insightsCollectionService.handleWorkflowExecuteAfter(ctx); await insightsCollectionService.flushEvents(); // ASSERT @@ -259,7 +259,7 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => { const startedAt = DateTime.utc(); const stoppedAt = startedAt.plus({ seconds: 5 }); - const run = mock({ + const runData = mock({ mode: 'webhook', status: 'success', startedAt: startedAt.toJSDate(), @@ -316,12 +316,13 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => { test('reuses cached metadata for subsequent executions of the same workflow', async () => { // ARRANGE - const ctx = mock({ - workflowData: { ...workflow, settings: undefined }, + const ctx = mock({ + workflow: { ...workflow, settings: undefined }, + runData, }); // ACT - await insightsCollectionService.workflowExecuteAfterHandler(ctx, run); + await insightsCollectionService.handleWorkflowExecuteAfter(ctx); await insightsCollectionService.flushEvents(); // ASSERT @@ -343,7 +344,7 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => { ); // ACT AGAIN with the same workflow - await insightsCollectionService.workflowExecuteAfterHandler(ctx, run); + await insightsCollectionService.handleWorkflowExecuteAfter(ctx); await insightsCollectionService.flushEvents(); // ASSERT AGAIN @@ -355,10 +356,10 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => { test('updates cached metadata if workflow details change', async () => { // ARRANGE - const ctx = mock({ workflowData: workflow }); + const ctx = mock({ workflow }); // ACT - await insightsCollectionService.workflowExecuteAfterHandler(ctx, run); + await insightsCollectionService.handleWorkflowExecuteAfter(ctx); await insightsCollectionService.flushEvents(); // ASSERT @@ -369,7 +370,7 @@ describe('workflowExecuteAfterHandler - cacheMetadata', () => { workflow.name = 'new-workflow-name'; // ACT AGAIN with the same workflow - await insightsCollectionService.workflowExecuteAfterHandler(ctx, run); + await insightsCollectionService.handleWorkflowExecuteAfter(ctx); await insightsCollectionService.flushEvents(); // ASSERT AGAIN @@ -410,7 +411,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { const startedAt = DateTime.utc(); const stoppedAt = startedAt.plus({ seconds: 5 }); - const run = mock({ + const runData = mock({ mode: 'trigger', status: 'success', startedAt: startedAt.toJSDate(), @@ -463,13 +464,13 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { test('flushes events to the database once buffer is full', async () => { // ARRANGE - const ctx = mock({ workflowData: workflow }); + const ctx = mock({ workflow, runData }); // ACT // each `workflowExecuteAfterHandler` adds 3 insights (status, runtime, time saved); // we call it 333 times be 1 away from the flushBatchSize (1000) for (let i = 0; i < 333; i++) { - await insightsCollectionService.workflowExecuteAfterHandler(ctx, run); + await insightsCollectionService.handleWorkflowExecuteAfter(ctx); } // await for the next tick to ensure the flush is called await new Promise(process.nextTick); @@ -478,7 +479,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { expect(trxMock.insert).not.toHaveBeenCalled(); // ACT - await insightsCollectionService.workflowExecuteAfterHandler(ctx, run); + await insightsCollectionService.handleWorkflowExecuteAfter(ctx); // ASSERT // await for the next tick to ensure the flush is called @@ -491,12 +492,12 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { jest.useFakeTimers(); trxMock.insert.mockClear(); insightsCollectionService.startFlushingTimer(); - const ctx = mock({ workflowData: workflow }); + const ctx = mock({ workflow, runData }); try { // ACT for (let i = 0; i < 33; i++) { - await insightsCollectionService.workflowExecuteAfterHandler(ctx, run); + await insightsCollectionService.handleWorkflowExecuteAfter(ctx); } // ASSERT expect(trxMock.insert).not.toHaveBeenCalled(); @@ -516,18 +517,18 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { jest.useFakeTimers(); trxMock.insert.mockClear(); insightsCollectionService.startFlushingTimer(); - const ctx = mock({ workflowData: workflow }); + const ctx = mock({ workflow }); try { // ACT - await insightsCollectionService.workflowExecuteAfterHandler(ctx, run); + await insightsCollectionService.handleWorkflowExecuteAfter(ctx); await jest.advanceTimersByTimeAsync(31 * 1000); // ASSERT expect(trxMock.insert).toHaveBeenCalledTimes(1); // // ACT - await insightsCollectionService.workflowExecuteAfterHandler(ctx, run); + await insightsCollectionService.handleWorkflowExecuteAfter(ctx); await jest.advanceTimersByTimeAsync(31 * 1000); expect(trxMock.insert).toHaveBeenCalledTimes(2); @@ -562,11 +563,11 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { test('flushes events to the database on shutdown', async () => { // ARRANGE trxMock.insert.mockClear(); - const ctx = mock({ workflowData: workflow }); + const ctx = mock({ workflow, runData }); // ACT for (let i = 0; i < 10; i++) { - await insightsCollectionService.workflowExecuteAfterHandler(ctx, run); + await insightsCollectionService.handleWorkflowExecuteAfter(ctx); } await insightsCollectionService.shutdown(); @@ -583,16 +584,16 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { // reset insights async flushing insightsCollectionService.startFlushingTimer(); trxMock.insert.mockClear(); - const ctx = mock({ workflowData: workflow }); + const ctx = mock({ workflow, runData }); // ACT for (let i = 0; i < 10; i++) { - await insightsCollectionService.workflowExecuteAfterHandler(ctx, run); + await insightsCollectionService.handleWorkflowExecuteAfter(ctx); } void insightsCollectionService.shutdown(); // trigger a workflow after shutdown - await insightsCollectionService.workflowExecuteAfterHandler(ctx, run); + await insightsCollectionService.handleWorkflowExecuteAfter(ctx); // ASSERT expect(trxMock.insert).toHaveBeenCalledTimes(2); @@ -615,11 +616,11 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { trxMock.insert.mockClear(); trxMock.insert.mockRejectedValueOnce(new Error('Test error')); insightsCollectionService.startFlushingTimer(); - const ctx = mock({ workflowData: workflow }); + const ctx = mock({ workflow, runData }); try { // ACT - await insightsCollectionService.workflowExecuteAfterHandler(ctx, run); + await insightsCollectionService.handleWorkflowExecuteAfter(ctx); await jest.advanceTimersByTimeAsync(31 * 1000); // ASSERT @@ -646,7 +647,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { insightsCollectionService.startFlushingTimer(); trxMock.insert.mockClear(); - const ctx = mock({ workflowData: workflow }); + const ctx = mock({ workflow, runData }); // Flush will hang until we manually resolve it const { resolve: flushResolve, promise: flushPromise } = createDeferredPromise(); @@ -659,7 +660,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => { // Each `workflowExecuteAfterHandler` adds 3 insights; // we call it 4 times to exceed the flushBatchSize (10) for (let i = 0; i < config.flushBatchSize / 3; i++) { - await insightsCollectionService.workflowExecuteAfterHandler(ctx, run); + await insightsCollectionService.handleWorkflowExecuteAfter(ctx); } // ACT diff --git a/packages/cli/src/modules/insights/insights-collection.service.ts b/packages/cli/src/modules/insights/insights-collection.service.ts index da740b9d02..0762760ee8 100644 --- a/packages/cli/src/modules/insights/insights-collection.service.ts +++ b/packages/cli/src/modules/insights/insights-collection.service.ts @@ -1,14 +1,9 @@ +import { OnLifecycleEvent, type WorkflowExecuteAfterContext } from '@n8n/decorators'; import { Service } from '@n8n/di'; import { In } from '@n8n/typeorm'; import { DateTime } from 'luxon'; import { Logger } from 'n8n-core'; -import type { ExecutionLifecycleHooks } from 'n8n-core'; -import { - UnexpectedError, - type ExecutionStatus, - type IRun, - type WorkflowExecuteMode, -} from 'n8n-workflow'; +import { UnexpectedError, type ExecutionStatus, type WorkflowExecuteMode } from 'n8n-workflow'; import { SharedWorkflow } from '@/databases/entities/shared-workflow'; import { SharedWorkflowRepository } from '@/databases/repositories/shared-workflow.repository'; @@ -103,16 +98,17 @@ export class InsightsCollectionService { await Promise.all([...this.flushesInProgress, this.flushEvents()]); } - async workflowExecuteAfterHandler(ctx: ExecutionLifecycleHooks, fullRunData: IRun) { - if (shouldSkipStatus[fullRunData.status] || shouldSkipMode[fullRunData.mode]) { + @OnLifecycleEvent('workflowExecuteAfter') + async handleWorkflowExecuteAfter(ctx: WorkflowExecuteAfterContext) { + if (shouldSkipStatus[ctx.runData.status] || shouldSkipMode[ctx.runData.mode]) { return; } - const status = fullRunData.status === 'success' ? 'success' : 'failure'; + const status = ctx.runData.status === 'success' ? 'success' : 'failure'; const commonWorkflowData = { - workflowId: ctx.workflowData.id, - workflowName: ctx.workflowData.name, + workflowId: ctx.workflow.id, + workflowName: ctx.workflow.name, timestamp: DateTime.utc().toJSDate(), }; @@ -124,8 +120,8 @@ export class InsightsCollectionService { }); // run time event - if (fullRunData.stoppedAt) { - const value = fullRunData.stoppedAt.getTime() - fullRunData.startedAt.getTime(); + if (ctx.runData.stoppedAt) { + const value = ctx.runData.stoppedAt.getTime() - ctx.runData.startedAt.getTime(); this.bufferedInsights.add({ ...commonWorkflowData, type: 'runtime_ms', @@ -134,11 +130,11 @@ export class InsightsCollectionService { } // time saved event - if (status === 'success' && ctx.workflowData.settings?.timeSavedPerExecution) { + if (status === 'success' && ctx.workflow.settings?.timeSavedPerExecution) { this.bufferedInsights.add({ ...commonWorkflowData, type: 'time_saved_min', - value: ctx.workflowData.settings.timeSavedPerExecution, + value: ctx.workflow.settings.timeSavedPerExecution, }); } diff --git a/packages/cli/src/modules/insights/insights.module.ts b/packages/cli/src/modules/insights/insights.module.ts index b6604e7739..28f6aff731 100644 --- a/packages/cli/src/modules/insights/insights.module.ts +++ b/packages/cli/src/modules/insights/insights.module.ts @@ -1,6 +1,5 @@ import type { BaseN8nModule } from '@n8n/decorators'; import { N8nModule, OnLeaderStepdown, OnLeaderTakeover } from '@n8n/decorators'; -import type { ExecutionLifecycleHooks } from 'n8n-core'; import { InstanceSettings, Logger } from 'n8n-core'; import { InsightsService } from './insights.service'; @@ -25,14 +24,6 @@ export class InsightsModule implements BaseN8nModule { } } - registerLifecycleHooks(hooks: ExecutionLifecycleHooks) { - const insightsService = this.insightsService; - - hooks.addHandler('workflowExecuteAfter', async function (fullRunData) { - await insightsService.workflowExecuteAfterHandler(this, fullRunData); - }); - } - @OnLeaderTakeover() startBackgroundProcess() { this.insightsService.startBackgroundProcess(); diff --git a/packages/cli/src/modules/insights/insights.service.ts b/packages/cli/src/modules/insights/insights.service.ts index 9262806aab..cc2466c06c 100644 --- a/packages/cli/src/modules/insights/insights.service.ts +++ b/packages/cli/src/modules/insights/insights.service.ts @@ -6,8 +6,7 @@ import { import { OnShutdown } from '@n8n/decorators'; import { Service } from '@n8n/di'; import { Logger } from 'n8n-core'; -import type { ExecutionLifecycleHooks } from 'n8n-core'; -import { UserError, type IRun } from 'n8n-workflow'; +import { UserError } from 'n8n-workflow'; import { License } from '@/license'; @@ -55,10 +54,6 @@ export class InsightsService { this.compactionService.stopCompactionTimer(); } - async workflowExecuteAfterHandler(ctx: ExecutionLifecycleHooks, fullRunData: IRun) { - await this.collectionService.workflowExecuteAfterHandler(ctx, fullRunData); - } - async getInsightsSummary({ periodLengthInDays, }: { periodLengthInDays: number }): Promise { diff --git a/packages/cli/src/modules/module-registry.ts b/packages/cli/src/modules/module-registry.ts new file mode 100644 index 0000000000..691a323588 --- /dev/null +++ b/packages/cli/src/modules/module-registry.ts @@ -0,0 +1,122 @@ +import type { LifecycleContext } from '@n8n/decorators'; +import { LifecycleMetadata, ModuleMetadata } from '@n8n/decorators'; +import { Container, Service } from '@n8n/di'; +import type { ExecutionLifecycleHooks } from 'n8n-core'; +import type { + IDataObject, + IRun, + IRunExecutionData, + ITaskData, + ITaskStartedData, + IWorkflowBase, + Workflow, +} from 'n8n-workflow'; + +@Service() +export class ModuleRegistry { + constructor( + private readonly moduleMetadata: ModuleMetadata, + private readonly lifecycleMetadata: LifecycleMetadata, + ) {} + + initializeModules() { + for (const ModuleClass of this.moduleMetadata.getModules()) { + Container.get(ModuleClass).initialize?.(); + } + } + + registerLifecycleHooks(hooks: ExecutionLifecycleHooks) { + const handlers = this.lifecycleMetadata.getHandlers(); + + for (const { handlerClass, methodName, eventName } of handlers) { + const instance = Container.get(handlerClass); + + switch (eventName) { + case 'workflowExecuteAfter': + hooks.addHandler( + eventName, + async function ( + this: { workflowData: IWorkflowBase }, + runData: IRun, + newStaticData: IDataObject, + ) { + const context: LifecycleContext = { + type: 'workflowExecuteAfter', + workflow: this.workflowData, + runData, + newStaticData, + }; + // eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/return-await + return await instance[methodName].call(instance, context); + }, + ); + break; + + case 'nodeExecuteBefore': + hooks.addHandler( + eventName, + async function ( + this: { workflowData: IWorkflowBase }, + nodeName: string, + taskData: ITaskStartedData, + ) { + const context: LifecycleContext = { + type: 'nodeExecuteBefore', + workflow: this.workflowData, + nodeName, + taskData, + }; + + // eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/return-await + return await instance[methodName].call(instance, context); + }, + ); + break; + + case 'nodeExecuteAfter': + hooks.addHandler( + eventName, + async function ( + this: { workflowData: IWorkflowBase }, + nodeName: string, + taskData: ITaskData, + executionData: IRunExecutionData, + ) { + const context: LifecycleContext = { + type: 'nodeExecuteAfter', + workflow: this.workflowData, + nodeName, + taskData, + executionData, + }; + + // eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/return-await + return await instance[methodName].call(instance, context); + }, + ); + break; + + case 'workflowExecuteBefore': + hooks.addHandler( + eventName, + async function ( + this: { workflowData: IWorkflowBase }, + workflowInstance: Workflow, + executionData?: IRunExecutionData, + ) { + const context: LifecycleContext = { + type: 'workflowExecuteBefore', + workflow: this.workflowData, + workflowInstance, + executionData, + }; + + // eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/return-await + return await instance[methodName].call(instance, context); + }, + ); + break; + } + } + } +}