From d37acdb8738115a55ce7ce6caf806ec4583deb8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Wed, 16 Oct 2024 17:34:32 +0200 Subject: [PATCH] refactor(core): Make orchestration service smaller (#11275) --- .../cli/src/__tests__/wait-tracker.test.ts | 2 +- packages/cli/src/active-workflow-manager.ts | 41 +++++++++-- .../controllers/orchestration.controller.ts | 18 ++--- .../message-event-bus/message-event-bus.ts | 8 +-- .../external-secrets-manager.ee.test.ts | 1 + .../external-secrets-manager.ee.ts | 15 ++-- packages/cli/src/push/__tests__/index.test.ts | 4 +- packages/cli/src/push/index.ts | 12 +++- packages/cli/src/requests.ts | 9 --- .../__tests__/orchestration.service.test.ts | 32 --------- .../services/community-packages.service.ts | 18 ++--- .../cli/src/services/orchestration.service.ts | 68 ------------------ .../webhooks/__tests__/test-webhooks.test.ts | 2 +- packages/cli/src/webhooks/test-webhooks.ts | 8 ++- .../active-workflow-manager.test.ts | 70 +++++++++++++++++++ .../collaboration.service.test.ts | 2 +- .../cli/test/integration/eventbus.ee.test.ts | 3 + .../external-secrets.api.test.ts | 1 + .../test/integration/shared/utils/index.ts | 1 - 19 files changed, 158 insertions(+), 157 deletions(-) diff --git a/packages/cli/src/__tests__/wait-tracker.test.ts b/packages/cli/src/__tests__/wait-tracker.test.ts index e51cd88ccb..2473713891 100644 --- a/packages/cli/src/__tests__/wait-tracker.test.ts +++ b/packages/cli/src/__tests__/wait-tracker.test.ts @@ -13,7 +13,7 @@ jest.useFakeTimers(); describe('WaitTracker', () => { const executionRepository = mock(); const multiMainSetup = mock(); - const orchestrationService = new OrchestrationService(mock(), mock(), multiMainSetup); + const orchestrationService = new OrchestrationService(mock(), multiMainSetup); const instanceSettings = mock({ isLeader: true }); const execution = mock({ diff --git a/packages/cli/src/active-workflow-manager.ts b/packages/cli/src/active-workflow-manager.ts index 4127909e49..189c446b65 100644 --- a/packages/cli/src/active-workflow-manager.ts +++ b/packages/cli/src/active-workflow-manager.ts @@ -48,6 +48,7 @@ import { WorkflowExecutionService } from '@/workflows/workflow-execution.service import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service'; import { ExecutionService } from './executions/execution.service'; +import { Publisher } from './scaling/pubsub/publisher.service'; interface QueuedActivation { activationMode: WorkflowActivateMode; @@ -75,6 +76,7 @@ export class ActiveWorkflowManager { private readonly activeWorkflowsService: ActiveWorkflowsService, private readonly workflowExecutionService: WorkflowExecutionService, private readonly instanceSettings: InstanceSettings, + private readonly publisher: Publisher, ) {} async init() { @@ -517,8 +519,9 @@ export class ActiveWorkflowManager { { shouldPublish } = { shouldPublish: true }, ) { if (this.orchestrationService.isMultiMainSetupEnabled && shouldPublish) { - await this.orchestrationService.publish('add-webhooks-triggers-and-pollers', { - workflowId, + void this.publisher.publishCommand({ + command: 'add-webhooks-triggers-and-pollers', + payload: { workflowId }, }); return; @@ -526,8 +529,8 @@ export class ActiveWorkflowManager { let workflow: Workflow; - const shouldAddWebhooks = this.orchestrationService.shouldAddWebhooks(activationMode); - const shouldAddTriggersAndPollers = this.orchestrationService.shouldAddTriggersAndPollers(); + const shouldAddWebhooks = this.shouldAddWebhooks(activationMode); + const shouldAddTriggersAndPollers = this.shouldAddTriggersAndPollers(); const shouldDisplayActivationMessage = (shouldAddWebhooks || shouldAddTriggersAndPollers) && @@ -717,7 +720,10 @@ export class ActiveWorkflowManager { ); } - await this.orchestrationService.publish('remove-triggers-and-pollers', { workflowId }); + void this.publisher.publishCommand({ + command: 'remove-triggers-and-pollers', + payload: { workflowId }, + }); return; } @@ -810,4 +816,29 @@ export class ActiveWorkflowManager { async removeActivationError(workflowId: string) { await this.activationErrorsService.deregister(workflowId); } + + /** + * Whether this instance may add webhooks to the `webhook_entity` table. + */ + shouldAddWebhooks(activationMode: WorkflowActivateMode) { + // Always try to populate the webhook entity table as well as register the webhooks + // to prevent issues with users upgrading from a version < 1.15, where the webhook entity + // was cleared on shutdown to anything past 1.28.0, where we stopped populating it on init, + // causing all webhooks to break + if (activationMode === 'init') return true; + + if (activationMode === 'leadershipChange') return false; + + return this.instanceSettings.isLeader; // 'update' or 'activate' + } + + /** + * Whether this instance may add triggers and pollers to memory. + * + * In both single- and multi-main setup, only the leader is allowed to manage + * triggers and pollers in memory, to ensure they are not duplicated. + */ + shouldAddTriggersAndPollers() { + return this.instanceSettings.isLeader; + } } diff --git a/packages/cli/src/controllers/orchestration.controller.ts b/packages/cli/src/controllers/orchestration.controller.ts index db1d690a3e..14d38cfa43 100644 --- a/packages/cli/src/controllers/orchestration.controller.ts +++ b/packages/cli/src/controllers/orchestration.controller.ts @@ -1,31 +1,23 @@ import { Post, RestController, GlobalScope } from '@/decorators'; import { License } from '@/license'; -import { OrchestrationRequest } from '@/requests'; -import { OrchestrationService } from '@/services/orchestration.service'; +import { Publisher } from '@/scaling/pubsub/publisher.service'; @RestController('/orchestration') export class OrchestrationController { constructor( - private readonly orchestrationService: OrchestrationService, private readonly licenseService: License, + private readonly publisher: Publisher, ) {} /** - * These endpoints do not return anything, they just trigger the message to + * This endpoint does not return anything, it just triggers the message to * the workers to respond on Redis with their status. */ - @GlobalScope('orchestration:read') - @Post('/worker/status/:id') - async getWorkersStatus(req: OrchestrationRequest.Get) { - if (!this.licenseService.isWorkerViewLicensed()) return; - const id = req.params.id; - return await this.orchestrationService.getWorkerStatus(id); - } - @GlobalScope('orchestration:read') @Post('/worker/status') async getWorkersStatusAll() { if (!this.licenseService.isWorkerViewLicensed()) return; - return await this.orchestrationService.getWorkerStatus(); + + return await this.publisher.publishCommand({ command: 'get-worker-status' }); } } diff --git a/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts b/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts index 0f622c2317..3cf5a5a5d0 100644 --- a/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts +++ b/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts @@ -14,7 +14,7 @@ import { ExecutionRepository } from '@/databases/repositories/execution.reposito import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { License } from '@/license'; import { Logger } from '@/logging/logger.service'; -import { OrchestrationService } from '@/services/orchestration.service'; +import { Publisher } from '@/scaling/pubsub/publisher.service'; import { ExecutionRecoveryService } from '../../executions/execution-recovery.service'; import type { EventMessageTypes } from '../event-message-classes/'; @@ -70,7 +70,7 @@ export class MessageEventBus extends EventEmitter { private readonly executionRepository: ExecutionRepository, private readonly eventDestinationsRepository: EventDestinationsRepository, private readonly workflowRepository: WorkflowRepository, - private readonly orchestrationService: OrchestrationService, + private readonly publisher: Publisher, private readonly recoveryService: ExecutionRecoveryService, private readonly license: License, private readonly globalConfig: GlobalConfig, @@ -210,7 +210,7 @@ export class MessageEventBus extends EventEmitter { this.destinations[destination.getId()] = destination; this.destinations[destination.getId()].startListening(); if (notifyWorkers) { - await this.orchestrationService.publish('restart-event-bus'); + void this.publisher.publishCommand({ command: 'restart-event-bus' }); } return destination; } @@ -236,7 +236,7 @@ export class MessageEventBus extends EventEmitter { delete this.destinations[id]; } if (notifyWorkers) { - await this.orchestrationService.publish('restart-event-bus'); + void this.publisher.publishCommand({ command: 'restart-event-bus' }); } return result; } diff --git a/packages/cli/src/external-secrets/__tests__/external-secrets-manager.ee.test.ts b/packages/cli/src/external-secrets/__tests__/external-secrets-manager.ee.test.ts index 97547ecf13..b1a87271f9 100644 --- a/packages/cli/src/external-secrets/__tests__/external-secrets-manager.ee.test.ts +++ b/packages/cli/src/external-secrets/__tests__/external-secrets-manager.ee.test.ts @@ -55,6 +55,7 @@ describe('External Secrets Manager', () => { providersMock, cipher, mock(), + mock(), ); }); diff --git a/packages/cli/src/external-secrets/external-secrets-manager.ee.ts b/packages/cli/src/external-secrets/external-secrets-manager.ee.ts index e175f2969c..ec7c3ed0cf 100644 --- a/packages/cli/src/external-secrets/external-secrets-manager.ee.ts +++ b/packages/cli/src/external-secrets/external-secrets-manager.ee.ts @@ -1,6 +1,6 @@ import { Cipher } from 'n8n-core'; import { jsonParse, type IDataObject, ApplicationError } from 'n8n-workflow'; -import Container, { Service } from 'typedi'; +import { Service } from 'typedi'; import { SettingsRepository } from '@/databases/repositories/settings.repository'; import { EventService } from '@/events/event.service'; @@ -11,7 +11,7 @@ import type { } from '@/interfaces'; import { License } from '@/license'; import { Logger } from '@/logging/logger.service'; -import { OrchestrationService } from '@/services/orchestration.service'; +import { Publisher } from '@/scaling/pubsub/publisher.service'; import { EXTERNAL_SECRETS_INITIAL_BACKOFF, EXTERNAL_SECRETS_MAX_BACKOFF } from './constants'; import { updateIntervalTime } from './external-secrets-helper.ee'; @@ -38,6 +38,7 @@ export class ExternalSecretsManager { private readonly secretsProviders: ExternalSecretsProviders, private readonly cipher: Cipher, private readonly eventService: EventService, + private readonly publisher: Publisher, ) {} async init(): Promise { @@ -78,8 +79,8 @@ export class ExternalSecretsManager { } } - async broadcastReloadExternalSecretsProviders() { - await Container.get(OrchestrationService).publish('reload-external-secrets-providers'); + broadcastReloadExternalSecretsProviders() { + void this.publisher.publishCommand({ command: 'reload-external-secrets-providers' }); } private decryptSecretsSettings(value: string): ExternalSecretsSettings { @@ -280,7 +281,7 @@ export class ExternalSecretsManager { await this.saveAndSetSettings(settings, this.settingsRepo); this.cachedSettings = settings; await this.reloadProvider(provider); - await this.broadcastReloadExternalSecretsProviders(); + this.broadcastReloadExternalSecretsProviders(); void this.trackProviderSave(provider, isNewProvider, userId); } @@ -300,7 +301,7 @@ export class ExternalSecretsManager { this.cachedSettings = settings; await this.reloadProvider(provider); await this.updateSecrets(); - await this.broadcastReloadExternalSecretsProviders(); + this.broadcastReloadExternalSecretsProviders(); } private async trackProviderSave(vaultType: string, isNew: boolean, userId?: string) { @@ -380,7 +381,7 @@ export class ExternalSecretsManager { } try { await this.providers[provider].update(); - await this.broadcastReloadExternalSecretsProviders(); + this.broadcastReloadExternalSecretsProviders(); return true; } catch { return false; diff --git a/packages/cli/src/push/__tests__/index.test.ts b/packages/cli/src/push/__tests__/index.test.ts index 6230c63397..03457926b1 100644 --- a/packages/cli/src/push/__tests__/index.test.ts +++ b/packages/cli/src/push/__tests__/index.test.ts @@ -20,7 +20,7 @@ describe('Push', () => { test('should validate pushRef on requests for websocket backend', () => { config.set('push.backend', 'websocket'); - const push = new Push(mock()); + const push = new Push(mock(), mock()); const ws = mock(); const request = mock({ user, ws }); request.query = { pushRef: '' }; @@ -33,7 +33,7 @@ describe('Push', () => { test('should validate pushRef on requests for SSE backend', () => { config.set('push.backend', 'sse'); - const push = new Push(mock()); + const push = new Push(mock(), mock()); const request = mock({ user, ws: undefined }); request.query = { pushRef: '' }; expect(() => push.handleRequest(request, mock())).toThrow(BadRequestError); diff --git a/packages/cli/src/push/index.ts b/packages/cli/src/push/index.ts index 232864968d..bfbfb43a51 100644 --- a/packages/cli/src/push/index.ts +++ b/packages/cli/src/push/index.ts @@ -12,6 +12,7 @@ import config from '@/config'; import type { User } from '@/databases/entities/user'; import { OnShutdown } from '@/decorators/on-shutdown'; import { BadRequestError } from '@/errors/response-errors/bad-request.error'; +import { Publisher } from '@/scaling/pubsub/publisher.service'; import { OrchestrationService } from '@/services/orchestration.service'; import { TypedEmitter } from '@/typed-emitter'; @@ -39,7 +40,10 @@ export class Push extends TypedEmitter { private backend = useWebSockets ? Container.get(WebSocketPush) : Container.get(SSEPush); - constructor(private readonly orchestrationService: OrchestrationService) { + constructor( + private readonly orchestrationService: OrchestrationService, + private readonly publisher: Publisher, + ) { super(); if (useWebSockets) this.backend.on('message', (msg) => this.emit('message', msg)); @@ -89,8 +93,10 @@ export class Push extends TypedEmitter { * relay the former's execution lifecycle events to the creator's frontend. */ if (this.orchestrationService.isMultiMainSetupEnabled && !this.backend.hasPushRef(pushRef)) { - const payload = { type, args: data, pushRef }; - void this.orchestrationService.publish('relay-execution-lifecycle-event', payload); + void this.publisher.publishCommand({ + command: 'relay-execution-lifecycle-event', + payload: { type, args: data, pushRef }, + }); return; } diff --git a/packages/cli/src/requests.ts b/packages/cli/src/requests.ts index e25a244f5f..ffc04925a3 100644 --- a/packages/cli/src/requests.ts +++ b/packages/cli/src/requests.ts @@ -478,15 +478,6 @@ export declare namespace ExternalSecretsRequest { type UpdateProvider = AuthenticatedRequest<{ provider: string }>; } -// ---------------------------------- -// /orchestration -// ---------------------------------- -// -export declare namespace OrchestrationRequest { - type GetAll = AuthenticatedRequest; - type Get = AuthenticatedRequest<{ id: string }, {}, {}, {}>; -} - // ---------------------------------- // /workflow-history // ---------------------------------- diff --git a/packages/cli/src/services/__tests__/orchestration.service.test.ts b/packages/cli/src/services/__tests__/orchestration.service.test.ts index 0169462891..a8e72c49bf 100644 --- a/packages/cli/src/services/__tests__/orchestration.service.test.ts +++ b/packages/cli/src/services/__tests__/orchestration.service.test.ts @@ -1,7 +1,6 @@ import type Redis from 'ioredis'; import { mock } from 'jest-mock-extended'; import { InstanceSettings } from 'n8n-core'; -import type { WorkflowActivateMode } from 'n8n-workflow'; import Container from 'typedi'; import { ActiveWorkflowManager } from '@/active-workflow-manager'; @@ -45,35 +44,4 @@ describe('Orchestration Service', () => { // @ts-expect-error Private field expect(os.publisher).toBeDefined(); }); - - describe('shouldAddWebhooks', () => { - test('should return true for init', () => { - // We want to ensure that webhooks are populated on init - // more https://github.com/n8n-io/n8n/pull/8830 - const result = os.shouldAddWebhooks('init'); - expect(result).toBe(true); - }); - - test('should return false for leadershipChange', () => { - const result = os.shouldAddWebhooks('leadershipChange'); - expect(result).toBe(false); - }); - - test('should return true for update or activate when is leader', () => { - const modes = ['update', 'activate'] as WorkflowActivateMode[]; - for (const mode of modes) { - const result = os.shouldAddWebhooks(mode); - expect(result).toBe(true); - } - }); - - test('should return false for update or activate when not leader', () => { - instanceSettings.markAsFollower(); - const modes = ['update', 'activate'] as WorkflowActivateMode[]; - for (const mode of modes) { - const result = os.shouldAddWebhooks(mode); - expect(result).toBe(false); - } - }); - }); }); diff --git a/packages/cli/src/services/community-packages.service.ts b/packages/cli/src/services/community-packages.service.ts index b157119cf2..4906a6ef33 100644 --- a/packages/cli/src/services/community-packages.service.ts +++ b/packages/cli/src/services/community-packages.service.ts @@ -23,10 +23,9 @@ import type { CommunityPackages } from '@/interfaces'; import { License } from '@/license'; import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials'; import { Logger } from '@/logging/logger.service'; +import { Publisher } from '@/scaling/pubsub/publisher.service'; import { toError } from '@/utils'; -import { OrchestrationService } from './orchestration.service'; - const DEFAULT_REGISTRY = 'https://registry.npmjs.org'; const { @@ -60,7 +59,7 @@ export class CommunityPackagesService { private readonly logger: Logger, private readonly installedPackageRepository: InstalledPackagesRepository, private readonly loadNodesAndCredentials: LoadNodesAndCredentials, - private readonly orchestrationService: OrchestrationService, + private readonly publisher: Publisher, private readonly license: License, private readonly globalConfig: GlobalConfig, ) {} @@ -322,7 +321,10 @@ export class CommunityPackagesService { async removePackage(packageName: string, installedPackage: InstalledPackages): Promise { await this.removeNpmPackage(packageName); await this.removePackageFromDatabase(installedPackage); - await this.orchestrationService.publish('community-package-uninstall', { packageName }); + void this.publisher.publishCommand({ + command: 'community-package-uninstall', + payload: { packageName }, + }); } private getNpmRegistry() { @@ -368,10 +370,10 @@ export class CommunityPackagesService { await this.removePackageFromDatabase(options.installedPackage); } const installedPackage = await this.persistInstalledPackage(loader); - await this.orchestrationService.publish( - isUpdate ? 'community-package-update' : 'community-package-install', - { packageName, packageVersion }, - ); + void this.publisher.publishCommand({ + command: isUpdate ? 'community-package-update' : 'community-package-install', + payload: { packageName, packageVersion }, + }); await this.loadNodesAndCredentials.postProcessLoaders(); this.logger.info(`Community package installed: ${packageName}`); return installedPackage; diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts index 64dbd0ddae..61a2aff540 100644 --- a/packages/cli/src/services/orchestration.service.ts +++ b/packages/cli/src/services/orchestration.service.ts @@ -1,10 +1,7 @@ import { InstanceSettings } from 'n8n-core'; -import type { WorkflowActivateMode } from 'n8n-workflow'; import Container, { Service } from 'typedi'; import config from '@/config'; -import type { PubSubCommandMap } from '@/events/maps/pub-sub.event-map'; -import { Logger } from '@/logging/logger.service'; import type { Publisher } from '@/scaling/pubsub/publisher.service'; import type { Subscriber } from '@/scaling/pubsub/subscriber.service'; @@ -13,7 +10,6 @@ import { MultiMainSetup } from './orchestration/main/multi-main-setup.ee'; @Service() export class OrchestrationService { constructor( - private readonly logger: Logger, readonly instanceSettings: InstanceSettings, readonly multiMainSetup: MultiMainSetup, ) {} @@ -78,68 +74,4 @@ export class OrchestrationService { this.isInitialized = false; } - - // ---------------------------------- - // pubsub - // ---------------------------------- - - async publish( - commandKey: CommandKey, - payload?: PubSubCommandMap[CommandKey], - ) { - if (!this.sanityCheck()) return; - - this.logger.debug( - `[Instance ID ${this.instanceSettings.hostId}] Publishing command "${commandKey}"`, - payload, - ); - - await this.publisher.publishCommand({ command: commandKey, payload }); - } - - // ---------------------------------- - // workers status - // ---------------------------------- - - async getWorkerStatus(id?: string) { - if (!this.sanityCheck()) return; - - const command = 'get-worker-status'; - - this.logger.debug(`Sending "${command}" to command channel`); - - await this.publisher.publishCommand({ - command, - targets: id ? [id] : undefined, - }); - } - - // ---------------------------------- - // activations - // ---------------------------------- - - /** - * Whether this instance may add webhooks to the `webhook_entity` table. - */ - shouldAddWebhooks(activationMode: WorkflowActivateMode) { - // Always try to populate the webhook entity table as well as register the webhooks - // to prevent issues with users upgrading from a version < 1.15, where the webhook entity - // was cleared on shutdown to anything past 1.28.0, where we stopped populating it on init, - // causing all webhooks to break - if (activationMode === 'init') return true; - - if (activationMode === 'leadershipChange') return false; - - return this.instanceSettings.isLeader; // 'update' or 'activate' - } - - /** - * Whether this instance may add triggers and pollers to memory. - * - * In both single- and multi-main setup, only the leader is allowed to manage - * triggers and pollers in memory, to ensure they are not duplicated. - */ - shouldAddTriggersAndPollers() { - return this.instanceSettings.isLeader; - } } diff --git a/packages/cli/src/webhooks/__tests__/test-webhooks.test.ts b/packages/cli/src/webhooks/__tests__/test-webhooks.test.ts index d9228bcb0d..3f8972ad9a 100644 --- a/packages/cli/src/webhooks/__tests__/test-webhooks.test.ts +++ b/packages/cli/src/webhooks/__tests__/test-webhooks.test.ts @@ -39,7 +39,7 @@ let testWebhooks: TestWebhooks; describe('TestWebhooks', () => { beforeAll(() => { - testWebhooks = new TestWebhooks(mock(), mock(), registrations, mock()); + testWebhooks = new TestWebhooks(mock(), mock(), registrations, mock(), mock()); jest.useFakeTimers(); }); diff --git a/packages/cli/src/webhooks/test-webhooks.ts b/packages/cli/src/webhooks/test-webhooks.ts index 21511d4843..bf2fa6c9d8 100644 --- a/packages/cli/src/webhooks/test-webhooks.ts +++ b/packages/cli/src/webhooks/test-webhooks.ts @@ -16,6 +16,7 @@ import { WorkflowMissingIdError } from '@/errors/workflow-missing-id.error'; import type { IWorkflowDb } from '@/interfaces'; import { NodeTypes } from '@/node-types'; import { Push } from '@/push'; +import { Publisher } from '@/scaling/pubsub/publisher.service'; import { OrchestrationService } from '@/services/orchestration.service'; import { removeTrailingSlash } from '@/utils'; import type { TestWebhookRegistration } from '@/webhooks/test-webhook-registrations.service'; @@ -41,6 +42,7 @@ export class TestWebhooks implements IWebhookManager { private readonly nodeTypes: NodeTypes, private readonly registrations: TestWebhookRegistrationsService, private readonly orchestrationService: OrchestrationService, + private readonly publisher: Publisher, ) {} private timeouts: { [webhookKey: string]: NodeJS.Timeout } = {}; @@ -156,8 +158,10 @@ export class TestWebhooks implements IWebhookManager { pushRef && !this.push.getBackend().hasPushRef(pushRef) ) { - const payload = { webhookKey: key, workflowEntity, pushRef }; - void this.orchestrationService.publish('clear-test-webhooks', payload); + void this.publisher.publishCommand({ + command: 'clear-test-webhooks', + payload: { webhookKey: key, workflowEntity, pushRef }, + }); return; } diff --git a/packages/cli/test/integration/active-workflow-manager.test.ts b/packages/cli/test/integration/active-workflow-manager.test.ts index d5d471ba60..8ea790ade7 100644 --- a/packages/cli/test/integration/active-workflow-manager.test.ts +++ b/packages/cli/test/integration/active-workflow-manager.test.ts @@ -1,4 +1,5 @@ import { mock } from 'jest-mock-extended'; +import type { InstanceSettings } from 'n8n-core'; import { NodeApiError, NodeOperationError, Workflow } from 'n8n-workflow'; import type { IWebhookData, WorkflowActivateMode } from 'n8n-workflow'; import { Container } from 'typedi'; @@ -278,3 +279,72 @@ describe('addWebhooks()', () => { expect(webhookService.storeWebhook).toHaveBeenCalledTimes(1); }); }); + +describe('shouldAddWebhooks', () => { + describe('if leader', () => { + const activeWorkflowManager = new ActiveWorkflowManager( + mock(), + mock(), + mock(), + mock(), + mock(), + mock(), + mock(), + mock(), + mock(), + mock(), + mock(), + mock(), + mock(), + mock({ isLeader: true, isFollower: false }), + mock(), + ); + + test('should return `true` for `init`', () => { + // ensure webhooks are populated on init: https://github.com/n8n-io/n8n/pull/8830 + const result = activeWorkflowManager.shouldAddWebhooks('init'); + expect(result).toBe(true); + }); + + test('should return `false` for `leadershipChange`', () => { + const result = activeWorkflowManager.shouldAddWebhooks('leadershipChange'); + expect(result).toBe(false); + }); + + test('should return `true` for `update` or `activate`', () => { + const modes = ['update', 'activate'] as WorkflowActivateMode[]; + for (const mode of modes) { + const result = activeWorkflowManager.shouldAddWebhooks(mode); + expect(result).toBe(true); + } + }); + }); + + describe('if follower', () => { + const activeWorkflowManager = new ActiveWorkflowManager( + mock(), + mock(), + mock(), + mock(), + mock(), + mock(), + mock(), + mock(), + mock(), + mock(), + mock(), + mock(), + mock(), + mock({ isLeader: false, isFollower: true }), + mock(), + ); + + test('should return `false` for `update` or `activate`', () => { + const modes = ['update', 'activate'] as WorkflowActivateMode[]; + for (const mode of modes) { + const result = activeWorkflowManager.shouldAddWebhooks(mode); + expect(result).toBe(false); + } + }); + }); +}); diff --git a/packages/cli/test/integration/collaboration/collaboration.service.test.ts b/packages/cli/test/integration/collaboration/collaboration.service.test.ts index a90424de87..df5f901f28 100644 --- a/packages/cli/test/integration/collaboration/collaboration.service.test.ts +++ b/packages/cli/test/integration/collaboration/collaboration.service.test.ts @@ -16,7 +16,7 @@ import { createWorkflow, shareWorkflowWithUsers } from '@test-integration/db/wor import * as testDb from '@test-integration/test-db'; describe('CollaborationService', () => { - mockInstance(Push, new Push(mock())); + mockInstance(Push, new Push(mock(), mock())); let pushService: Push; let collaborationService: CollaborationService; let owner: User; diff --git a/packages/cli/test/integration/eventbus.ee.test.ts b/packages/cli/test/integration/eventbus.ee.test.ts index 9b12cc53d5..c2b6a7f23c 100644 --- a/packages/cli/test/integration/eventbus.ee.test.ts +++ b/packages/cli/test/integration/eventbus.ee.test.ts @@ -22,6 +22,7 @@ import type { MessageEventBusDestinationSentry } from '@/eventbus/message-event- import type { MessageEventBusDestinationSyslog } from '@/eventbus/message-event-bus-destination/message-event-bus-destination-syslog.ee'; import type { MessageEventBusDestinationWebhook } from '@/eventbus/message-event-bus-destination/message-event-bus-destination-webhook.ee'; import { ExecutionRecoveryService } from '@/executions/execution-recovery.service'; +import { Publisher } from '@/scaling/pubsub/publisher.service'; import { createUser } from './shared/db/users'; import type { SuperAgentTest } from './shared/types'; @@ -34,6 +35,8 @@ const mockedAxios = axios as jest.Mocked; jest.mock('syslog-client'); const mockedSyslog = syslog as jest.Mocked; +mockInstance(Publisher); + let owner: User; let authOwnerAgent: SuperAgentTest; diff --git a/packages/cli/test/integration/external-secrets/external-secrets.api.test.ts b/packages/cli/test/integration/external-secrets/external-secrets.api.test.ts index b3560b9262..3418576be1 100644 --- a/packages/cli/test/integration/external-secrets/external-secrets.api.test.ts +++ b/packages/cli/test/integration/external-secrets/external-secrets.api.test.ts @@ -63,6 +63,7 @@ const resetManager = async () => { mockProvidersInstance, Container.get(Cipher), eventService, + mock(), ), ); diff --git a/packages/cli/test/integration/shared/utils/index.ts b/packages/cli/test/integration/shared/utils/index.ts index 4d4a207f94..78de2c1b25 100644 --- a/packages/cli/test/integration/shared/utils/index.ts +++ b/packages/cli/test/integration/shared/utils/index.ts @@ -32,7 +32,6 @@ export { setupTestServer } from './test-server'; export async function initActiveWorkflowManager() { mockInstance(OrchestrationService, { isMultiMainSetupEnabled: false, - shouldAddWebhooks: jest.fn().mockReturnValue(true), }); mockInstance(Push);