From f35d4fcbd81022866cc9dca318944e862df421f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Mon, 22 Jan 2024 11:16:29 +0100 Subject: [PATCH] refactor(core): Simplify `OrchestrationService` (no-changelog) (#8364) --- packages/cli/src/ActiveWorkflowRunner.ts | 56 ++++---- .../ExternalSecretsManager.ee.ts | 4 +- packages/cli/src/License.ts | 15 ++- packages/cli/src/Server.ts | 7 +- packages/cli/src/TestWebhooks.ts | 8 +- packages/cli/src/commands/start.ts | 88 +++++++------ .../cli/src/controllers/debug.controller.ts | 10 +- .../controllers/orchestration.controller.ts | 10 +- .../MessageEventBus/MessageEventBus.ts | 6 +- packages/cli/src/push/abstract.push.ts | 8 +- packages/cli/src/push/sse.push.ts | 6 +- packages/cli/src/push/websocket.push.ts | 6 +- .../services/orchestration.base.service.ts | 55 -------- .../cli/src/services/orchestration.service.ts | 121 ++++++++++++++++++ .../orchestration/main/MultiMainSetup.ee.ts | 89 +++++-------- .../orchestration/main/SingleMainSetup.ts | 60 --------- .../main/handleCommandMessageMain.ts | 4 +- .../webhook/orchestration.webhook.service.ts | 9 +- .../worker/orchestration.worker.service.ts | 9 +- packages/cli/src/services/pruning.service.ts | 6 +- .../cli/src/workflows/workflow.service.ts | 10 +- .../integration/ActiveWorkflowRunner.test.ts | 18 +-- .../integration/commands/worker.cmd.test.ts | 4 +- .../test/integration/debug.controller.test.ts | 5 +- .../test/integration/shared/utils/index.ts | 4 +- .../workflows/workflow.service.test.ts | 12 +- packages/cli/test/unit/License.test.ts | 4 +- .../services/orchestration.service.test.ts | 4 +- 28 files changed, 323 insertions(+), 315 deletions(-) delete mode 100644 packages/cli/src/services/orchestration.base.service.ts create mode 100644 packages/cli/src/services/orchestration.service.ts delete mode 100644 packages/cli/src/services/orchestration/main/SingleMainSetup.ts diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index a2e082d4b3..70b0272227 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -47,7 +47,7 @@ import { ExternalHooks } from '@/ExternalHooks'; import { WebhookService } from './services/webhook.service'; import { Logger } from './Logger'; import { WorkflowRepository } from '@db/repositories/workflow.repository'; -import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; +import { OrchestrationService } from '@/services/orchestration.service'; import { ActivationErrorsService } from '@/ActivationErrors.service'; import { ActiveWorkflowsService } from '@/services/activeWorkflows.service'; import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service'; @@ -72,7 +72,7 @@ export class ActiveWorkflowRunner { private readonly nodeTypes: NodeTypes, private readonly webhookService: WebhookService, private readonly workflowRepository: WorkflowRepository, - private readonly multiMainSetup: MultiMainSetup, + private readonly orchestrationService: OrchestrationService, private readonly activationErrorsService: ActivationErrorsService, private readonly executionService: ExecutionService, private readonly workflowStaticDataService: WorkflowStaticDataService, @@ -80,7 +80,7 @@ export class ActiveWorkflowRunner { ) {} async init() { - await this.multiMainSetup.init(); + await this.orchestrationService.init(); await this.addActiveWorkflows('init'); @@ -470,25 +470,23 @@ export class ActiveWorkflowRunner { if (dbWorkflows.length === 0) return; - this.logger.info(' ================================'); - this.logger.info(' Start Active Workflows:'); - this.logger.info(' ================================'); + if (this.orchestrationService.isLeader) { + this.logger.info(' ================================'); + this.logger.info(' Start Active Workflows:'); + this.logger.info(' ================================'); + } for (const dbWorkflow of dbWorkflows) { - this.logger.info(` - ${dbWorkflow.display()}`); - this.logger.debug(`Initializing active workflow ${dbWorkflow.display()} (startup)`, { - workflowName: dbWorkflow.name, - workflowId: dbWorkflow.id, - }); - try { - await this.add(dbWorkflow.id, activationMode, dbWorkflow); + const wasActivated = await this.add(dbWorkflow.id, activationMode, dbWorkflow); - this.logger.verbose(`Successfully started workflow ${dbWorkflow.display()}`, { - workflowName: dbWorkflow.name, - workflowId: dbWorkflow.id, - }); - this.logger.info(' => Started'); + if (wasActivated) { + this.logger.verbose(`Successfully started workflow ${dbWorkflow.display()}`, { + workflowName: dbWorkflow.name, + workflowId: dbWorkflow.id, + }); + this.logger.info(' => Started'); + } } catch (error) { ErrorReporter.error(error); this.logger.info( @@ -571,16 +569,18 @@ export class ActiveWorkflowRunner { * again, and the new leader should take over the triggers and pollers that stopped * running when the former leader became unresponsive. */ - if (this.multiMainSetup.isEnabled) { + if (this.orchestrationService.isMultiMainSetupEnabled) { if (activationMode !== 'leadershipChange') { - shouldAddWebhooks = this.multiMainSetup.isLeader; - shouldAddTriggersAndPollers = this.multiMainSetup.isLeader; + shouldAddWebhooks = this.orchestrationService.isLeader; + shouldAddTriggersAndPollers = this.orchestrationService.isLeader; } else { shouldAddWebhooks = false; - shouldAddTriggersAndPollers = this.multiMainSetup.isLeader; + shouldAddTriggersAndPollers = this.orchestrationService.isLeader; } } + const shouldActivate = shouldAddWebhooks || shouldAddTriggersAndPollers; + try { const dbWorkflow = existingWorkflow ?? (await this.workflowRepository.findById(workflowId)); @@ -588,6 +588,14 @@ export class ActiveWorkflowRunner { throw new WorkflowActivationError(`Failed to find workflow with ID "${workflowId}"`); } + if (shouldActivate) { + this.logger.info(` - ${dbWorkflow.display()}`); + this.logger.debug(`Initializing active workflow ${dbWorkflow.display()} (startup)`, { + workflowName: dbWorkflow.name, + workflowId: dbWorkflow.id, + }); + } + workflow = new Workflow({ id: dbWorkflow.id, name: dbWorkflow.name, @@ -644,6 +652,8 @@ export class ActiveWorkflowRunner { // If for example webhooks get created it sometimes has to save the // id of them in the static data. So make sure that data gets persisted. await this.workflowStaticDataService.saveStaticData(workflow); + + return shouldActivate; } /** @@ -804,7 +814,7 @@ export class ActiveWorkflowRunner { ); if (workflow.getTriggerNodes().length !== 0 || workflow.getPollNodes().length !== 0) { - this.logger.debug(`Adding triggers and pollers for workflow "${dbWorkflow.display()}"`); + this.logger.debug(`Adding triggers and pollers for workflow ${dbWorkflow.display()}`); await this.activeWorkflows.add( workflow.id, diff --git a/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts b/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts index 8688230234..0fce34e62e 100644 --- a/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts +++ b/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts @@ -16,7 +16,7 @@ import { License } from '@/License'; import { InternalHooks } from '@/InternalHooks'; import { updateIntervalTime } from './externalSecretsHelper.ee'; import { ExternalSecretsProviders } from './ExternalSecretsProviders.ee'; -import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup'; +import { OrchestrationService } from '@/services/orchestration.service'; @Service() export class ExternalSecretsManager { @@ -79,7 +79,7 @@ export class ExternalSecretsManager { } async broadcastReloadExternalSecretsProviders() { - await Container.get(SingleMainSetup).broadcastReloadExternalSecretsProviders(); + await Container.get(OrchestrationService).publish('reloadExternalSecretsProviders'); } private decryptSecretsSettings(value: string): ExternalSecretsSettings { diff --git a/packages/cli/src/License.ts b/packages/cli/src/License.ts index 541be2789d..287edbea37 100644 --- a/packages/cli/src/License.ts +++ b/packages/cli/src/License.ts @@ -16,7 +16,7 @@ import { WorkflowRepository } from '@db/repositories/workflow.repository'; import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } from './Interfaces'; import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher'; import { RedisService } from './services/redis.service'; -import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; +import { OrchestrationService } from '@/services/orchestration.service'; import { OnShutdown } from '@/decorators/OnShutdown'; type FeatureReturnType = Partial< @@ -36,7 +36,7 @@ export class License { constructor( private readonly logger: Logger, private readonly instanceSettings: InstanceSettings, - private readonly multiMainSetup: MultiMainSetup, + private readonly orchestrationService: OrchestrationService, private readonly settingsRepository: SettingsRepository, private readonly workflowRepository: WorkflowRepository, ) {} @@ -51,8 +51,6 @@ export class License { return; } - await this.multiMainSetup.init(); - const isMainInstance = instanceType === 'main'; const server = config.getEnv('license.serverUrl'); const autoRenewEnabled = isMainInstance && config.getEnv('license.autoRenewEnabled'); @@ -123,16 +121,19 @@ export class License { | boolean | undefined; - this.multiMainSetup.setLicensed(isMultiMainLicensed ?? false); + this.orchestrationService.setMultiMainSetupLicensed(isMultiMainLicensed ?? false); - if (this.multiMainSetup.isEnabled && this.multiMainSetup.isFollower) { + if ( + this.orchestrationService.isMultiMainSetupEnabled && + this.orchestrationService.isFollower + ) { this.logger.debug( '[Multi-main setup] Instance is follower, skipping sending of "reloadLicense" command...', ); return; } - if (this.multiMainSetup.isEnabled && !isMultiMainLicensed) { + if (this.orchestrationService.isMultiMainSetupEnabled && !isMultiMainLicensed) { this.logger.debug( '[Multi-main setup] License changed with no support for multi-main setup - no new followers will be allowed to init. To restore multi-main setup, please upgrade to a license that supporst this feature.', ); diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 43c7e60a8a..b23225e4dc 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -101,7 +101,7 @@ import { CollaborationService } from './collaboration/collaboration.service'; import { RoleController } from './controllers/role.controller'; import { BadRequestError } from './errors/response-errors/bad-request.error'; import { NotFoundError } from './errors/response-errors/not-found.error'; -import { MultiMainSetup } from './services/orchestration/main/MultiMainSetup.ee'; +import { OrchestrationService } from '@/services/orchestration.service'; import { WorkflowSharingService } from './workflows/workflowSharing.service'; const exec = promisify(callbackExec); @@ -252,7 +252,10 @@ export class Server extends AbstractServer { ExecutionsController, ]; - if (process.env.NODE_ENV !== 'production' && Container.get(MultiMainSetup).isEnabled) { + if ( + process.env.NODE_ENV !== 'production' && + Container.get(OrchestrationService).isMultiMainSetupEnabled + ) { const { DebugController } = await import('@/controllers/debug.controller'); controllers.push(DebugController); } diff --git a/packages/cli/src/TestWebhooks.ts b/packages/cli/src/TestWebhooks.ts index bf3efaea13..fef109cb92 100644 --- a/packages/cli/src/TestWebhooks.ts +++ b/packages/cli/src/TestWebhooks.ts @@ -25,7 +25,7 @@ import * as NodeExecuteFunctions from 'n8n-core'; import { removeTrailingSlash } from './utils'; import type { TestWebhookRegistration } from '@/services/test-webhook-registrations.service'; import { TestWebhookRegistrationsService } from '@/services/test-webhook-registrations.service'; -import { MultiMainSetup } from './services/orchestration/main/MultiMainSetup.ee'; +import { OrchestrationService } from '@/services/orchestration.service'; import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; @Service() @@ -34,7 +34,7 @@ export class TestWebhooks implements IWebhookManager { private readonly push: Push, private readonly nodeTypes: NodeTypes, private readonly registrations: TestWebhookRegistrationsService, - private readonly multiMainSetup: MultiMainSetup, + private readonly orchestrationService: OrchestrationService, ) {} private timeouts: { [webhookKey: string]: NodeJS.Timeout } = {}; @@ -144,12 +144,12 @@ export class TestWebhooks implements IWebhookManager { * the handler process commands the creator process to clear its test webhooks. */ if ( - this.multiMainSetup.isEnabled && + this.orchestrationService.isMultiMainSetupEnabled && sessionId && !this.push.getBackend().hasSessionId(sessionId) ) { const payload = { webhookKey: key, workflowEntity, sessionId }; - void this.multiMainSetup.publish('clear-test-webhooks', payload); + void this.orchestrationService.publish('clear-test-webhooks', payload); return; } diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index f32d3dabbd..906c2b6d7f 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -22,10 +22,9 @@ import { BaseCommand } from './BaseCommand'; import { InternalHooks } from '@/InternalHooks'; import { License } from '@/License'; import type { IConfig } from '@oclif/config'; -import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup'; +import { OrchestrationService } from '@/services/orchestration.service'; import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service'; import { PruningService } from '@/services/pruning.service'; -import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; import { UrlService } from '@/services/url.service'; import { SettingsRepository } from '@db/repositories/settings.repository'; import { ExecutionRepository } from '@db/repositories/execution.repository'; @@ -104,10 +103,10 @@ export class Start extends BaseCommand { await this.externalHooks?.run('n8n.stop', []); - if (Container.get(MultiMainSetup).isEnabled) { + if (Container.get(OrchestrationService).isMultiMainSetupEnabled) { await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows(); - await Container.get(MultiMainSetup).shutdown(); + await Container.get(OrchestrationService).shutdown(); } await Container.get(InternalHooks).onN8nStop(); @@ -216,43 +215,48 @@ export class Start extends BaseCommand { async initOrchestration() { if (config.getEnv('executions.mode') !== 'queue') return; - // queue mode in single-main scenario - - if (!config.getEnv('multiMainSetup.enabled')) { - await Container.get(SingleMainSetup).init(); - await Container.get(OrchestrationHandlerMainService).init(); - return; - } - - // queue mode in multi-main scenario - - if (!Container.get(License).isMultipleMainInstancesLicensed()) { + if ( + config.getEnv('multiMainSetup.enabled') && + !Container.get(License).isMultipleMainInstancesLicensed() + ) { throw new FeatureNotLicensedError(LICENSE_FEATURES.MULTIPLE_MAIN_INSTANCES); } + const orchestrationService = Container.get(OrchestrationService); + + await orchestrationService.init(); + await Container.get(OrchestrationHandlerMainService).init(); - const multiMainSetup = Container.get(MultiMainSetup); + if (!orchestrationService.isMultiMainSetupEnabled) return; - await multiMainSetup.init(); + orchestrationService.multiMainSetup + .addListener('leadershipChange', async () => { + if (orchestrationService.isLeader) { + this.logger.debug('[Leadership change] Clearing all activation errors...'); - multiMainSetup.on('leadershipChange', async () => { - if (multiMainSetup.isLeader) { - this.logger.debug('[Leadership change] Clearing all activation errors...'); + await this.activeWorkflowRunner.clearAllActivationErrors(); - await this.activeWorkflowRunner.clearAllActivationErrors(); + this.logger.debug( + '[Leadership change] Adding all trigger- and poller-based workflows...', + ); - this.logger.debug('[Leadership change] Adding all trigger- and poller-based workflows...'); + await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows(); + } else { + this.logger.debug( + '[Leadership change] Removing all trigger- and poller-based workflows...', + ); - await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows(); - } else { + await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows(); + } + }) + .addListener('leadershipVacant', async () => { this.logger.debug( - '[Leadership change] Removing all trigger- and poller-based workflows...', + '[Leadership vacant] Removing all trigger- and poller-based workflows...', ); await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows(); - } - }); + }); } async run() { @@ -361,27 +365,27 @@ export class Start extends BaseCommand { async initPruning() { this.pruningService = Container.get(PruningService); - if (this.pruningService.isPruningEnabled()) { - this.pruningService.startPruning(); - } + this.pruningService.startPruning(); - if (config.getEnv('executions.mode') === 'queue' && config.getEnv('multiMainSetup.enabled')) { - const multiMainSetup = Container.get(MultiMainSetup); + if (config.getEnv('executions.mode') !== 'queue') return; - await multiMainSetup.init(); + const orchestrationService = Container.get(OrchestrationService); - multiMainSetup.on('leadershipChange', async () => { - if (multiMainSetup.isLeader) { - if (this.pruningService.isPruningEnabled()) { - this.pruningService.startPruning(); - } + await orchestrationService.init(); + + if (!orchestrationService.isMultiMainSetupEnabled) return; + + orchestrationService.multiMainSetup + .addListener('leadershipChange', async () => { + if (orchestrationService.isLeader) { + this.pruningService.startPruning(); } else { - if (this.pruningService.isPruningEnabled()) { - this.pruningService.stopPruning(); - } + this.pruningService.stopPruning(); } + }) + .addListener('leadershipVacant', () => { + this.pruningService.stopPruning(); }); - } } async catch(error: Error) { diff --git a/packages/cli/src/controllers/debug.controller.ts b/packages/cli/src/controllers/debug.controller.ts index 10c45a0618..b74ccd5840 100644 --- a/packages/cli/src/controllers/debug.controller.ts +++ b/packages/cli/src/controllers/debug.controller.ts @@ -1,19 +1,19 @@ import { Get, RestController } from '@/decorators'; import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; -import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; +import { OrchestrationService } from '@/services/orchestration.service'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; @RestController('/debug') export class DebugController { constructor( - private readonly multiMainSetup: MultiMainSetup, + private readonly orchestrationService: OrchestrationService, private readonly activeWorkflowRunner: ActiveWorkflowRunner, private readonly workflowRepository: WorkflowRepository, ) {} @Get('/multi-main-setup') async getMultiMainSetupDetails() { - const leaderKey = await this.multiMainSetup.fetchLeaderKey(); + const leaderKey = await this.orchestrationService.multiMainSetup.fetchLeaderKey(); const triggersAndPollers = await this.workflowRepository.findIn( this.activeWorkflowRunner.allActiveInMemory(), @@ -24,9 +24,9 @@ export class DebugController { const activationErrors = await this.activeWorkflowRunner.getAllWorkflowActivationErrors(); return { - instanceId: this.multiMainSetup.instanceId, + instanceId: this.orchestrationService.instanceId, leaderKey, - isLeader: this.multiMainSetup.isLeader, + isLeader: this.orchestrationService.isLeader, activeWorkflows: { webhooks, // webhook-based active workflows triggersAndPollers, // poller- and trigger-based active workflows diff --git a/packages/cli/src/controllers/orchestration.controller.ts b/packages/cli/src/controllers/orchestration.controller.ts index 77187fe84b..fb044014be 100644 --- a/packages/cli/src/controllers/orchestration.controller.ts +++ b/packages/cli/src/controllers/orchestration.controller.ts @@ -1,13 +1,13 @@ import { Authorized, Post, RestController, RequireGlobalScope } from '@/decorators'; import { OrchestrationRequest } from '@/requests'; -import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup'; +import { OrchestrationService } from '@/services/orchestration.service'; import { License } from '@/License'; @Authorized() @RestController('/orchestration') export class OrchestrationController { constructor( - private readonly singleMainSetup: SingleMainSetup, + private readonly orchestrationService: OrchestrationService, private readonly licenseService: License, ) {} @@ -20,20 +20,20 @@ export class OrchestrationController { async getWorkersStatus(req: OrchestrationRequest.Get) { if (!this.licenseService.isWorkerViewLicensed()) return; const id = req.params.id; - return await this.singleMainSetup.getWorkerStatus(id); + return await this.orchestrationService.getWorkerStatus(id); } @RequireGlobalScope('orchestration:read') @Post('/worker/status') async getWorkersStatusAll() { if (!this.licenseService.isWorkerViewLicensed()) return; - return await this.singleMainSetup.getWorkerStatus(); + return await this.orchestrationService.getWorkerStatus(); } @RequireGlobalScope('orchestration:list') @Post('/worker/ids') async getWorkerIdsAll() { if (!this.licenseService.isWorkerViewLicensed()) return; - return await this.singleMainSetup.getWorkerIds(); + return await this.orchestrationService.getWorkerIds(); } } diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index e578aed95f..5ceef85f6e 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -32,7 +32,7 @@ import { ExecutionRepository } from '@db/repositories/execution.repository'; import { WorkflowRepository } from '@db/repositories/workflow.repository'; import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions'; import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers'; -import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup'; +import { OrchestrationService } from '@/services/orchestration.service'; import { Logger } from '@/Logger'; import { EventDestinationsRepository } from '@db/repositories/eventDestinations.repository'; @@ -207,7 +207,7 @@ export class MessageEventBus extends EventEmitter { this.destinations[destination.getId()] = destination; this.destinations[destination.getId()].startListening(); if (notifyWorkers) { - await Container.get(SingleMainSetup).broadcastRestartEventbusAfterDestinationUpdate(); + await Container.get(OrchestrationService).publish('restartEventBus'); } return destination; } @@ -233,7 +233,7 @@ export class MessageEventBus extends EventEmitter { delete this.destinations[id]; } if (notifyWorkers) { - await Container.get(SingleMainSetup).broadcastRestartEventbusAfterDestinationUpdate(); + await Container.get(OrchestrationService).publish('restartEventBus'); } return result; } diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts index 655e5486b5..c74554f160 100644 --- a/packages/cli/src/push/abstract.push.ts +++ b/packages/cli/src/push/abstract.push.ts @@ -3,7 +3,7 @@ import { assert, jsonStringify } from 'n8n-workflow'; import type { IPushDataType } from '@/Interfaces'; import type { Logger } from '@/Logger'; import type { User } from '@db/entities/User'; -import type { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; +import type { OrchestrationService } from '@/services/orchestration.service'; /** * Abstract class for two-way push communication. @@ -21,7 +21,7 @@ export abstract class AbstractPush extends EventEmitter { constructor( protected readonly logger: Logger, - private readonly multiMainSetup: MultiMainSetup, + private readonly orchestrationService: OrchestrationService, ) { super(); } @@ -84,10 +84,10 @@ export abstract class AbstractPush extends EventEmitter { * the webhook. If so, the handler process commands the creator process to * relay the former's execution lifecyle events to the creator's frontend. */ - if (this.multiMainSetup.isEnabled && !this.hasSessionId(sessionId)) { + if (this.orchestrationService.isMultiMainSetupEnabled && !this.hasSessionId(sessionId)) { const payload = { type, args: data, sessionId }; - void this.multiMainSetup.publish('relay-execution-lifecycle-event', payload); + void this.orchestrationService.publish('relay-execution-lifecycle-event', payload); return; } diff --git a/packages/cli/src/push/sse.push.ts b/packages/cli/src/push/sse.push.ts index 17f4c7ad9e..6c2432917c 100644 --- a/packages/cli/src/push/sse.push.ts +++ b/packages/cli/src/push/sse.push.ts @@ -4,7 +4,7 @@ import { Logger } from '@/Logger'; import { AbstractPush } from './abstract.push'; import type { PushRequest, PushResponse } from './types'; import type { User } from '@db/entities/User'; -import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; +import { OrchestrationService } from '@/services/orchestration.service'; type Connection = { req: PushRequest; res: PushResponse }; @@ -14,8 +14,8 @@ export class SSEPush extends AbstractPush { readonly connections: Record = {}; - constructor(logger: Logger, multiMainSetup: MultiMainSetup) { - super(logger, multiMainSetup); + constructor(logger: Logger, orchestrationService: OrchestrationService) { + super(logger, orchestrationService); this.channel.on('disconnect', (channel, { req }) => { this.remove(req?.query?.sessionId); diff --git a/packages/cli/src/push/websocket.push.ts b/packages/cli/src/push/websocket.push.ts index 6f47b1fb62..cda286274e 100644 --- a/packages/cli/src/push/websocket.push.ts +++ b/packages/cli/src/push/websocket.push.ts @@ -3,7 +3,7 @@ import { Service } from 'typedi'; import { Logger } from '@/Logger'; import { AbstractPush } from './abstract.push'; import type { User } from '@db/entities/User'; -import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; +import { OrchestrationService } from '@/services/orchestration.service'; function heartbeat(this: WebSocket) { this.isAlive = true; @@ -11,8 +11,8 @@ function heartbeat(this: WebSocket) { @Service() export class WebSocketPush extends AbstractPush { - constructor(logger: Logger, multiMainSetup: MultiMainSetup) { - super(logger, multiMainSetup); + constructor(logger: Logger, orchestrationService: OrchestrationService) { + super(logger, orchestrationService); // Ping all connected clients every 60 seconds setInterval(() => this.pingAll(), 60 * 1000); diff --git a/packages/cli/src/services/orchestration.base.service.ts b/packages/cli/src/services/orchestration.base.service.ts deleted file mode 100644 index a2540a4a5a..0000000000 --- a/packages/cli/src/services/orchestration.base.service.ts +++ /dev/null @@ -1,55 +0,0 @@ -import Container from 'typedi'; -import { RedisService } from './redis.service'; -import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher'; -import config from '@/config'; -import { EventEmitter } from 'node:events'; - -export abstract class OrchestrationService extends EventEmitter { - protected isInitialized = false; - - protected queueModeId: string; - - redisPublisher: RedisServicePubSubPublisher; - - readonly redisService: RedisService; - - get isQueueMode(): boolean { - return config.get('executions.mode') === 'queue'; - } - - get isMainInstance(): boolean { - return config.get('generic.instanceType') === 'main'; - } - - get isWebhookInstance(): boolean { - return config.get('generic.instanceType') === 'webhook'; - } - - get isWorkerInstance(): boolean { - return config.get('generic.instanceType') === 'worker'; - } - - constructor() { - super(); - this.redisService = Container.get(RedisService); - this.queueModeId = config.getEnv('redis.queueModeId'); - } - - sanityCheck(): boolean { - return this.isInitialized && this.isQueueMode; - } - - async init() { - await this.initPublisher(); - this.isInitialized = true; - } - - async shutdown() { - await this.redisPublisher?.destroy(); - this.isInitialized = false; - } - - protected async initPublisher() { - this.redisPublisher = await this.redisService.getPubSubPublisher(); - } -} diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts new file mode 100644 index 0000000000..af56955805 --- /dev/null +++ b/packages/cli/src/services/orchestration.service.ts @@ -0,0 +1,121 @@ +import { Service } from 'typedi'; +import { Logger } from '@/Logger'; +import config from '@/config'; +import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher'; +import type { RedisServiceBaseCommand, RedisServiceCommand } from './redis/RedisServiceCommands'; + +import { RedisService } from './redis.service'; +import { MultiMainSetup } from './orchestration/main/MultiMainSetup.ee'; + +@Service() +export class OrchestrationService { + constructor( + private readonly logger: Logger, + private readonly redisService: RedisService, + readonly multiMainSetup: MultiMainSetup, + ) {} + + protected isInitialized = false; + + private isMultiMainSetupLicensed = false; + + setMultiMainSetupLicensed(newState: boolean) { + this.isMultiMainSetupLicensed = newState; + } + + get isMultiMainSetupEnabled() { + return ( + config.getEnv('executions.mode') === 'queue' && + config.getEnv('multiMainSetup.enabled') && + config.getEnv('generic.instanceType') === 'main' && + this.isMultiMainSetupLicensed + ); + } + + redisPublisher: RedisServicePubSubPublisher; + + get instanceId() { + return config.getEnv('redis.queueModeId'); + } + + get isLeader() { + return config.getEnv('multiMainSetup.instanceType') === 'leader'; + } + + get isFollower() { + return config.getEnv('multiMainSetup.instanceType') !== 'leader'; + } + + sanityCheck() { + return this.isInitialized && config.get('executions.mode') === 'queue'; + } + + async init() { + if (this.isInitialized) return; + + if (config.get('executions.mode') === 'queue') await this.initPublisher(); + + if (this.isMultiMainSetupEnabled) { + await this.multiMainSetup.init(); + } else { + config.set('multiMainSetup.instanceType', 'leader'); + } + + this.isInitialized = true; + } + + async shutdown() { + if (!this.isInitialized) return; + + if (this.isMultiMainSetupEnabled) await this.multiMainSetup.shutdown(); + + await this.redisPublisher.destroy(); + + this.isInitialized = false; + } + + // ---------------------------------- + // pubsub + // ---------------------------------- + + protected async initPublisher() { + this.redisPublisher = await this.redisService.getPubSubPublisher(); + } + + async publish(command: RedisServiceCommand, data?: unknown) { + if (!this.sanityCheck()) return; + + const payload = data as RedisServiceBaseCommand['payload']; + + this.logger.debug(`[Instance ID ${this.instanceId}] Publishing command "${command}"`, payload); + + await this.redisPublisher.publishToCommandChannel({ command, payload }); + } + + // ---------------------------------- + // workers status + // ---------------------------------- + + async getWorkerStatus(id?: string) { + if (!this.sanityCheck()) return; + + const command = 'getStatus'; + + this.logger.debug(`Sending "${command}" to command channel`); + + await this.redisPublisher.publishToCommandChannel({ + command, + targets: id ? [id] : undefined, + }); + } + + async getWorkerIds() { + if (!this.sanityCheck()) return; + + const command = 'getId'; + + this.logger.debug(`Sending "${command}" to command channel`); + + await this.redisPublisher.publishToCommandChannel({ command }); + } +} diff --git a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts index b5eeb69074..070834ac7b 100644 --- a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts +++ b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts @@ -1,43 +1,23 @@ +import { EventEmitter } from 'node:events'; import config from '@/config'; import { Service } from 'typedi'; import { TIME } from '@/constants'; -import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup'; import { getRedisPrefix } from '@/services/redis/RedisServiceHelper'; import { ErrorReporterProxy as EventReporter } from 'n8n-workflow'; -import type { - RedisServiceBaseCommand, - RedisServiceCommand, -} from '@/services/redis/RedisServiceCommands'; +import { Logger } from '@/Logger'; +import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; @Service() -export class MultiMainSetup extends SingleMainSetup { - private id = this.queueModeId; - - private isLicensed = false; - - get isEnabled() { - return ( - config.getEnv('executions.mode') === 'queue' && - config.getEnv('multiMainSetup.enabled') && - config.getEnv('generic.instanceType') === 'main' && - this.isLicensed - ); - } - - get isLeader() { - return config.getEnv('multiMainSetup.instanceType') === 'leader'; - } - - get isFollower() { - return !this.isLeader; +export class MultiMainSetup extends EventEmitter { + constructor( + private readonly logger: Logger, + private readonly redisPublisher: RedisServicePubSubPublisher, + ) { + super(); } get instanceId() { - return this.id; - } - - setLicensed(newState: boolean) { - this.isLicensed = newState; + return config.getEnv('redis.queueModeId'); } private readonly leaderKey = getRedisPrefix() + ':main_instance_leader'; @@ -47,12 +27,6 @@ export class MultiMainSetup extends SingleMainSetup { private leaderCheckInterval: NodeJS.Timer | undefined; async init() { - if (!this.isEnabled || this.isInitialized) return; - - await this.initPublisher(); - - this.isInitialized = true; - await this.tryBecomeLeader(); // prevent initial wait this.leaderCheckInterval = setInterval( @@ -64,35 +38,35 @@ export class MultiMainSetup extends SingleMainSetup { } async shutdown() { - if (!this.isInitialized) return; - clearInterval(this.leaderCheckInterval); - if (this.isLeader) await this.redisPublisher.clear(this.leaderKey); + const isLeader = config.getEnv('multiMainSetup.instanceType') === 'leader'; + + if (isLeader) await this.redisPublisher.clear(this.leaderKey); } private async checkLeader() { const leaderId = await this.redisPublisher.get(this.leaderKey); - if (leaderId === this.id) { - this.logger.debug(`[Instance ID ${this.id}] Leader is this instance`); + if (leaderId === this.instanceId) { + this.logger.debug(`[Instance ID ${this.instanceId}] Leader is this instance`); await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl); return; } - if (leaderId && leaderId !== this.id) { - this.logger.debug(`[Instance ID ${this.id}] Leader is other instance "${leaderId}"`); + if (leaderId && leaderId !== this.instanceId) { + this.logger.debug(`[Instance ID ${this.instanceId}] Leader is other instance "${leaderId}"`); if (config.getEnv('multiMainSetup.instanceType') === 'leader') { - this.emit('leadershipChange', leaderId); // stop triggers, pruning, etc. + config.set('multiMainSetup.instanceType', 'follower'); + + this.emit('leadershipChange'); // stop triggers, pollers, pruning EventReporter.report('[Multi-main setup] Leader failed to renew leader key', { level: 'info', }); - - config.set('multiMainSetup.instanceType', 'follower'); } return; @@ -100,42 +74,37 @@ export class MultiMainSetup extends SingleMainSetup { if (!leaderId) { this.logger.debug( - `[Instance ID ${this.id}] Leadership vacant, attempting to become leader...`, + `[Instance ID ${this.instanceId}] Leadership vacant, attempting to become leader...`, ); config.set('multiMainSetup.instanceType', 'follower'); + this.emit('leadershipVacant'); // stop triggers, pollers, pruning + await this.tryBecomeLeader(); } } private async tryBecomeLeader() { // this can only succeed if leadership is currently vacant - const keySetSuccessfully = await this.redisPublisher.setIfNotExists(this.leaderKey, this.id); + const keySetSuccessfully = await this.redisPublisher.setIfNotExists( + this.leaderKey, + this.instanceId, + ); if (keySetSuccessfully) { - this.logger.debug(`[Instance ID ${this.id}] Leader is now this instance`); + this.logger.debug(`[Instance ID ${this.instanceId}] Leader is now this instance`); config.set('multiMainSetup.instanceType', 'leader'); await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl); - this.emit('leadershipChange', this.id); + this.emit('leadershipChange'); // start triggers, pollers, pruning } else { config.set('multiMainSetup.instanceType', 'follower'); } } - async publish(command: RedisServiceCommand, data: unknown) { - if (!this.sanityCheck()) return; - - const payload = data as RedisServiceBaseCommand['payload']; - - this.logger.debug(`[Instance ID ${this.id}] Publishing command "${command}"`, payload); - - await this.redisPublisher.publishToCommandChannel({ command, payload }); - } - async fetchLeaderKey() { return await this.redisPublisher.get(this.leaderKey); } diff --git a/packages/cli/src/services/orchestration/main/SingleMainSetup.ts b/packages/cli/src/services/orchestration/main/SingleMainSetup.ts deleted file mode 100644 index 10b020b7d8..0000000000 --- a/packages/cli/src/services/orchestration/main/SingleMainSetup.ts +++ /dev/null @@ -1,60 +0,0 @@ -import { Logger } from '@/Logger'; -import { Service } from 'typedi'; -import { OrchestrationService } from '@/services/orchestration.base.service'; - -/** - * For use in main instance, in single main instance scenario. - */ -@Service() -export class SingleMainSetup extends OrchestrationService { - constructor(protected readonly logger: Logger) { - super(); - } - - sanityCheck() { - return this.isInitialized && this.isQueueMode && this.isMainInstance; - } - - async getWorkerStatus(id?: string) { - if (!this.sanityCheck()) return; - - const command = 'getStatus'; - - this.logger.debug(`Sending "${command}" to command channel`); - - await this.redisPublisher.publishToCommandChannel({ - command, - targets: id ? [id] : undefined, - }); - } - - async getWorkerIds() { - if (!this.sanityCheck()) return; - - const command = 'getId'; - - this.logger.debug(`Sending "${command}" to command channel`); - - await this.redisPublisher.publishToCommandChannel({ command }); - } - - async broadcastRestartEventbusAfterDestinationUpdate() { - if (!this.sanityCheck()) return; - - const command = 'restartEventBus'; - - this.logger.debug(`Sending "${command}" to command channel`); - - await this.redisPublisher.publishToCommandChannel({ command }); - } - - async broadcastReloadExternalSecretsProviders() { - if (!this.sanityCheck()) return; - - const command = 'reloadExternalSecretsProviders'; - - this.logger.debug(`Sending "${command}" to command channel`); - - await this.redisPublisher.publishToCommandChannel({ command }); - } -} diff --git a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts index f41106ca2d..ddf2c0e7fe 100644 --- a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts +++ b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts @@ -7,7 +7,7 @@ import { License } from '@/License'; import { Logger } from '@/Logger'; import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; import { Push } from '@/push'; -import { MultiMainSetup } from './MultiMainSetup.ee'; +import { OrchestrationService } from '@/services/orchestration.service'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { TestWebhooks } from '@/TestWebhooks'; @@ -100,7 +100,7 @@ export async function handleCommandMessageMain(messageString: string) { versionId, }); - await Container.get(MultiMainSetup).publish('workflowFailedToActivate', { + await Container.get(OrchestrationService).publish('workflowFailedToActivate', { workflowId, errorMessage: error.message, }); diff --git a/packages/cli/src/services/orchestration/webhook/orchestration.webhook.service.ts b/packages/cli/src/services/orchestration/webhook/orchestration.webhook.service.ts index c186a9c1d8..9e97078348 100644 --- a/packages/cli/src/services/orchestration/webhook/orchestration.webhook.service.ts +++ b/packages/cli/src/services/orchestration/webhook/orchestration.webhook.service.ts @@ -1,9 +1,14 @@ import { Service } from 'typedi'; -import { OrchestrationService } from '../../orchestration.base.service'; +import { OrchestrationService } from '../../orchestration.service'; +import config from '@/config'; @Service() export class OrchestrationWebhookService extends OrchestrationService { sanityCheck(): boolean { - return this.isInitialized && this.isQueueMode && this.isWebhookInstance; + return ( + this.isInitialized && + config.get('executions.mode') === 'queue' && + config.get('generic.instanceType') === 'webhook' + ); } } diff --git a/packages/cli/src/services/orchestration/worker/orchestration.worker.service.ts b/packages/cli/src/services/orchestration/worker/orchestration.worker.service.ts index f044e5403b..9a00d312b2 100644 --- a/packages/cli/src/services/orchestration/worker/orchestration.worker.service.ts +++ b/packages/cli/src/services/orchestration/worker/orchestration.worker.service.ts @@ -1,11 +1,16 @@ import { Service } from 'typedi'; import type { AbstractEventMessage } from '@/eventbus/EventMessageClasses/AbstractEventMessage'; -import { OrchestrationService } from '../../orchestration.base.service'; +import { OrchestrationService } from '../../orchestration.service'; +import config from '@/config'; @Service() export class OrchestrationWorkerService extends OrchestrationService { sanityCheck(): boolean { - return this.isInitialized && this.isQueueMode && this.isWorkerInstance; + return ( + this.isInitialized && + config.get('executions.mode') === 'queue' && + config.get('generic.instanceType') === 'worker' + ); } async publishToEventLog(message: AbstractEventMessage) { diff --git a/packages/cli/src/services/pruning.service.ts b/packages/cli/src/services/pruning.service.ts index 3f8d57d6cb..3f78c220cb 100644 --- a/packages/cli/src/services/pruning.service.ts +++ b/packages/cli/src/services/pruning.service.ts @@ -28,7 +28,7 @@ export class PruningService { private readonly binaryDataService: BinaryDataService, ) {} - isPruningEnabled() { + private isPruningEnabled() { if ( !config.getEnv('executions.pruneData') || inTest || @@ -52,6 +52,8 @@ export class PruningService { * @important Call this method only after DB migrations have completed. */ startPruning() { + if (!this.isPruningEnabled()) return; + if (this.isShuttingDown) { this.logger.warn('[Pruning] Cannot start pruning while shutting down'); return; @@ -64,6 +66,8 @@ export class PruningService { } stopPruning() { + if (!this.isPruningEnabled()) return; + this.logger.debug('[Pruning] Removing soft-deletion and hard-deletion timers'); clearInterval(this.softDeletionInterval); diff --git a/packages/cli/src/workflows/workflow.service.ts b/packages/cli/src/workflows/workflow.service.ts index 186cb2d9c6..ddae49cc3e 100644 --- a/packages/cli/src/workflows/workflow.service.ts +++ b/packages/cli/src/workflows/workflow.service.ts @@ -22,7 +22,7 @@ import { InternalHooks } from '@/InternalHooks'; import { OwnershipService } from '@/services/ownership.service'; import { WorkflowHistoryService } from './workflowHistory/workflowHistory.service.ee'; import { Logger } from '@/Logger'; -import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; +import { OrchestrationService } from '@/services/orchestration.service'; import { BadRequestError } from '@/errors/response-errors/bad-request.error'; import { NotFoundError } from '@/errors/response-errors/not-found.error'; @@ -38,7 +38,7 @@ export class WorkflowService { private readonly ownershipService: OwnershipService, private readonly tagService: TagService, private readonly workflowHistoryService: WorkflowHistoryService, - private readonly multiMainSetup: MultiMainSetup, + private readonly orchestrationService: OrchestrationService, private readonly externalHooks: ExternalHooks, private readonly activeWorkflowRunner: ActiveWorkflowRunner, ) {} @@ -227,12 +227,12 @@ export class WorkflowService { } } - await this.multiMainSetup.init(); + await this.orchestrationService.init(); const newState = updatedWorkflow.active; - if (this.multiMainSetup.isEnabled && oldState !== newState) { - await this.multiMainSetup.publish('workflowActiveStateChanged', { + if (this.orchestrationService.isMultiMainSetupEnabled && oldState !== newState) { + await this.orchestrationService.publish('workflowActiveStateChanged', { workflowId, oldState, newState, diff --git a/packages/cli/test/integration/ActiveWorkflowRunner.test.ts b/packages/cli/test/integration/ActiveWorkflowRunner.test.ts index c524aa082a..4e6138fd96 100644 --- a/packages/cli/test/integration/ActiveWorkflowRunner.test.ts +++ b/packages/cli/test/integration/ActiveWorkflowRunner.test.ts @@ -17,7 +17,7 @@ import type { User } from '@db/entities/User'; import type { WebhookEntity } from '@db/entities/WebhookEntity'; import { NodeTypes } from '@/NodeTypes'; import { chooseRandomly } from './shared/random'; -import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; +import { OrchestrationService } from '@/services/orchestration.service'; import { mockInstance } from '../shared/mocking'; import { setSchedulerAsLoadedNode } from './shared/utils'; import * as testDb from './shared/testDb'; @@ -34,8 +34,8 @@ mockInstance(ExecutionService); mockInstance(WorkflowService); const webhookService = mockInstance(WebhookService); -const multiMainSetup = mockInstance(MultiMainSetup, { - isEnabled: false, +const orchestrationService = mockInstance(OrchestrationService, { + isMultiMainSetupEnabled: false, isLeader: false, isFollower: false, }); @@ -266,8 +266,8 @@ describe('add()', () => { const workflow = await createWorkflow({ active: true }, owner); - jest.replaceProperty(multiMainSetup, 'isEnabled', true); - jest.replaceProperty(multiMainSetup, 'isLeader', true); + jest.replaceProperty(orchestrationService, 'isMultiMainSetupEnabled', true); + jest.replaceProperty(orchestrationService, 'isLeader', true); const addWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'addWebhooks'); const addTriggersAndPollersSpy = jest.spyOn( @@ -290,8 +290,8 @@ describe('add()', () => { test('should add triggers and pollers only', async () => { const mode = 'leadershipChange'; - jest.replaceProperty(multiMainSetup, 'isEnabled', true); - jest.replaceProperty(multiMainSetup, 'isLeader', true); + jest.replaceProperty(orchestrationService, 'isMultiMainSetupEnabled', true); + jest.replaceProperty(orchestrationService, 'isLeader', true); const workflow = await createWorkflow({ active: true }, owner); @@ -318,8 +318,8 @@ describe('add()', () => { test('should not add webhooks, triggers or pollers', async () => { const mode = chooseRandomly(NON_LEADERSHIP_CHANGE_MODES); - jest.replaceProperty(multiMainSetup, 'isEnabled', true); - jest.replaceProperty(multiMainSetup, 'isLeader', false); + jest.replaceProperty(orchestrationService, 'isMultiMainSetupEnabled', true); + jest.replaceProperty(orchestrationService, 'isLeader', false); const workflow = await createWorkflow({ active: true }, owner); diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts index a7d5c97a71..3c1f4558e1 100644 --- a/packages/cli/test/integration/commands/worker.cmd.test.ts +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -16,7 +16,7 @@ import { PostHogClient } from '@/posthog'; import { RedisService } from '@/services/redis.service'; import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service'; import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; -import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; +import { OrchestrationService } from '@/services/orchestration.service'; import { mockInstance } from '../../shared/mocking'; @@ -38,7 +38,7 @@ beforeAll(async () => { mockInstance(RedisService); mockInstance(RedisServicePubSubPublisher); mockInstance(RedisServicePubSubSubscriber); - mockInstance(MultiMainSetup); + mockInstance(OrchestrationService); }); test('worker initializes all its components', async () => { diff --git a/packages/cli/test/integration/debug.controller.test.ts b/packages/cli/test/integration/debug.controller.test.ts index 7b5fb31413..9acd6a3993 100644 --- a/packages/cli/test/integration/debug.controller.test.ts +++ b/packages/cli/test/integration/debug.controller.test.ts @@ -7,6 +7,7 @@ import type { WorkflowEntity } from '@/databases/entities/WorkflowEntity'; import { setupTestServer } from './shared/utils'; import type { SuperAgentTest } from 'supertest'; import { createOwner } from './shared/db/users'; +import { OrchestrationService } from '@/services/orchestration.service'; import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; describe('DebugController', () => { @@ -36,9 +37,9 @@ describe('DebugController', () => { activeWorkflowRunner.allActiveInMemory.mockReturnValue([workflowId]); activeWorkflowRunner.getAllWorkflowActivationErrors.mockResolvedValue(activationErrors); - jest.spyOn(MultiMainSetup.prototype, 'instanceId', 'get').mockReturnValue(instanceId); + jest.spyOn(OrchestrationService.prototype, 'instanceId', 'get').mockReturnValue(instanceId); jest.spyOn(MultiMainSetup.prototype, 'fetchLeaderKey').mockResolvedValue(leaderKey); - jest.spyOn(MultiMainSetup.prototype, 'isLeader', 'get').mockReturnValue(true); + jest.spyOn(OrchestrationService.prototype, 'isLeader', 'get').mockReturnValue(true); const response = await ownerAgent.get('/debug/multi-main-setup').expect(200); diff --git a/packages/cli/test/integration/shared/utils/index.ts b/packages/cli/test/integration/shared/utils/index.ts index 1959a4deec..711f98a296 100644 --- a/packages/cli/test/integration/shared/utils/index.ts +++ b/packages/cli/test/integration/shared/utils/index.ts @@ -16,7 +16,7 @@ import { AUTH_COOKIE_NAME } from '@/constants'; import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials'; import { SettingsRepository } from '@db/repositories/settings.repository'; import { mockNodeTypesData } from '../../../unit/Helpers'; -import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; +import { OrchestrationService } from '@/services/orchestration.service'; import { mockInstance } from '../../../shared/mocking'; import { ExecutionService } from '@/executions/execution.service'; @@ -30,7 +30,7 @@ export { setupTestServer } from './testServer'; * Initialize node types. */ export async function initActiveWorkflowRunner() { - mockInstance(MultiMainSetup); + mockInstance(OrchestrationService); mockInstance(ExecutionService); const { ActiveWorkflowRunner } = await import('@/ActiveWorkflowRunner'); diff --git a/packages/cli/test/integration/workflows/workflow.service.test.ts b/packages/cli/test/integration/workflows/workflow.service.test.ts index 34f1a970ed..056c11ca29 100644 --- a/packages/cli/test/integration/workflows/workflow.service.test.ts +++ b/packages/cli/test/integration/workflows/workflow.service.test.ts @@ -4,7 +4,7 @@ import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository'; import { WorkflowRepository } from '@db/repositories/workflow.repository'; import { Telemetry } from '@/telemetry'; -import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; +import { OrchestrationService } from '@/services/orchestration.service'; import { WorkflowService } from '@/workflows/workflow.service'; import * as testDb from '../shared/testDb'; @@ -14,13 +14,13 @@ import { createWorkflow } from '../shared/db/workflows'; let workflowService: WorkflowService; let activeWorkflowRunner: ActiveWorkflowRunner; -let multiMainSetup: MultiMainSetup; +let orchestrationService: OrchestrationService; beforeAll(async () => { await testDb.init(); activeWorkflowRunner = mockInstance(ActiveWorkflowRunner); - multiMainSetup = mockInstance(MultiMainSetup); + orchestrationService = mockInstance(OrchestrationService); mockInstance(Telemetry); workflowService = new WorkflowService( @@ -33,7 +33,7 @@ beforeAll(async () => { mock(), mock(), mock(), - multiMainSetup, + orchestrationService, mock(), activeWorkflowRunner, ); @@ -89,7 +89,7 @@ describe('update()', () => { const owner = await createOwner(); const workflow = await createWorkflow({ active: true }, owner); - const publishSpy = jest.spyOn(multiMainSetup, 'publish'); + const publishSpy = jest.spyOn(orchestrationService, 'publish'); workflow.active = false; await workflowService.update(owner, workflow, workflow.id); @@ -109,7 +109,7 @@ describe('update()', () => { const owner = await createOwner(); const workflow = await createWorkflow({ active: true }, owner); - const publishSpy = jest.spyOn(multiMainSetup, 'publish'); + const publishSpy = jest.spyOn(orchestrationService, 'publish'); await workflowService.update(owner, workflow, workflow.id); diff --git a/packages/cli/test/unit/License.test.ts b/packages/cli/test/unit/License.test.ts index 3079695d46..354e672b0b 100644 --- a/packages/cli/test/unit/License.test.ts +++ b/packages/cli/test/unit/License.test.ts @@ -6,7 +6,7 @@ import { License } from '@/License'; import { Logger } from '@/Logger'; import { N8N_VERSION } from '@/constants'; import { mockInstance } from '../shared/mocking'; -import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; +import { OrchestrationService } from '@/services/orchestration.service'; jest.mock('@n8n_io/license-sdk'); @@ -28,7 +28,7 @@ describe('License', () => { let license: License; const logger = mockInstance(Logger); const instanceSettings = mockInstance(InstanceSettings, { instanceId: MOCK_INSTANCE_ID }); - mockInstance(MultiMainSetup); + mockInstance(OrchestrationService); beforeEach(async () => { license = new License(logger, instanceSettings, mock(), mock(), mock()); diff --git a/packages/cli/test/unit/services/orchestration.service.test.ts b/packages/cli/test/unit/services/orchestration.service.test.ts index 8a71d6784e..6d6bd16822 100644 --- a/packages/cli/test/unit/services/orchestration.service.test.ts +++ b/packages/cli/test/unit/services/orchestration.service.test.ts @@ -1,6 +1,6 @@ import Container from 'typedi'; import config from '@/config'; -import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup'; +import { OrchestrationService } from '@/services/orchestration.service'; import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands'; import { eventBus } from '@/eventbus'; import { RedisService } from '@/services/redis.service'; @@ -14,7 +14,7 @@ import { Push } from '@/push'; import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; import { mockInstance } from '../../shared/mocking'; -const os = Container.get(SingleMainSetup); +const os = Container.get(OrchestrationService); const handler = Container.get(OrchestrationHandlerMainService); mockInstance(ActiveWorkflowRunner);