From 4b11268a6ea44fa5ab1db3ec2e3338840265b655 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Wed, 4 Jun 2025 18:32:33 +0200 Subject: [PATCH] refactor(core): Implement a new `OnPubSubEvent` decorator (#15688) --- packages/@n8n/constants/src/index.ts | 6 + packages/@n8n/db/src/utils/generators.ts | 2 +- packages/@n8n/decorators/src/index.ts | 1 + .../pubsub/__tests__/on-pubsub-event.test.ts | 56 ++ packages/@n8n/decorators/src/pubsub/index.ts | 2 + .../decorators/src/pubsub/on-pubsub-event.ts | 43 + .../decorators/src/pubsub/pubsub-metadata.ts | 46 + .../__tests__/active-workflow-manager.test.ts | 1 + packages/cli/src/active-workflow-manager.ts | 74 +- packages/cli/src/commands/start.ts | 4 +- packages/cli/src/commands/webhook.ts | 4 +- packages/cli/src/commands/worker.ts | 4 +- .../controllers/orchestration.controller.ts | 6 +- .../message-event-bus/message-event-bus.ts | 2 + packages/cli/src/events/event.service.ts | 3 +- .../external-secrets-manager.ee.ts | 3 +- packages/cli/src/license.ts | 3 +- .../__tests__/insights.pre-init.test.ts | 3 +- packages/cli/src/push/index.ts | 8 +- .../scaling/__tests__/pubsub-handler.test.ts | 896 ------------------ .../__tests__/publisher.service.test.ts | 4 +- .../pubsub/__tests__/pubsub.registry.test.ts | 215 +++++ .../__tests__/subscriber.service.test.ts | 10 +- .../cli/src/scaling/pubsub/pubsub-handler.ts | 177 ---- .../pubsub/pubsub.event-map.ts} | 4 +- .../cli/src/scaling/pubsub/pubsub.eventbus.ts | 8 + .../cli/src/scaling/pubsub/pubsub.registry.ts | 39 + .../cli/src/scaling/pubsub/pubsub.types.ts | 36 +- .../src/scaling/pubsub/subscriber.service.ts | 8 +- .../src/scaling/worker-status.service.ee.ts | 35 +- .../services/community-packages.service.ts | 19 +- packages/cli/src/webhooks/test-webhooks.ts | 20 + packages/core/package.json | 1 + packages/core/src/errors/error-reporter.ts | 3 +- packages/core/src/instance-settings/index.ts | 2 +- .../instance-settings/instance-settings.ts | 11 +- packages/core/tsconfig.json | 1 + pnpm-lock.yaml | 3 + 38 files changed, 610 insertions(+), 1153 deletions(-) create mode 100644 packages/@n8n/decorators/src/pubsub/__tests__/on-pubsub-event.test.ts create mode 100644 packages/@n8n/decorators/src/pubsub/index.ts create mode 100644 packages/@n8n/decorators/src/pubsub/on-pubsub-event.ts create mode 100644 packages/@n8n/decorators/src/pubsub/pubsub-metadata.ts delete mode 100644 packages/cli/src/scaling/__tests__/pubsub-handler.test.ts rename packages/cli/src/scaling/{ => pubsub}/__tests__/publisher.service.test.ts (97%) create mode 100644 packages/cli/src/scaling/pubsub/__tests__/pubsub.registry.test.ts rename packages/cli/src/scaling/{ => pubsub}/__tests__/subscriber.service.test.ts (75%) delete mode 100644 packages/cli/src/scaling/pubsub/pubsub-handler.ts rename packages/cli/src/{events/maps/pub-sub.event-map.ts => scaling/pubsub/pubsub.event-map.ts} (100%) create mode 100644 packages/cli/src/scaling/pubsub/pubsub.eventbus.ts create mode 100644 packages/cli/src/scaling/pubsub/pubsub.registry.ts diff --git a/packages/@n8n/constants/src/index.ts b/packages/@n8n/constants/src/index.ts index d9ed5b2b02..0e952df381 100644 --- a/packages/@n8n/constants/src/index.ts +++ b/packages/@n8n/constants/src/index.ts @@ -94,3 +94,9 @@ export const LDAP_DEFAULT_CONFIGURATION: LdapConfig = { searchPageSize: 0, searchTimeout: 60, }; + +export const INSTANCE_TYPES = ['main', 'webhook', 'worker'] as const; +export type InstanceType = (typeof INSTANCE_TYPES)[number]; + +export const INSTANCE_ROLES = ['unset', 'leader', 'follower'] as const; +export type InstanceRole = (typeof INSTANCE_ROLES)[number]; diff --git a/packages/@n8n/db/src/utils/generators.ts b/packages/@n8n/db/src/utils/generators.ts index 85a1d980a4..65b1f07b32 100644 --- a/packages/@n8n/db/src/utils/generators.ts +++ b/packages/@n8n/db/src/utils/generators.ts @@ -1,4 +1,4 @@ -import type { InstanceType } from 'n8n-core'; +import type { InstanceType } from '@n8n/constants'; import { ALPHABET } from 'n8n-workflow'; import { customAlphabet } from 'nanoid'; diff --git a/packages/@n8n/decorators/src/index.ts b/packages/@n8n/decorators/src/index.ts index 9ea7cae4bf..7c21df416a 100644 --- a/packages/@n8n/decorators/src/index.ts +++ b/packages/@n8n/decorators/src/index.ts @@ -4,6 +4,7 @@ export * from './execution-lifecycle'; export { Memoized } from './memoized'; export * from './module'; export * from './multi-main'; +export * from './pubsub'; export { Redactable } from './redactable'; export * from './shutdown'; export * from './module/module-metadata'; diff --git a/packages/@n8n/decorators/src/pubsub/__tests__/on-pubsub-event.test.ts b/packages/@n8n/decorators/src/pubsub/__tests__/on-pubsub-event.test.ts new file mode 100644 index 0000000000..acce7fc50a --- /dev/null +++ b/packages/@n8n/decorators/src/pubsub/__tests__/on-pubsub-event.test.ts @@ -0,0 +1,56 @@ +import { Container } from '@n8n/di'; +import { Service } from '@n8n/di'; + +import { NonMethodError } from '../../errors'; +import { OnPubSubEvent } from '../on-pubsub-event'; +import { PubSubMetadata } from '../pubsub-metadata'; + +describe('@OnPubSubEvent', () => { + let metadata: PubSubMetadata; + + beforeEach(() => { + Container.reset(); + + metadata = new PubSubMetadata(); + Container.set(PubSubMetadata, metadata); + }); + + it('should register methods decorated with @OnPubSubEvent', () => { + jest.spyOn(metadata, 'register'); + + @Service() + class TestService { + @OnPubSubEvent('reload-external-secrets-providers') + async reloadProviders() {} + + @OnPubSubEvent('restart-event-bus', { instanceType: 'worker' }) + async restartEventBus() {} + } + + expect(metadata.register).toHaveBeenNthCalledWith(1, { + eventName: 'reload-external-secrets-providers', + methodName: 'reloadProviders', + eventHandlerClass: TestService, + }); + + expect(metadata.register).toHaveBeenNthCalledWith(2, { + eventName: 'restart-event-bus', + methodName: 'restartEventBus', + eventHandlerClass: TestService, + filter: { instanceType: 'worker' }, + }); + }); + + it('should throw an error if the decorated target is not a method', () => { + expect(() => { + @Service() + class TestService { + // @ts-expect-error Testing invalid code + @OnPubSubEvent('reload-external-secrets-providers') + notAFunction = 'string'; + } + + new TestService(); + }).toThrowError(NonMethodError); + }); +}); diff --git a/packages/@n8n/decorators/src/pubsub/index.ts b/packages/@n8n/decorators/src/pubsub/index.ts new file mode 100644 index 0000000000..7f8cad4873 --- /dev/null +++ b/packages/@n8n/decorators/src/pubsub/index.ts @@ -0,0 +1,2 @@ +export { PubSubMetadata, PubSubEventName } from './pubsub-metadata'; +export { OnPubSubEvent } from './on-pubsub-event'; diff --git a/packages/@n8n/decorators/src/pubsub/on-pubsub-event.ts b/packages/@n8n/decorators/src/pubsub/on-pubsub-event.ts new file mode 100644 index 0000000000..4da429543e --- /dev/null +++ b/packages/@n8n/decorators/src/pubsub/on-pubsub-event.ts @@ -0,0 +1,43 @@ +import { Container } from '@n8n/di'; + +import { PubSubMetadata } from './pubsub-metadata'; +import type { PubSubEventName, PubSubEventFilter } from './pubsub-metadata'; +import { NonMethodError } from '../errors'; +import type { EventHandlerClass } from '../types'; + +/** + * Decorator that registers a method to be called when a specific PubSub event occurs. + * Optionally filters event handling based on instance type and role. + * + * @param eventName - The PubSub event to listen for + * @param filter - Optional filter to limit event handling to specific instance types or roles + * + * @example + * + * ```ts + * @Service() + * class MyService { + * @OnPubSubEvent('community-package-install', { instanceType: 'main', instanceRole: 'leader' }) + * async handlePackageInstall() { + * // Handle community package installation + * } + * } + * ``` + */ +export const OnPubSubEvent = + (eventName: PubSubEventName, filter?: PubSubEventFilter): MethodDecorator => + (prototype, propertyKey, descriptor) => { + const eventHandlerClass = prototype.constructor as EventHandlerClass; + const methodName = String(propertyKey); + + if (typeof descriptor?.value !== 'function') { + throw new NonMethodError(`${eventHandlerClass.name}.${methodName}()`); + } + + Container.get(PubSubMetadata).register({ + eventHandlerClass, + methodName, + eventName, + filter, + }); + }; diff --git a/packages/@n8n/decorators/src/pubsub/pubsub-metadata.ts b/packages/@n8n/decorators/src/pubsub/pubsub-metadata.ts new file mode 100644 index 0000000000..5405949972 --- /dev/null +++ b/packages/@n8n/decorators/src/pubsub/pubsub-metadata.ts @@ -0,0 +1,46 @@ +import type { InstanceRole, InstanceType } from '@n8n/constants'; +import { Service } from '@n8n/di'; + +import type { EventHandler } from '../types'; + +export type PubSubEventName = + | 'add-webhooks-triggers-and-pollers' + | 'remove-triggers-and-pollers' + | 'clear-test-webhooks' + | 'display-workflow-activation' + | 'display-workflow-deactivation' + | 'display-workflow-activation-error' + | 'community-package-install' + | 'community-package-uninstall' + | 'community-package-update' + | 'get-worker-status' + | 'reload-external-secrets-providers' + | 'reload-license' + | 'response-to-get-worker-status' + | 'restart-event-bus' + | 'relay-execution-lifecycle-event'; + +export type PubSubEventFilter = + | { + instanceType: 'main'; + instanceRole?: Omit; + } + | { + instanceType: Omit; + instanceRole?: never; + }; + +type PubSubEventHandler = EventHandler & { filter?: PubSubEventFilter }; + +@Service() +export class PubSubMetadata { + private readonly handlers: PubSubEventHandler[] = []; + + register(handler: PubSubEventHandler) { + this.handlers.push(handler); + } + + getHandlers(): PubSubEventHandler[] { + return this.handlers; + } +} diff --git a/packages/cli/src/__tests__/active-workflow-manager.test.ts b/packages/cli/src/__tests__/active-workflow-manager.test.ts index 7611ec5fd9..562aab026e 100644 --- a/packages/cli/src/__tests__/active-workflow-manager.test.ts +++ b/packages/cli/src/__tests__/active-workflow-manager.test.ts @@ -40,6 +40,7 @@ describe('ActiveWorkflowManager', () => { instanceSettings, mock(), mock(), + mock(), ); }); diff --git a/packages/cli/src/active-workflow-manager.ts b/packages/cli/src/active-workflow-manager.ts index 9c1d079815..844f700c85 100644 --- a/packages/cli/src/active-workflow-manager.ts +++ b/packages/cli/src/active-workflow-manager.ts @@ -3,7 +3,7 @@ import { Logger } from '@n8n/backend-common'; import { WorkflowsConfig } from '@n8n/config'; import type { WorkflowEntity, IWorkflowDb } from '@n8n/db'; import { WorkflowRepository } from '@n8n/db'; -import { OnLeaderStepdown, OnLeaderTakeover, OnShutdown } from '@n8n/decorators'; +import { OnLeaderStepdown, OnLeaderTakeover, OnPubSubEvent, OnShutdown } from '@n8n/decorators'; import { Service } from '@n8n/di'; import chunk from 'lodash/chunk'; import { @@ -34,6 +34,7 @@ import { WorkflowActivationError, WebhookPathTakenError, UnexpectedError, + ensureError, } from 'n8n-workflow'; import { strict } from 'node:assert'; @@ -48,6 +49,7 @@ import { executeErrorWorkflow } from '@/execution-lifecycle/execute-error-workfl import { ExecutionService } from '@/executions/execution.service'; import { ExternalHooks } from '@/external-hooks'; import { NodeTypes } from '@/node-types'; +import { Push } from '@/push'; import { Publisher } from '@/scaling/pubsub/publisher.service'; import { ActiveWorkflowsService } from '@/services/active-workflows.service'; import * as WebhookHelpers from '@/webhooks/webhook-helpers'; @@ -85,6 +87,7 @@ export class ActiveWorkflowManager { private readonly instanceSettings: InstanceSettings, private readonly publisher: Publisher, private readonly workflowsConfig: WorkflowsConfig, + private readonly push: Push, ) { this.logger = this.logger.scoped(['workflow-activation']); } @@ -620,6 +623,61 @@ export class ActiveWorkflowManager { return added; } + @OnPubSubEvent('display-workflow-activation', { instanceType: 'main' }) + handleDisplayWorkflowActivation({ workflowId }: { workflowId: string }) { + this.push.broadcast({ type: 'workflowActivated', data: { workflowId } }); + } + + @OnPubSubEvent('display-workflow-deactivation', { instanceType: 'main' }) + handleDisplayWorkflowDeactivation({ workflowId }: { workflowId: string }) { + this.push.broadcast({ type: 'workflowDeactivated', data: { workflowId } }); + } + + @OnPubSubEvent('display-workflow-activation-error', { instanceType: 'main' }) + handleDisplayWorkflowActivationError({ + workflowId, + errorMessage, + }: { workflowId: string; errorMessage: string }) { + this.push.broadcast({ + type: 'workflowFailedToActivate', + data: { workflowId, errorMessage }, + }); + } + + @OnPubSubEvent('add-webhooks-triggers-and-pollers', { + instanceType: 'main', + instanceRole: 'leader', + }) + async handleAddWebhooksTriggersAndPollers({ workflowId }: { workflowId: string }) { + try { + await this.add(workflowId, 'activate', undefined, { + shouldPublish: false, // prevent leader from re-publishing message + }); + + this.push.broadcast({ type: 'workflowActivated', data: { workflowId } }); + + await this.publisher.publishCommand({ + command: 'display-workflow-activation', + payload: { workflowId }, + }); // instruct followers to show activation in UI + } catch (e) { + const error = ensureError(e); + const { message } = error; + + await this.workflowRepository.update(workflowId, { active: false }); + + this.push.broadcast({ + type: 'workflowFailedToActivate', + data: { workflowId, errorMessage: message }, + }); + + await this.publisher.publishCommand({ + command: 'display-workflow-activation-error', + payload: { workflowId, errorMessage: message }, + }); // instruct followers to show activation error in UI + } + } + /** * A workflow can only be activated if it has a node which has either triggers * or webhooks defined. @@ -814,6 +872,20 @@ export class ActiveWorkflowManager { await this.removeWorkflowTriggersAndPollers(workflowId); } + @OnPubSubEvent('remove-triggers-and-pollers', { instanceType: 'main', instanceRole: 'leader' }) + async handleRemoveTriggersAndPollers({ workflowId }: { workflowId: string }) { + await this.removeActivationError(workflowId); + await this.removeWorkflowTriggersAndPollers(workflowId); + + this.push.broadcast({ type: 'workflowDeactivated', data: { workflowId } }); + + // instruct followers to show workflow deactivation in UI + await this.publisher.publishCommand({ + command: 'display-workflow-deactivation', + payload: { workflowId }, + }); + } + /** * Stop running active triggers and pollers for a workflow. */ diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 674fc85dbc..acfce1e123 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -22,7 +22,7 @@ import { EventService } from '@/events/event.service'; import { ExecutionService } from '@/executions/execution.service'; import { MultiMainSetup } from '@/scaling/multi-main-setup.ee'; import { Publisher } from '@/scaling/pubsub/publisher.service'; -import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler'; +import { PubSubRegistry } from '@/scaling/pubsub/pubsub.registry'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { Server } from '@/server'; import { OwnershipService } from '@/services/ownership.service'; @@ -252,7 +252,7 @@ export class Start extends BaseCommand { async initOrchestration() { Container.get(Publisher); - Container.get(PubSubHandler).init(); + Container.get(PubSubRegistry).init(); const subscriber = Container.get(Subscriber); await subscriber.subscribe('n8n.commands'); diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index 0cb446cc74..e4c01eb283 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -4,7 +4,7 @@ import { Flags } from '@oclif/core'; import { ActiveExecutions } from '@/active-executions'; import config from '@/config'; import { Publisher } from '@/scaling/pubsub/publisher.service'; -import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler'; +import { PubSubRegistry } from '@/scaling/pubsub/pubsub.registry'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { WebhookServer } from '@/webhooks/webhook-server'; @@ -100,7 +100,7 @@ export class Webhook extends BaseCommand { async initOrchestration() { Container.get(Publisher); - Container.get(PubSubHandler).init(); + Container.get(PubSubRegistry).init(); await Container.get(Subscriber).subscribe('n8n.commands'); } } diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index e31bb8d619..630c3ce8f3 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -8,7 +8,7 @@ import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-mess import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay'; import { Publisher } from '@/scaling/pubsub/publisher.service'; -import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler'; +import { PubSubRegistry } from '@/scaling/pubsub/pubsub.registry'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import type { ScalingService } from '@/scaling/scaling.service'; import type { WorkerServerEndpointsConfig } from '@/scaling/worker-server'; @@ -130,7 +130,7 @@ export class Worker extends BaseCommand { async initOrchestration() { Container.get(Publisher); - Container.get(PubSubHandler).init(); + Container.get(PubSubRegistry).init(); await Container.get(Subscriber).subscribe('n8n.commands'); } diff --git a/packages/cli/src/controllers/orchestration.controller.ts b/packages/cli/src/controllers/orchestration.controller.ts index 807c25d00d..97b2353f99 100644 --- a/packages/cli/src/controllers/orchestration.controller.ts +++ b/packages/cli/src/controllers/orchestration.controller.ts @@ -1,13 +1,13 @@ import { Post, RestController, GlobalScope } from '@n8n/decorators'; import { License } from '@/license'; -import { Publisher } from '@/scaling/pubsub/publisher.service'; +import { WorkerStatusService } from '@/scaling/worker-status.service.ee'; @RestController('/orchestration') export class OrchestrationController { constructor( private readonly licenseService: License, - private readonly publisher: Publisher, + private readonly workerStatusService: WorkerStatusService, ) {} /** @@ -19,6 +19,6 @@ export class OrchestrationController { async getWorkersStatusAll() { if (!this.licenseService.isWorkerViewLicensed()) return; - return await this.publisher.publishCommand({ command: 'get-worker-status' }); + return await this.workerStatusService.requestWorkerStatus(); } } diff --git a/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts b/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts index 4bebb1b895..73300ad492 100644 --- a/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts +++ b/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts @@ -1,6 +1,7 @@ import { Logger } from '@n8n/backend-common'; import { GlobalConfig } from '@n8n/config'; import { EventDestinationsRepository, ExecutionRepository, WorkflowRepository } from '@n8n/db'; +import { OnPubSubEvent } from '@n8n/decorators'; import { Service } from '@n8n/di'; // eslint-disable-next-line n8n-local-rules/misplaced-n8n-typeorm-import import type { DeleteResult } from '@n8n/typeorm'; @@ -263,6 +264,7 @@ export class MessageEventBus extends EventEmitter { this.logger.debug('EventBus shut down.'); } + @OnPubSubEvent('restart-event-bus') async restart() { await this.close(); await this.initialize({ skipRecoveryPass: true }); diff --git a/packages/cli/src/events/event.service.ts b/packages/cli/src/events/event.service.ts index dbcb5f2955..29a198a0d6 100644 --- a/packages/cli/src/events/event.service.ts +++ b/packages/cli/src/events/event.service.ts @@ -3,11 +3,10 @@ import { Service } from '@n8n/di'; import { TypedEmitter } from '@/typed-emitter'; import type { AiEventMap } from './maps/ai.event-map'; -import type { PubSubEventMap } from './maps/pub-sub.event-map'; import type { QueueMetricsEventMap } from './maps/queue-metrics.event-map'; import type { RelayEventMap } from './maps/relay.event-map'; -type EventMap = RelayEventMap & QueueMetricsEventMap & AiEventMap & PubSubEventMap; +type EventMap = RelayEventMap & QueueMetricsEventMap & AiEventMap; @Service() export class EventService extends TypedEmitter {} diff --git a/packages/cli/src/external-secrets.ee/external-secrets-manager.ee.ts b/packages/cli/src/external-secrets.ee/external-secrets-manager.ee.ts index 91a6b70b3a..2d48891e29 100644 --- a/packages/cli/src/external-secrets.ee/external-secrets-manager.ee.ts +++ b/packages/cli/src/external-secrets.ee/external-secrets-manager.ee.ts @@ -1,6 +1,6 @@ import { Logger } from '@n8n/backend-common'; import { SettingsRepository } from '@n8n/db'; -import { OnShutdown } from '@n8n/decorators'; +import { OnPubSubEvent, OnShutdown } from '@n8n/decorators'; import { Service } from '@n8n/di'; import { Cipher } from 'n8n-core'; import { jsonParse, type IDataObject, ensureError, UnexpectedError } from 'n8n-workflow'; @@ -77,6 +77,7 @@ export class ExternalSecretsManager { this.logger.debug('External secrets manager shut down'); } + @OnPubSubEvent('reload-external-secrets-providers') async reloadAllProviders(backoff?: number) { this.logger.debug('Reloading all external secrets providers'); const providers = this.getProviderNames(); diff --git a/packages/cli/src/license.ts b/packages/cli/src/license.ts index b9983cf24b..eec82fe9e8 100644 --- a/packages/cli/src/license.ts +++ b/packages/cli/src/license.ts @@ -9,7 +9,7 @@ import { type NumericLicenseFeature, } from '@n8n/constants'; import { SettingsRepository } from '@n8n/db'; -import { OnLeaderStepdown, OnLeaderTakeover, OnShutdown } from '@n8n/decorators'; +import { OnLeaderStepdown, OnLeaderTakeover, OnPubSubEvent, OnShutdown } from '@n8n/decorators'; import { Container, Service } from '@n8n/di'; import type { TEntitlement, TLicenseBlock } from '@n8n_io/license-sdk'; import { LicenseManager } from '@n8n_io/license-sdk'; @@ -171,6 +171,7 @@ export class License implements LicenseProvider { this.logger.debug('License activated'); } + @OnPubSubEvent('reload-license') async reload(): Promise { if (!this.manager) { return; diff --git a/packages/cli/src/modules/insights/__tests__/insights.pre-init.test.ts b/packages/cli/src/modules/insights/__tests__/insights.pre-init.test.ts index 1af0a9df84..071d74a7da 100644 --- a/packages/cli/src/modules/insights/__tests__/insights.pre-init.test.ts +++ b/packages/cli/src/modules/insights/__tests__/insights.pre-init.test.ts @@ -1,5 +1,6 @@ +import type { InstanceType } from '@n8n/constants'; import { mock } from 'jest-mock-extended'; -import type { InstanceSettings, InstanceType } from 'n8n-core'; +import type { InstanceSettings } from 'n8n-core'; import type { ModulePreInitContext } from '@/modules/modules.config'; diff --git a/packages/cli/src/push/index.ts b/packages/cli/src/push/index.ts index 671b579076..7ffb989536 100644 --- a/packages/cli/src/push/index.ts +++ b/packages/cli/src/push/index.ts @@ -1,7 +1,7 @@ import type { PushMessage } from '@n8n/api-types'; import { inProduction, Logger } from '@n8n/backend-common'; import type { User } from '@n8n/db'; -import { OnShutdown } from '@n8n/decorators'; +import { OnPubSubEvent, OnShutdown } from '@n8n/decorators'; import { Container, Service } from '@n8n/di'; import type { Application } from 'express'; import { ServerResponse } from 'http'; @@ -205,6 +205,12 @@ export class Push extends TypedEmitter { return isWorker || (isMultiMain && !this.hasPushRef(pushRef)); } + @OnPubSubEvent('relay-execution-lifecycle-event', { instanceType: 'main' }) + handleRelayExecutionLifecycleEvent({ pushRef, ...pushMsg }: PushMessage & { pushRef: string }) { + if (!this.hasPushRef(pushRef)) return; + this.send(pushMsg, pushRef); + } + /** * Relay a push message via the `n8n.commands` pubsub channel, * reducing the payload size if too large. diff --git a/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts b/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts deleted file mode 100644 index 11594edcc9..0000000000 --- a/packages/cli/src/scaling/__tests__/pubsub-handler.test.ts +++ /dev/null @@ -1,896 +0,0 @@ -import type { WorkerStatus } from '@n8n/api-types'; -import type { WorkflowRepository } from '@n8n/db'; -import { mock } from 'jest-mock-extended'; -import type { InstanceSettings } from 'n8n-core'; -import type { IWorkflowBase, Workflow } from 'n8n-workflow'; - -import type { ActiveWorkflowManager } from '@/active-workflow-manager'; -import type { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; -import { EventService } from '@/events/event.service'; -import type { ExternalSecretsManager } from '@/external-secrets.ee/external-secrets-manager.ee'; -import type { License } from '@/license'; -import type { Push } from '@/push'; -import type { CommunityPackagesService } from '@/services/community-packages.service'; -import type { TestWebhooks } from '@/webhooks/test-webhooks'; - -import type { Publisher } from '../pubsub/publisher.service'; -import { PubSubHandler } from '../pubsub/pubsub-handler'; -import type { WorkerStatusService } from '../worker-status.service.ee'; - -const flushPromises = async () => await new Promise((resolve) => setImmediate(resolve)); - -describe('PubSubHandler', () => { - const eventService = new EventService(); - const license = mock(); - const eventbus = mock(); - const externalSecretsManager = mock(); - const communityPackagesService = mock(); - const publisher = mock(); - const workerStatusService = mock(); - const activeWorkflowManager = mock(); - const push = mock(); - const workflowRepository = mock(); - const testWebhooks = mock(); - - afterEach(() => { - eventService.removeAllListeners(); - }); - - describe('in webhook process', () => { - const instanceSettings = mock({ instanceType: 'webhook' }); - - it('should set up handlers in webhook process', () => { - // @ts-expect-error Spying on private method - const setupHandlers = jest.spyOn(PubSubHandler.prototype, 'setupHandlers'); - - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - expect(setupHandlers).toHaveBeenCalledWith({ - 'reload-license': expect.any(Function), - 'restart-event-bus': expect.any(Function), - 'reload-external-secrets-providers': expect.any(Function), - 'community-package-install': expect.any(Function), - 'community-package-update': expect.any(Function), - 'community-package-uninstall': expect.any(Function), - }); - }); - - it('should reload license on `reload-license` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - eventService.emit('reload-license'); - - expect(license.reload).toHaveBeenCalled(); - }); - - it('should restart event bus on `restart-event-bus` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - eventService.emit('restart-event-bus'); - - expect(eventbus.restart).toHaveBeenCalled(); - }); - - it('should reload providers on `reload-external-secrets-providers` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - eventService.emit('reload-external-secrets-providers'); - - expect(externalSecretsManager.reloadAllProviders).toHaveBeenCalled(); - }); - - it('should install community package on `community-package-install` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - eventService.emit('community-package-install', { - packageName: 'test-package', - packageVersion: '1.0.0', - }); - - expect(communityPackagesService.installOrUpdateNpmPackage).toHaveBeenCalledWith( - 'test-package', - '1.0.0', - ); - }); - - it('should update community package on `community-package-update` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - eventService.emit('community-package-update', { - packageName: 'test-package', - packageVersion: '1.0.0', - }); - - expect(communityPackagesService.installOrUpdateNpmPackage).toHaveBeenCalledWith( - 'test-package', - '1.0.0', - ); - }); - - it('should uninstall community package on `community-package-uninstall` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - eventService.emit('community-package-uninstall', { - packageName: 'test-package', - }); - - expect(communityPackagesService.removeNpmPackage).toHaveBeenCalledWith('test-package'); - }); - }); - - describe('in worker process', () => { - const instanceSettings = mock({ instanceType: 'worker' }); - - it('should set up handlers in worker process', () => { - // @ts-expect-error Spying on private method - const setupHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupHandlers'); - - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - expect(setupHandlersSpy).toHaveBeenCalledWith({ - 'reload-license': expect.any(Function), - 'restart-event-bus': expect.any(Function), - 'reload-external-secrets-providers': expect.any(Function), - 'community-package-install': expect.any(Function), - 'community-package-update': expect.any(Function), - 'community-package-uninstall': expect.any(Function), - 'get-worker-status': expect.any(Function), - }); - }); - - it('should reload license on `reload-license` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - eventService.emit('reload-license'); - - expect(license.reload).toHaveBeenCalled(); - }); - - it('should restart event bus on `restart-event-bus` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - eventService.emit('restart-event-bus'); - - expect(eventbus.restart).toHaveBeenCalled(); - }); - - it('should reload providers on `reload-external-secrets-providers` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - eventService.emit('reload-external-secrets-providers'); - - expect(externalSecretsManager.reloadAllProviders).toHaveBeenCalled(); - }); - - it('should install community package on `community-package-install` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - eventService.emit('community-package-install', { - packageName: 'test-package', - packageVersion: '1.0.0', - }); - - expect(communityPackagesService.installOrUpdateNpmPackage).toHaveBeenCalledWith( - 'test-package', - '1.0.0', - ); - }); - - it('should update community package on `community-package-update` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - eventService.emit('community-package-update', { - packageName: 'test-package', - packageVersion: '1.0.0', - }); - - expect(communityPackagesService.installOrUpdateNpmPackage).toHaveBeenCalledWith( - 'test-package', - '1.0.0', - ); - }); - - it('should uninstall community package on `community-package-uninstall` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - eventService.emit('community-package-uninstall', { - packageName: 'test-package', - }); - - expect(communityPackagesService.removeNpmPackage).toHaveBeenCalledWith('test-package'); - }); - - it('should generate status on `get-worker-status` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - eventService.emit('get-worker-status'); - - expect(workerStatusService.generateStatus).toHaveBeenCalled(); - }); - }); - - describe('in main process', () => { - const instanceSettings = mock({ - instanceType: 'main', - isLeader: true, - isFollower: false, - }); - - afterEach(() => { - jest.clearAllMocks(); - }); - - it('should set up command and worker response handlers in main process', () => { - // @ts-expect-error Spying on private method - const setupHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupHandlers'); - - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - expect(setupHandlersSpy).toHaveBeenCalledWith({ - 'reload-license': expect.any(Function), - 'restart-event-bus': expect.any(Function), - 'reload-external-secrets-providers': expect.any(Function), - 'community-package-install': expect.any(Function), - 'community-package-update': expect.any(Function), - 'community-package-uninstall': expect.any(Function), - 'add-webhooks-triggers-and-pollers': expect.any(Function), - 'remove-triggers-and-pollers': expect.any(Function), - 'display-workflow-activation': expect.any(Function), - 'display-workflow-deactivation': expect.any(Function), - 'display-workflow-activation-error': expect.any(Function), - 'relay-execution-lifecycle-event': expect.any(Function), - 'clear-test-webhooks': expect.any(Function), - 'response-to-get-worker-status': expect.any(Function), - }); - }); - - it('should reload license on `reload-license` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - eventService.emit('reload-license'); - - expect(license.reload).toHaveBeenCalled(); - }); - - it('should restart event bus on `restart-event-bus` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - eventService.emit('restart-event-bus'); - - expect(eventbus.restart).toHaveBeenCalled(); - }); - - it('should reload providers on `reload-external-secrets-providers` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - eventService.emit('reload-external-secrets-providers'); - - expect(externalSecretsManager.reloadAllProviders).toHaveBeenCalled(); - }); - - it('should install community package on `community-package-install` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - eventService.emit('community-package-install', { - packageName: 'test-package', - packageVersion: '1.0.0', - }); - - expect(communityPackagesService.installOrUpdateNpmPackage).toHaveBeenCalledWith( - 'test-package', - '1.0.0', - ); - }); - - it('should update community package on `community-package-update` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - eventService.emit('community-package-update', { - packageName: 'test-package', - packageVersion: '1.0.0', - }); - - expect(communityPackagesService.installOrUpdateNpmPackage).toHaveBeenCalledWith( - 'test-package', - '1.0.0', - ); - }); - - it('should uninstall community package on `community-package-uninstall` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - eventService.emit('community-package-uninstall', { - packageName: 'test-package', - }); - - expect(communityPackagesService.removeNpmPackage).toHaveBeenCalledWith('test-package'); - }); - - describe('multi-main setup', () => { - it('if leader, should handle `add-webhooks-triggers-and-pollers` event', async () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - const workflowId = 'test-workflow-id'; - - eventService.emit('add-webhooks-triggers-and-pollers', { workflowId }); - - await flushPromises(); - - expect(activeWorkflowManager.add).toHaveBeenCalledWith(workflowId, 'activate', undefined, { - shouldPublish: false, - }); - expect(push.broadcast).toHaveBeenCalledWith({ - type: 'workflowActivated', - data: { workflowId }, - }); - expect(publisher.publishCommand).toHaveBeenCalledWith({ - command: 'display-workflow-activation', - payload: { workflowId }, - }); - }); - - it('if follower, should skip `add-webhooks-triggers-and-pollers` event', async () => { - new PubSubHandler( - eventService, - mock({ instanceType: 'main', isLeader: false, isFollower: true }), - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - const workflowId = 'test-workflow-id'; - - eventService.emit('add-webhooks-triggers-and-pollers', { workflowId }); - - await flushPromises(); - - expect(activeWorkflowManager.add).not.toHaveBeenCalled(); - expect(push.broadcast).not.toHaveBeenCalled(); - expect(publisher.publishCommand).not.toHaveBeenCalled(); - }); - - it('if leader, should handle `remove-triggers-and-pollers` event', async () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - const workflowId = 'test-workflow-id'; - - eventService.emit('remove-triggers-and-pollers', { workflowId }); - - await flushPromises(); - - expect(activeWorkflowManager.removeActivationError).toHaveBeenCalledWith(workflowId); - expect(activeWorkflowManager.removeWorkflowTriggersAndPollers).toHaveBeenCalledWith( - workflowId, - ); - expect(push.broadcast).toHaveBeenCalledWith({ - type: 'workflowDeactivated', - data: { workflowId }, - }); - expect(publisher.publishCommand).toHaveBeenCalledWith({ - command: 'display-workflow-deactivation', - payload: { workflowId }, - }); - }); - - it('if follower, should skip `remove-triggers-and-pollers` event', async () => { - new PubSubHandler( - eventService, - mock({ instanceType: 'main', isLeader: false, isFollower: true }), - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - const workflowId = 'test-workflow-id'; - - eventService.emit('remove-triggers-and-pollers', { workflowId }); - - await flushPromises(); - - expect(activeWorkflowManager.removeActivationError).not.toHaveBeenCalled(); - expect(activeWorkflowManager.removeWorkflowTriggersAndPollers).not.toHaveBeenCalled(); - expect(push.broadcast).not.toHaveBeenCalled(); - expect(publisher.publishCommand).not.toHaveBeenCalled(); - }); - - it('should handle `display-workflow-activation` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - const workflowId = 'test-workflow-id'; - - eventService.emit('display-workflow-activation', { workflowId }); - - expect(push.broadcast).toHaveBeenCalledWith({ - type: 'workflowActivated', - data: { workflowId }, - }); - }); - - it('should handle `display-workflow-deactivation` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - const workflowId = 'test-workflow-id'; - - eventService.emit('display-workflow-deactivation', { workflowId }); - - expect(push.broadcast).toHaveBeenCalledWith({ - type: 'workflowDeactivated', - data: { workflowId }, - }); - }); - - it('should handle `display-workflow-activation-error` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - const workflowId = 'test-workflow-id'; - const errorMessage = 'Test error message'; - - eventService.emit('display-workflow-activation-error', { workflowId, errorMessage }); - - expect(push.broadcast).toHaveBeenCalledWith({ - type: 'workflowFailedToActivate', - data: { - workflowId, - errorMessage, - }, - }); - }); - - it('should handle `relay-execution-lifecycle-event` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - const pushRef = 'test-push-ref'; - const type = 'executionStarted'; - const data = { - executionId: '123', - mode: 'webhook' as const, - startedAt: new Date(), - workflowId: '456', - flattedRunData: '[]', - }; - - push.hasPushRef.mockReturnValue(true); - - eventService.emit('relay-execution-lifecycle-event', { type, data, pushRef }); - - expect(push.send).toHaveBeenCalledWith({ type, data }, pushRef); - }); - - it('should handle `clear-test-webhooks` event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - const webhookKey = 'test-webhook-key'; - const workflowEntity = mock({ id: 'test-workflow-id' }); - const pushRef = 'test-push-ref'; - - push.hasPushRef.mockReturnValue(true); - testWebhooks.toWorkflow.mockReturnValue(mock({ id: 'test-workflow-id' })); - - eventService.emit('clear-test-webhooks', { webhookKey, workflowEntity, pushRef }); - - expect(testWebhooks.clearTimeout).toHaveBeenCalledWith(webhookKey); - expect(testWebhooks.deactivateWebhooks).toHaveBeenCalled(); - }); - - it('should handle `response-to-get-worker-status event', () => { - new PubSubHandler( - eventService, - instanceSettings, - license, - eventbus, - externalSecretsManager, - communityPackagesService, - publisher, - workerStatusService, - activeWorkflowManager, - push, - workflowRepository, - testWebhooks, - ).init(); - - const workerStatus = mock({ senderId: 'worker-1', loadAvg: [123] }); - - eventService.emit('response-to-get-worker-status', workerStatus); - - expect(push.broadcast).toHaveBeenCalledWith({ - type: 'sendWorkerStatusMessage', - data: { - workerId: workerStatus.senderId, - status: workerStatus, - }, - }); - }); - }); - }); -}); diff --git a/packages/cli/src/scaling/__tests__/publisher.service.test.ts b/packages/cli/src/scaling/pubsub/__tests__/publisher.service.test.ts similarity index 97% rename from packages/cli/src/scaling/__tests__/publisher.service.test.ts rename to packages/cli/src/scaling/pubsub/__tests__/publisher.service.test.ts index fb0a340c11..d0da43e703 100644 --- a/packages/cli/src/scaling/__tests__/publisher.service.test.ts +++ b/packages/cli/src/scaling/pubsub/__tests__/publisher.service.test.ts @@ -6,8 +6,8 @@ import config from '@/config'; import type { RedisClientService } from '@/services/redis-client.service'; import { mockLogger } from '@test/mocking'; -import { Publisher } from '../pubsub/publisher.service'; -import type { PubSub } from '../pubsub/pubsub.types'; +import { Publisher } from '../publisher.service'; +import type { PubSub } from '../pubsub.types'; describe('Publisher', () => { beforeEach(() => { diff --git a/packages/cli/src/scaling/pubsub/__tests__/pubsub.registry.test.ts b/packages/cli/src/scaling/pubsub/__tests__/pubsub.registry.test.ts new file mode 100644 index 0000000000..14c596a14f --- /dev/null +++ b/packages/cli/src/scaling/pubsub/__tests__/pubsub.registry.test.ts @@ -0,0 +1,215 @@ +import { OnPubSubEvent, PubSubMetadata } from '@n8n/decorators'; +import { Container, Service } from '@n8n/di'; +import { mock } from 'jest-mock-extended'; +import type { InstanceSettings } from 'n8n-core'; + +import { mockLogger } from '@test/mocking'; + +import { PubSubEventBus } from '../pubsub.eventbus'; +import { PubSubRegistry } from '../pubsub.registry'; + +describe('PubSubRegistry', () => { + let metadata: PubSubMetadata; + let pubsubEventBus: PubSubEventBus; + let logger: ReturnType; + const workflowId = 'test-workflow-id'; + + const createTestServiceClass = () => { + @Service() + class TestService { + @OnPubSubEvent('reload-external-secrets-providers', { instanceType: 'main' }) + onMainInstance() {} + + @OnPubSubEvent('restart-event-bus', { instanceType: 'worker' }) + onWorkerInstance() {} + + @OnPubSubEvent('add-webhooks-triggers-and-pollers', { + instanceType: 'main', + instanceRole: 'leader', + }) + onLeaderInstance() {} + + @OnPubSubEvent('restart-event-bus', { + instanceType: 'main', + instanceRole: 'follower', + }) + onFollowerInstance() {} + + @OnPubSubEvent('clear-test-webhooks') + onAllInstances() {} + } + + return TestService; + }; + + const workerInstanceSettings = mock({ instanceType: 'worker' }); + const leaderInstanceSettings = mock({ + instanceType: 'main', + instanceRole: 'leader', + }); + const followerInstanceSettings = mock({ + instanceType: 'main', + instanceRole: 'follower', + }); + + beforeEach(() => { + jest.resetAllMocks(); + Container.reset(); + metadata = Container.get(PubSubMetadata); + pubsubEventBus = Container.get(PubSubEventBus); + logger = mockLogger(); + }); + + it('should call decorated methods when events are emitted', () => { + const TestService = createTestServiceClass(); + const testService = Container.get(TestService); + const onMainInstanceSpy = jest.spyOn(testService, 'onMainInstance'); + + const pubSubRegistry = new PubSubRegistry( + logger, + leaderInstanceSettings, + metadata, + pubsubEventBus, + ); + pubSubRegistry.init(); + + pubsubEventBus.emit('reload-external-secrets-providers'); + expect(onMainInstanceSpy).toHaveBeenCalledTimes(1); + }); + + it('should respect instance type filtering when handling events', () => { + const TestService = createTestServiceClass(); + const testService = Container.get(TestService); + const onMainInstanceSpy = jest.spyOn(testService, 'onMainInstance'); + const onWorkerInstanceSpy = jest.spyOn(testService, 'onWorkerInstance'); + + // Test with main leader instance + const mainPubSubRegistry = new PubSubRegistry( + logger, + leaderInstanceSettings, + metadata, + pubsubEventBus, + ); + mainPubSubRegistry.init(); + + pubsubEventBus.emit('reload-external-secrets-providers'); + expect(onMainInstanceSpy).toHaveBeenCalledTimes(1); + pubsubEventBus.emit('restart-event-bus'); + expect(onWorkerInstanceSpy).not.toHaveBeenCalled(); + + // Test with worker instance + jest.clearAllMocks(); + pubsubEventBus.removeAllListeners(); + + const workerPubSub = new PubSubRegistry( + logger, + workerInstanceSettings, + metadata, + pubsubEventBus, + ); + workerPubSub.init(); + + pubsubEventBus.emit('reload-external-secrets-providers'); + expect(onMainInstanceSpy).not.toHaveBeenCalled(); + pubsubEventBus.emit('restart-event-bus'); + expect(onWorkerInstanceSpy).toHaveBeenCalledTimes(1); + }); + + it('should respect instance role filtering when handling events', () => { + const TestService = createTestServiceClass(); + const testService = Container.get(TestService); + const onLeaderInstanceSpy = jest.spyOn(testService, 'onLeaderInstance'); + const onFollowerInstanceSpy = jest.spyOn(testService, 'onFollowerInstance'); + const onAllInstancesSpy = jest.spyOn(testService, 'onAllInstances'); + + // Test with leader instance + const pubSubRegistry = new PubSubRegistry( + logger, + leaderInstanceSettings, + metadata, + pubsubEventBus, + ); + pubSubRegistry.init(); + + pubsubEventBus.emit('add-webhooks-triggers-and-pollers', { workflowId }); + expect(onLeaderInstanceSpy).toHaveBeenCalledTimes(1); + expect(onLeaderInstanceSpy).toHaveBeenCalledWith({ workflowId }); + + pubsubEventBus.emit('restart-event-bus'); + expect(onFollowerInstanceSpy).not.toHaveBeenCalled(); + + pubsubEventBus.emit('clear-test-webhooks'); + expect(onAllInstancesSpy).toHaveBeenCalledTimes(1); + + // Test with follower instance + jest.clearAllMocks(); + pubsubEventBus.removeAllListeners(); + + const followerPubSubRegistry = new PubSubRegistry( + logger, + followerInstanceSettings, + metadata, + pubsubEventBus, + ); + followerPubSubRegistry.init(); + + pubsubEventBus.emit('add-webhooks-triggers-and-pollers', { workflowId }); + expect(onLeaderInstanceSpy).not.toHaveBeenCalled(); + + pubsubEventBus.emit('restart-event-bus'); + expect(onFollowerInstanceSpy).toHaveBeenCalledTimes(1); + + pubsubEventBus.emit('clear-test-webhooks'); + expect(onAllInstancesSpy).toHaveBeenCalledTimes(1); + }); + + it('should handle both instance type and role filtering together', () => { + const TestService = createTestServiceClass(); + const testService = Container.get(TestService); + const onLeaderInstanceSpy = jest.spyOn(testService, 'onLeaderInstance'); + + // Test with main leader instance + const pubSubRegistry = new PubSubRegistry( + logger, + leaderInstanceSettings, + metadata, + pubsubEventBus, + ); + pubSubRegistry.init(); + + pubsubEventBus.emit('add-webhooks-triggers-and-pollers', { workflowId }); + expect(onLeaderInstanceSpy).toHaveBeenCalledTimes(1); + expect(onLeaderInstanceSpy).toHaveBeenCalledWith({ workflowId }); + }); + + it('should handle dynamic role changes at runtime', () => { + const TestService = createTestServiceClass(); + const testService = Container.get(TestService); + const onLeaderInstanceSpy = jest.spyOn(testService, 'onLeaderInstance'); + + // Create a mutable instance settings object to simulate role changes + const instanceSettings = mock({ + instanceType: 'main', + instanceRole: 'follower', + }); + + const pubSubRegistry = new PubSubRegistry(logger, instanceSettings, metadata, pubsubEventBus); + pubSubRegistry.init(); + + // Initially as follower, event should be ignored + pubsubEventBus.emit('add-webhooks-triggers-and-pollers', { workflowId }); + expect(onLeaderInstanceSpy).not.toHaveBeenCalled(); + + // Change role to leader + instanceSettings.instanceRole = 'leader'; + pubsubEventBus.emit('add-webhooks-triggers-and-pollers', { workflowId }); + expect(onLeaderInstanceSpy).toHaveBeenCalledTimes(1); + expect(onLeaderInstanceSpy).toHaveBeenCalledWith({ workflowId }); + + // Change back to follower + onLeaderInstanceSpy.mockClear(); + instanceSettings.instanceRole = 'follower'; + pubsubEventBus.emit('add-webhooks-triggers-and-pollers', { workflowId }); + expect(onLeaderInstanceSpy).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/cli/src/scaling/__tests__/subscriber.service.test.ts b/packages/cli/src/scaling/pubsub/__tests__/subscriber.service.test.ts similarity index 75% rename from packages/cli/src/scaling/__tests__/subscriber.service.test.ts rename to packages/cli/src/scaling/pubsub/__tests__/subscriber.service.test.ts index 4f97208b99..8bbcd28dfc 100644 --- a/packages/cli/src/scaling/__tests__/subscriber.service.test.ts +++ b/packages/cli/src/scaling/pubsub/__tests__/subscriber.service.test.ts @@ -4,7 +4,7 @@ import { mock } from 'jest-mock-extended'; import config from '@/config'; import type { RedisClientService } from '@/services/redis-client.service'; -import { Subscriber } from '../pubsub/subscriber.service'; +import { Subscriber } from '../subscriber.service'; describe('Subscriber', () => { beforeEach(() => { @@ -17,14 +17,14 @@ describe('Subscriber', () => { describe('constructor', () => { it('should init Redis client in scaling mode', () => { - const subscriber = new Subscriber(mock(), redisClientService, mock(), mock()); + const subscriber = new Subscriber(mock(), mock(), mock(), redisClientService); expect(subscriber.getClient()).toEqual(client); }); it('should not init Redis client in regular mode', () => { config.set('executions.mode', 'regular'); - const subscriber = new Subscriber(mock(), redisClientService, mock(), mock()); + const subscriber = new Subscriber(mock(), mock(), mock(), redisClientService); expect(subscriber.getClient()).toBeUndefined(); }); @@ -32,7 +32,7 @@ describe('Subscriber', () => { describe('shutdown', () => { it('should disconnect Redis client', () => { - const subscriber = new Subscriber(mock(), redisClientService, mock(), mock()); + const subscriber = new Subscriber(mock(), mock(), mock(), redisClientService); subscriber.shutdown(); expect(client.disconnect).toHaveBeenCalled(); }); @@ -40,7 +40,7 @@ describe('Subscriber', () => { describe('subscribe', () => { it('should subscribe to pubsub channel', async () => { - const subscriber = new Subscriber(mock(), redisClientService, mock(), mock()); + const subscriber = new Subscriber(mock(), mock(), mock(), redisClientService); await subscriber.subscribe('n8n.commands'); diff --git a/packages/cli/src/scaling/pubsub/pubsub-handler.ts b/packages/cli/src/scaling/pubsub/pubsub-handler.ts deleted file mode 100644 index 8a31e85d20..0000000000 --- a/packages/cli/src/scaling/pubsub/pubsub-handler.ts +++ /dev/null @@ -1,177 +0,0 @@ -import { WorkflowRepository } from '@n8n/db'; -import { Service } from '@n8n/di'; -import { InstanceSettings } from 'n8n-core'; -import { ensureError } from 'n8n-workflow'; - -import { ActiveWorkflowManager } from '@/active-workflow-manager'; -import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; -import { EventService } from '@/events/event.service'; -import type { PubSubEventMap } from '@/events/maps/pub-sub.event-map'; -import { ExternalSecretsManager } from '@/external-secrets.ee/external-secrets-manager.ee'; -import { License } from '@/license'; -import { Push } from '@/push'; -import { Publisher } from '@/scaling/pubsub/publisher.service'; -import { CommunityPackagesService } from '@/services/community-packages.service'; -import { assertNever } from '@/utils'; -import { TestWebhooks } from '@/webhooks/test-webhooks'; - -import type { PubSub } from './pubsub.types'; -import { WorkerStatusService } from '../worker-status.service.ee'; - -/** - * Responsible for handling events emitted from messages received via a pubsub channel. - */ -@Service() -export class PubSubHandler { - constructor( - private readonly eventService: EventService, - private readonly instanceSettings: InstanceSettings, - private readonly license: License, - private readonly eventbus: MessageEventBus, - private readonly externalSecretsManager: ExternalSecretsManager, - private readonly communityPackagesService: CommunityPackagesService, - private readonly publisher: Publisher, - private readonly workerStatusService: WorkerStatusService, - private readonly activeWorkflowManager: ActiveWorkflowManager, - private readonly push: Push, - private readonly workflowRepository: WorkflowRepository, - private readonly testWebhooks: TestWebhooks, - ) {} - - init() { - switch (this.instanceSettings.instanceType) { - case 'webhook': - this.setupHandlers(this.commonHandlers); - break; - case 'worker': - this.setupHandlers({ - ...this.commonHandlers, - 'get-worker-status': async () => - await this.publisher.publishWorkerResponse({ - senderId: this.instanceSettings.hostId, - response: 'response-to-get-worker-status', - payload: this.workerStatusService.generateStatus(), - }), - }); - break; - case 'main': - this.setupHandlers({ - ...this.commonHandlers, - ...this.multiMainHandlers, - 'response-to-get-worker-status': async (payload) => - this.push.broadcast({ - type: 'sendWorkerStatusMessage', - data: { - workerId: payload.senderId, - status: payload, - }, - }), - }); - - break; - default: - assertNever(this.instanceSettings.instanceType); - } - } - - private setupHandlers( - map: { - [EventName in EventNames]?: (event: PubSubEventMap[EventName]) => void | Promise; - }, - ) { - for (const [eventName, handlerFn] of Object.entries(map) as Array< - [EventNames, (event: PubSubEventMap[EventNames]) => void | Promise] - >) { - this.eventService.on(eventName, async (event) => { - await handlerFn(event); - }); - } - } - - private commonHandlers: { - [EventName in keyof PubSub.CommonEvents]: (event: PubSubEventMap[EventName]) => Promise; - } = { - 'reload-license': async () => await this.license.reload(), - 'restart-event-bus': async () => await this.eventbus.restart(), - 'reload-external-secrets-providers': async () => - await this.externalSecretsManager.reloadAllProviders(), - 'community-package-install': async ({ packageName, packageVersion }) => - await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion), - 'community-package-update': async ({ packageName, packageVersion }) => - await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion), - 'community-package-uninstall': async ({ packageName }) => - await this.communityPackagesService.removeNpmPackage(packageName), - }; - - private multiMainHandlers: { - [EventName in keyof PubSub.MultiMainEvents]: ( - event: PubSubEventMap[EventName], - ) => Promise; - } = { - 'add-webhooks-triggers-and-pollers': async ({ workflowId }) => { - if (this.instanceSettings.isFollower) return; - - try { - await this.activeWorkflowManager.add(workflowId, 'activate', undefined, { - shouldPublish: false, // prevent leader from re-publishing message - }); - - this.push.broadcast({ type: 'workflowActivated', data: { workflowId } }); - - await this.publisher.publishCommand({ - command: 'display-workflow-activation', - payload: { workflowId }, - }); // instruct followers to show activation in UI - } catch (e) { - const error = ensureError(e); - const { message } = error; - - await this.workflowRepository.update(workflowId, { active: false }); - - this.push.broadcast({ - type: 'workflowFailedToActivate', - data: { workflowId, errorMessage: message }, - }); - - await this.publisher.publishCommand({ - command: 'display-workflow-activation-error', - payload: { workflowId, errorMessage: message }, - }); // instruct followers to show activation error in UI - } - }, - 'remove-triggers-and-pollers': async ({ workflowId }) => { - if (this.instanceSettings.isFollower) return; - - await this.activeWorkflowManager.removeActivationError(workflowId); - await this.activeWorkflowManager.removeWorkflowTriggersAndPollers(workflowId); - - this.push.broadcast({ type: 'workflowDeactivated', data: { workflowId } }); - - // instruct followers to show workflow deactivation in UI - await this.publisher.publishCommand({ - command: 'display-workflow-deactivation', - payload: { workflowId }, - }); - }, - 'display-workflow-activation': async ({ workflowId }) => - this.push.broadcast({ type: 'workflowActivated', data: { workflowId } }), - 'display-workflow-deactivation': async ({ workflowId }) => - this.push.broadcast({ type: 'workflowDeactivated', data: { workflowId } }), - 'display-workflow-activation-error': async ({ workflowId, errorMessage }) => - this.push.broadcast({ type: 'workflowFailedToActivate', data: { workflowId, errorMessage } }), - 'relay-execution-lifecycle-event': async ({ pushRef, ...pushMsg }) => { - if (!this.push.hasPushRef(pushRef)) return; - - this.push.send(pushMsg, pushRef); - }, - 'clear-test-webhooks': async ({ webhookKey, workflowEntity, pushRef }) => { - if (!this.push.hasPushRef(pushRef)) return; - - this.testWebhooks.clearTimeout(webhookKey); - - const workflow = this.testWebhooks.toWorkflow(workflowEntity); - - await this.testWebhooks.deactivateWebhooks(workflow); - }, - }; -} diff --git a/packages/cli/src/events/maps/pub-sub.event-map.ts b/packages/cli/src/scaling/pubsub/pubsub.event-map.ts similarity index 100% rename from packages/cli/src/events/maps/pub-sub.event-map.ts rename to packages/cli/src/scaling/pubsub/pubsub.event-map.ts index e4e002f779..fa78f92574 100644 --- a/packages/cli/src/events/maps/pub-sub.event-map.ts +++ b/packages/cli/src/scaling/pubsub/pubsub.event-map.ts @@ -1,8 +1,6 @@ import type { PushMessage, WorkerStatus } from '@n8n/api-types'; import type { IWorkflowBase } from 'n8n-workflow'; -export type PubSubEventMap = PubSubCommandMap & PubSubWorkerResponseMap; - export type PubSubCommandMap = { // #region Lifecycle @@ -79,3 +77,5 @@ export type PubSubCommandMap = { export type PubSubWorkerResponseMap = { 'response-to-get-worker-status': WorkerStatus; }; + +export type PubSubEventMap = PubSubCommandMap & PubSubWorkerResponseMap; diff --git a/packages/cli/src/scaling/pubsub/pubsub.eventbus.ts b/packages/cli/src/scaling/pubsub/pubsub.eventbus.ts new file mode 100644 index 0000000000..e4185d69d6 --- /dev/null +++ b/packages/cli/src/scaling/pubsub/pubsub.eventbus.ts @@ -0,0 +1,8 @@ +import { Service } from '@n8n/di'; + +import { TypedEmitter } from '@/typed-emitter'; + +import type { PubSubEventMap } from './pubsub.event-map'; + +@Service() +export class PubSubEventBus extends TypedEmitter {} diff --git a/packages/cli/src/scaling/pubsub/pubsub.registry.ts b/packages/cli/src/scaling/pubsub/pubsub.registry.ts new file mode 100644 index 0000000000..9b622ad4a8 --- /dev/null +++ b/packages/cli/src/scaling/pubsub/pubsub.registry.ts @@ -0,0 +1,39 @@ +import { Logger } from '@n8n/backend-common'; +import { PubSubMetadata } from '@n8n/decorators'; +import { Container, Service } from '@n8n/di'; +import { InstanceSettings } from 'n8n-core'; + +import { PubSubEventBus } from './pubsub.eventbus'; + +@Service() +export class PubSubRegistry { + constructor( + private readonly logger: Logger, + private readonly instanceSettings: InstanceSettings, + private readonly pubSubMetadata: PubSubMetadata, + private readonly pubsubEventBus: PubSubEventBus, + ) { + this.logger = this.logger.scoped('pubsub'); + } + + init() { + const { instanceSettings, pubSubMetadata } = this; + const handlers = pubSubMetadata.getHandlers(); + for (const { eventHandlerClass, methodName, eventName, filter } of handlers) { + const handlerClass = Container.get(eventHandlerClass); + if (!filter?.instanceType || filter.instanceType === instanceSettings.instanceType) { + this.logger.debug( + `Registered a "${eventName}" event handler on ${eventHandlerClass.name}#${methodName}`, + ); + this.pubsubEventBus.on(eventName, async (...args: unknown[]) => { + // Since the instance role can change, this check needs to be in the event listener + const shouldTrigger = + filter?.instanceType !== 'main' || + !filter.instanceRole || + filter.instanceRole === instanceSettings.instanceRole; + if (shouldTrigger) await handlerClass[methodName].call(handlerClass, ...args); + }); + } + } + } +} diff --git a/packages/cli/src/scaling/pubsub/pubsub.types.ts b/packages/cli/src/scaling/pubsub/pubsub.types.ts index 501185d07e..d0d24b6e3c 100644 --- a/packages/cli/src/scaling/pubsub/pubsub.types.ts +++ b/packages/cli/src/scaling/pubsub/pubsub.types.ts @@ -1,10 +1,6 @@ -import type { - PubSubCommandMap, - PubSubEventMap, - PubSubWorkerResponseMap, -} from '@/events/maps/pub-sub.event-map'; import type { Resolve } from '@/utlity.types'; +import type { PubSubCommandMap, PubSubWorkerResponseMap } from './pubsub.event-map'; import type { COMMAND_PUBSUB_CHANNEL, WORKER_RESPONSE_PUBSUB_CHANNEL } from '../constants'; export namespace PubSub { @@ -104,34 +100,4 @@ export namespace PubSub { /** Response sent via the `n8n.worker-response` pubsub channel. */ export type WorkerResponse = ToWorkerResponse<'response-to-get-worker-status'>; - - // ---------------------------------- - // events - // ---------------------------------- - - /** - * Of all events emitted from pubsub messages, those whose handlers - * are all present in main, worker, and webhook processes. - */ - export type CommonEvents = Pick< - PubSubEventMap, - | 'reload-license' - | 'restart-event-bus' - | 'reload-external-secrets-providers' - | 'community-package-install' - | 'community-package-update' - | 'community-package-uninstall' - >; - - /** Multi-main events emitted from pubsub messages. */ - export type MultiMainEvents = Pick< - PubSubEventMap, - | 'add-webhooks-triggers-and-pollers' - | 'remove-triggers-and-pollers' - | 'display-workflow-activation' - | 'display-workflow-deactivation' - | 'display-workflow-activation-error' - | 'relay-execution-lifecycle-event' - | 'clear-test-webhooks' - >; } diff --git a/packages/cli/src/scaling/pubsub/subscriber.service.ts b/packages/cli/src/scaling/pubsub/subscriber.service.ts index 0baa8f4cae..b77cf0d5e5 100644 --- a/packages/cli/src/scaling/pubsub/subscriber.service.ts +++ b/packages/cli/src/scaling/pubsub/subscriber.service.ts @@ -7,9 +7,9 @@ import { jsonParse } from 'n8n-workflow'; import type { LogMetadata } from 'n8n-workflow'; import config from '@/config'; -import { EventService } from '@/events/event.service'; import { RedisClientService } from '@/services/redis-client.service'; +import { PubSubEventBus } from './pubsub.eventbus'; import type { PubSub } from './pubsub.types'; /** @@ -21,9 +21,9 @@ export class Subscriber { constructor( private readonly logger: Logger, - private readonly redisClientService: RedisClientService, - private readonly eventService: EventService, private readonly instanceSettings: InstanceSettings, + private readonly pubsubEventBus: PubSubEventBus, + private readonly redisClientService: RedisClientService, ) { // @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead. if (config.getEnv('executions.mode') !== 'queue') return; @@ -34,7 +34,7 @@ export class Subscriber { const handlerFn = (msg: PubSub.Command | PubSub.WorkerResponse) => { const eventName = 'command' in msg ? msg.command : msg.response; - this.eventService.emit(eventName, msg.payload); + this.pubsubEventBus.emit(eventName, msg.payload); }; const debouncedHandlerFn = debounce(handlerFn, 300); diff --git a/packages/cli/src/scaling/worker-status.service.ee.ts b/packages/cli/src/scaling/worker-status.service.ee.ts index 226ee3f70f..b566813822 100644 --- a/packages/cli/src/scaling/worker-status.service.ee.ts +++ b/packages/cli/src/scaling/worker-status.service.ee.ts @@ -1,20 +1,51 @@ -import type { WorkerStatus } from '@n8n/api-types'; +import { WorkerStatus } from '@n8n/api-types'; +import { OnPubSubEvent } from '@n8n/decorators'; import { Service } from '@n8n/di'; import { InstanceSettings } from 'n8n-core'; import os from 'node:os'; import { N8N_VERSION } from '@/constants'; +import { Push } from '@/push'; import { JobProcessor } from './job-processor'; +import { Publisher } from './pubsub/publisher.service'; @Service() export class WorkerStatusService { constructor( private readonly jobProcessor: JobProcessor, private readonly instanceSettings: InstanceSettings, + private readonly publisher: Publisher, + private readonly push: Push, ) {} - generateStatus(): WorkerStatus { + async requestWorkerStatus() { + if (this.instanceSettings.instanceType !== 'main') return; + + return await this.publisher.publishCommand({ command: 'get-worker-status' }); + } + + @OnPubSubEvent('response-to-get-worker-status', { instanceType: 'main' }) + handleWorkerStatusResponse(payload: WorkerStatus) { + this.push.broadcast({ + type: 'sendWorkerStatusMessage', + data: { + workerId: payload.senderId, + status: payload, + }, + }); + } + + @OnPubSubEvent('get-worker-status', { instanceType: 'worker' }) + async publishWorkerResponse() { + await this.publisher.publishWorkerResponse({ + senderId: this.instanceSettings.hostId, + response: 'response-to-get-worker-status', + payload: this.generateStatus(), + }); + } + + private generateStatus(): WorkerStatus { return { senderId: this.instanceSettings.hostId, runningJobsSummary: this.jobProcessor.getRunningJobsSummary(), diff --git a/packages/cli/src/services/community-packages.service.ts b/packages/cli/src/services/community-packages.service.ts index 2c0d1cd0a7..f94ceb9631 100644 --- a/packages/cli/src/services/community-packages.service.ts +++ b/packages/cli/src/services/community-packages.service.ts @@ -3,6 +3,7 @@ import { GlobalConfig } from '@n8n/config'; import { LICENSE_FEATURES } from '@n8n/constants'; import type { InstalledPackages } from '@n8n/db'; import { InstalledPackagesRepository } from '@n8n/db'; +import { OnPubSubEvent } from '@n8n/decorators'; import { Service } from '@n8n/di'; import axios from 'axios'; import { exec } from 'child_process'; @@ -447,14 +448,28 @@ export class CommunityPackagesService { } } - async installOrUpdateNpmPackage(packageName: string, packageVersion: string) { + @OnPubSubEvent('community-package-install') + @OnPubSubEvent('community-package-update') + async handleInstallEvent({ + packageName, + packageVersion, + }: { packageName: string; packageVersion: string }) { + await this.installOrUpdateNpmPackage(packageName, packageVersion); + } + + @OnPubSubEvent('community-package-uninstall') + async handleUninstallEvent({ packageName }: { packageName: string }) { + await this.removeNpmPackage(packageName); + } + + private async installOrUpdateNpmPackage(packageName: string, packageVersion: string) { await this.downloadPackage(packageName, packageVersion); await this.loadNodesAndCredentials.loadPackage(packageName); await this.loadNodesAndCredentials.postProcessLoaders(); this.logger.info(`Community package installed: ${packageName}`); } - async removeNpmPackage(packageName: string) { + private async removeNpmPackage(packageName: string) { await this.deletePackageDirectory(packageName); await this.loadNodesAndCredentials.unloadPackage(packageName); await this.loadNodesAndCredentials.postProcessLoaders(); diff --git a/packages/cli/src/webhooks/test-webhooks.ts b/packages/cli/src/webhooks/test-webhooks.ts index 4ff5f16d73..f443a3f5aa 100644 --- a/packages/cli/src/webhooks/test-webhooks.ts +++ b/packages/cli/src/webhooks/test-webhooks.ts @@ -1,3 +1,4 @@ +import { OnPubSubEvent } from '@n8n/decorators'; import { Service } from '@n8n/di'; import type express from 'express'; import { InstanceSettings } from 'n8n-core'; @@ -168,6 +169,25 @@ export class TestWebhooks implements IWebhookManager { }); } + @OnPubSubEvent('clear-test-webhooks', { instanceType: 'main' }) + async handleClearTestWebhooks({ + webhookKey, + workflowEntity, + pushRef, + }: { + webhookKey: string; + workflowEntity: IWorkflowBase; + pushRef: string; + }) { + if (!this.push.hasPushRef(pushRef)) return; + + this.clearTimeout(webhookKey); + + const workflow = this.toWorkflow(workflowEntity); + + await this.deactivateWebhooks(workflow); + } + clearTimeout(key: string) { const timeout = this.timeouts[key]; diff --git a/packages/core/package.json b/packages/core/package.json index 1afe9c04b9..22dec528f3 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -41,6 +41,7 @@ "@n8n/backend-common": "workspace:^", "@n8n/client-oauth2": "workspace:*", "@n8n/config": "workspace:*", + "@n8n/constants": "workspace:*", "@n8n/decorators": "workspace:*", "@n8n/di": "workspace:*", "@sentry/node": "catalog:", diff --git a/packages/core/src/errors/error-reporter.ts b/packages/core/src/errors/error-reporter.ts index fae9af232c..5b7a492056 100644 --- a/packages/core/src/errors/error-reporter.ts +++ b/packages/core/src/errors/error-reporter.ts @@ -1,4 +1,5 @@ import { Logger } from '@n8n/backend-common'; +import type { InstanceType } from '@n8n/constants'; import { Service } from '@n8n/di'; import type { NodeOptions } from '@sentry/node'; import type { ErrorEvent, EventHint } from '@sentry/types'; @@ -7,8 +8,6 @@ import type { ReportingOptions } from 'n8n-workflow'; import { ApplicationError, ExecutionCancelledError, BaseError } from 'n8n-workflow'; import { createHash } from 'node:crypto'; -import type { InstanceType } from '@/instance-settings'; - type ErrorReporterInitOptions = { serverType: InstanceType | 'task_runner'; dsn: string; diff --git a/packages/core/src/instance-settings/index.ts b/packages/core/src/instance-settings/index.ts index 74b05f5d98..378ca16cf3 100644 --- a/packages/core/src/instance-settings/index.ts +++ b/packages/core/src/instance-settings/index.ts @@ -1 +1 @@ -export { InstanceSettings, InstanceType } from './instance-settings'; +export { InstanceSettings } from './instance-settings'; diff --git a/packages/core/src/instance-settings/instance-settings.ts b/packages/core/src/instance-settings/instance-settings.ts index 039bf212dc..84f2b26b60 100644 --- a/packages/core/src/instance-settings/instance-settings.ts +++ b/packages/core/src/instance-settings/instance-settings.ts @@ -1,5 +1,6 @@ import { inTest, Logger } from '@n8n/backend-common'; import { InstanceSettingsConfig } from '@n8n/config'; +import type { InstanceRole, InstanceType } from '@n8n/constants'; import { Memoized } from '@n8n/decorators'; import { Service } from '@n8n/di'; import { createHash, randomBytes } from 'crypto'; @@ -22,10 +23,6 @@ interface WritableSettings { type Settings = ReadOnlySettings & WritableSettings; -type InstanceRole = 'unset' | 'leader' | 'follower'; - -export type InstanceType = 'main' | 'webhook' | 'worker'; - @Service() export class InstanceSettings { /** The path to the n8n folder in which all n8n related data gets saved */ @@ -60,10 +57,8 @@ export class InstanceSettings { private readonly config: InstanceSettingsConfig, private readonly logger: Logger, ) { - const command = process.argv[2]; - this.instanceType = ['webhook', 'worker'].includes(command) - ? (command as InstanceType) - : 'main'; + const command = process.argv[2] as InstanceType; + this.instanceType = ['webhook', 'worker'].includes(command) ? command : 'main'; this.hostId = `${this.instanceType}-${nanoid()}`; this.settings = this.loadOrCreate(); diff --git a/packages/core/tsconfig.json b/packages/core/tsconfig.json index b32b0c606e..a839572d0b 100644 --- a/packages/core/tsconfig.json +++ b/packages/core/tsconfig.json @@ -22,6 +22,7 @@ { "path": "../@n8n/decorators/tsconfig.build.json" }, { "path": "../@n8n/backend-common/tsconfig.build.json" }, { "path": "../@n8n/config/tsconfig.build.json" }, + { "path": "../@n8n/constants/tsconfig.build.json" }, { "path": "../@n8n/di/tsconfig.build.json" }, { "path": "../@n8n/client-oauth2/tsconfig.build.json" } ] diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index bf07c64274..1f45a71409 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1507,6 +1507,9 @@ importers: '@n8n/config': specifier: workspace:* version: link:../@n8n/config + '@n8n/constants': + specifier: workspace:* + version: link:../@n8n/constants '@n8n/decorators': specifier: workspace:* version: link:../@n8n/decorators