diff --git a/packages/@n8n/config/src/configs/logging.config.ts b/packages/@n8n/config/src/configs/logging.config.ts index 1b7676cdd0..791dc6af64 100644 --- a/packages/@n8n/config/src/configs/logging.config.ts +++ b/packages/@n8n/config/src/configs/logging.config.ts @@ -84,6 +84,8 @@ export class LoggingConfig { * - `scaling` * - `waiting-executions` * - `task-runner` + * - `workflow-activation` + * - `insights` * * @example * `N8N_LOG_SCOPES=license` diff --git a/packages/cli/src/__tests__/active-workflow-manager.test.ts b/packages/cli/src/__tests__/active-workflow-manager.test.ts index 15a5363e2d..6b84a99012 100644 --- a/packages/cli/src/__tests__/active-workflow-manager.test.ts +++ b/packages/cli/src/__tests__/active-workflow-manager.test.ts @@ -36,7 +36,6 @@ describe('ActiveWorkflowManager', () => { mock(), mock(), mock(), - mock(), instanceSettings, mock(), mock(), diff --git a/packages/cli/src/active-workflow-manager.ts b/packages/cli/src/active-workflow-manager.ts index fe1b60c384..66bc9ebf9a 100644 --- a/packages/cli/src/active-workflow-manager.ts +++ b/packages/cli/src/active-workflow-manager.ts @@ -34,6 +34,7 @@ import { WebhookPathTakenError, UnexpectedError, } from 'n8n-workflow'; +import { strict } from 'node:assert'; import { ActivationErrorsService } from '@/activation-errors.service'; import { ActiveExecutions } from '@/active-executions'; @@ -49,7 +50,6 @@ import { ExternalHooks } from '@/external-hooks'; import { NodeTypes } from '@/node-types'; import { Publisher } from '@/scaling/pubsub/publisher.service'; import { ActiveWorkflowsService } from '@/services/active-workflows.service'; -import { OrchestrationService } from '@/services/orchestration.service'; import * as WebhookHelpers from '@/webhooks/webhook-helpers'; import { WebhookService } from '@/webhooks/webhook.service'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; @@ -77,7 +77,6 @@ export class ActiveWorkflowManager { private readonly nodeTypes: NodeTypes, private readonly webhookService: WebhookService, private readonly workflowRepository: WorkflowRepository, - private readonly orchestrationService: OrchestrationService, private readonly activationErrorsService: ActivationErrorsService, private readonly executionService: ExecutionService, private readonly workflowStaticDataService: WorkflowStaticDataService, @@ -89,7 +88,10 @@ export class ActiveWorkflowManager { ) {} async init() { - await this.orchestrationService.init(); + strict( + this.instanceSettings.instanceRole !== 'unset', + 'Active workflow manager expects instance role to be set', + ); await this.addActiveWorkflows('init'); diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 071a60a8c6..55965ccf14 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -21,10 +21,11 @@ import { FeatureNotLicensedError } from '@/errors/feature-not-licensed.error'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { EventService } from '@/events/event.service'; import { ExecutionService } from '@/executions/execution.service'; +import { MultiMainSetup } from '@/scaling/multi-main-setup.ee'; +import { Publisher } from '@/scaling/pubsub/publisher.service'; import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { Server } from '@/server'; -import { OrchestrationService } from '@/services/orchestration.service'; import { OwnershipService } from '@/services/ownership.service'; import { PruningService } from '@/services/pruning/pruning.service'; import { UrlService } from '@/services/url.service'; @@ -104,7 +105,12 @@ export class Start extends BaseCommand { await this.activeWorkflowManager.removeAllTriggerAndPollerBasedWorkflows(); if (this.instanceSettings.isMultiMain) { - await Container.get(OrchestrationService).shutdown(); + await Container.get(MultiMainSetup).shutdown(); + } + + if (config.getEnv('executions.mode') === 'queue') { + Container.get(Publisher).shutdown(); + Container.get(Subscriber).shutdown(); } Container.get(EventService).emit('instance-stopped'); @@ -201,13 +207,18 @@ export class Start extends BaseCommand { this.instanceSettings.setMultiMainEnabled(isMultiMainEnabled); /** - * We temporarily license multi-main to allow orchestration to set instance - * role, which is needed by license init. Once the license is initialized, + * We temporarily license multi-main to allow it to set instance role, + * which is needed by license init. Once the license is initialized, * the actual value will be used for the license check. */ if (isMultiMainEnabled) this.instanceSettings.setMultiMainLicensed(true); - await this.initOrchestration(); + if (config.getEnv('executions.mode') === 'regular') { + this.instanceSettings.markAsLeader(); + } else { + await this.initOrchestration(); + } + await this.initLicense(); if (isMultiMainEnabled && !this.license.isMultiMainLicensed()) { @@ -240,14 +251,7 @@ export class Start extends BaseCommand { } async initOrchestration() { - if (config.getEnv('executions.mode') === 'regular') { - this.instanceSettings.markAsLeader(); - return; - } - - const orchestrationService = Container.get(OrchestrationService); - - await orchestrationService.init(); + Container.get(Publisher); Container.get(PubSubHandler).init(); @@ -255,7 +259,11 @@ export class Start extends BaseCommand { await subscriber.subscribe('n8n.commands'); await subscriber.subscribe('n8n.worker-response'); - this.logger.scoped(['scaling', 'pubsub']).debug('Pubsub setup completed'); + if (this.instanceSettings.isMultiMain) { + await Container.get(MultiMainSetup).init(); + } else { + this.instanceSettings.markAsLeader(); + } } async run() { diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index 9ab1718b85..0cb446cc74 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -3,9 +3,9 @@ import { Flags } from '@oclif/core'; import { ActiveExecutions } from '@/active-executions'; import config from '@/config'; +import { Publisher } from '@/scaling/pubsub/publisher.service'; import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; -import { OrchestrationService } from '@/services/orchestration.service'; import { WebhookServer } from '@/webhooks/webhook-server'; import { BaseCommand } from './base-command'; @@ -98,7 +98,7 @@ export class Webhook extends BaseCommand { } async initOrchestration() { - await Container.get(OrchestrationService).init(); + Container.get(Publisher); Container.get(PubSubHandler).init(); await Container.get(Subscriber).subscribe('n8n.commands'); diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 4a407e1f79..bf77b08cc5 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -6,11 +6,11 @@ import { N8N_VERSION, inTest } from '@/constants'; import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-message-generic'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay'; +import { Publisher } from '@/scaling/pubsub/publisher.service'; import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import type { ScalingService } from '@/scaling/scaling.service'; import type { WorkerServerEndpointsConfig } from '@/scaling/worker-server'; -import { OrchestrationService } from '@/services/orchestration.service'; import { BaseCommand } from './base-command'; @@ -127,12 +127,10 @@ export class Worker extends BaseCommand { * The subscription connection adds a handler to handle the command messages */ async initOrchestration() { - await Container.get(OrchestrationService).init(); + Container.get(Publisher); Container.get(PubSubHandler).init(); await Container.get(Subscriber).subscribe('n8n.commands'); - - this.logger.scoped(['scaling', 'pubsub']).debug('Pubsub setup completed'); } async setConcurrency() { diff --git a/packages/cli/src/controllers/debug.controller.ts b/packages/cli/src/controllers/debug.controller.ts index 58ce67bc38..80d4ae7ff7 100644 --- a/packages/cli/src/controllers/debug.controller.ts +++ b/packages/cli/src/controllers/debug.controller.ts @@ -3,12 +3,12 @@ import { InstanceSettings } from 'n8n-core'; import { ActiveWorkflowManager } from '@/active-workflow-manager'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; -import { OrchestrationService } from '@/services/orchestration.service'; +import { MultiMainSetup } from '@/scaling/multi-main-setup.ee'; @RestController('/debug') export class DebugController { constructor( - private readonly orchestrationService: OrchestrationService, + private readonly multiMainSetup: MultiMainSetup, private readonly activeWorkflowManager: ActiveWorkflowManager, private readonly workflowRepository: WorkflowRepository, private readonly instanceSettings: InstanceSettings, @@ -16,7 +16,7 @@ export class DebugController { @Get('/multi-main-setup', { skipAuth: true }) async getMultiMainSetupDetails() { - const leaderKey = await this.orchestrationService.multiMainSetup.fetchLeaderKey(); + const leaderKey = await this.multiMainSetup.fetchLeaderKey(); const triggersAndPollers = await this.workflowRepository.findIn( this.activeWorkflowManager.allActiveInMemory(), diff --git a/packages/cli/src/scaling/multi-main-setup.ee.ts b/packages/cli/src/scaling/multi-main-setup.ee.ts index 3f74784ef1..334c82d79e 100644 --- a/packages/cli/src/scaling/multi-main-setup.ee.ts +++ b/packages/cli/src/scaling/multi-main-setup.ee.ts @@ -58,6 +58,7 @@ export class MultiMainSetup extends TypedEmitter { }, this.globalConfig.multiMainSetup.interval * Time.seconds.toMilliseconds); } + // @TODO: Use `@OnShutdown()` decorator async shutdown() { clearInterval(this.leaderCheckInterval); @@ -117,7 +118,7 @@ export class MultiMainSetup extends TypedEmitter { ); if (keySetSuccessfully) { - this.logger.debug(`[Instance ID ${hostId}] Leader is now this instance`); + this.logger.info(`[Instance ID ${hostId}] Leader is now this instance`); this.instanceSettings.markAsLeader(); diff --git a/packages/cli/src/services/__tests__/orchestration.service.test.ts b/packages/cli/src/services/__tests__/orchestration.service.test.ts deleted file mode 100644 index 89c043a589..0000000000 --- a/packages/cli/src/services/__tests__/orchestration.service.test.ts +++ /dev/null @@ -1,47 +0,0 @@ -import { Container } from '@n8n/di'; -import type Redis from 'ioredis'; -import { mock } from 'jest-mock-extended'; -import { InstanceSettings } from 'n8n-core'; - -import { ActiveWorkflowManager } from '@/active-workflow-manager'; -import config from '@/config'; -import { ExternalSecretsManager } from '@/external-secrets.ee/external-secrets-manager.ee'; -import { Push } from '@/push'; -import { OrchestrationService } from '@/services/orchestration.service'; -import { RedisClientService } from '@/services/redis-client.service'; -import { mockInstance } from '@test/mocking'; - -config.set('executions.mode', 'queue'); -config.set('generic.instanceType', 'main'); - -const instanceSettings = Container.get(InstanceSettings); -const redisClientService = mockInstance(RedisClientService); -const mockRedisClient = mock(); -redisClientService.createClient.mockReturnValue(mockRedisClient); - -const os = Container.get(OrchestrationService); -mockInstance(ActiveWorkflowManager); - -describe('Orchestration Service', () => { - mockInstance(Push); - mockInstance(ExternalSecretsManager); - - beforeAll(async () => { - // @ts-expect-error readonly property - instanceSettings.instanceType = 'main'; - }); - - beforeEach(() => { - instanceSettings.markAsLeader(); - }); - - afterAll(async () => { - await os.shutdown(); - }); - - test('should initialize', async () => { - await os.init(); - // @ts-expect-error Private field - expect(os.publisher).toBeDefined(); - }); -}); diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts deleted file mode 100644 index 2745108c69..0000000000 --- a/packages/cli/src/services/orchestration.service.ts +++ /dev/null @@ -1,56 +0,0 @@ -import { GlobalConfig } from '@n8n/config'; -import { Container, Service } from '@n8n/di'; -import { InstanceSettings } from 'n8n-core'; - -import config from '@/config'; -import type { Publisher } from '@/scaling/pubsub/publisher.service'; -import type { Subscriber } from '@/scaling/pubsub/subscriber.service'; - -import { MultiMainSetup } from '../scaling/multi-main-setup.ee'; - -@Service() -export class OrchestrationService { - constructor( - readonly instanceSettings: InstanceSettings, - readonly multiMainSetup: MultiMainSetup, - readonly globalConfig: GlobalConfig, - ) {} - - private publisher: Publisher; - - private subscriber: Subscriber; - - isInitialized = false; - - async init() { - if (this.isInitialized) return; - - if (config.get('executions.mode') === 'queue') { - const { Publisher } = await import('@/scaling/pubsub/publisher.service'); - this.publisher = Container.get(Publisher); - - const { Subscriber } = await import('@/scaling/pubsub/subscriber.service'); - this.subscriber = Container.get(Subscriber); - } - - if (this.instanceSettings.isMultiMain) { - await this.multiMainSetup.init(); - } else { - this.instanceSettings.markAsLeader(); - } - - this.isInitialized = true; - } - - // @TODO: Use `@OnShutdown()` decorator - async shutdown() { - if (!this.isInitialized) return; - - if (this.instanceSettings.isMultiMain) await this.multiMainSetup.shutdown(); - - this.publisher.shutdown(); - this.subscriber.shutdown(); - - this.isInitialized = false; - } -} diff --git a/packages/cli/src/workflows/workflow.service.ts b/packages/cli/src/workflows/workflow.service.ts index f248225a1e..d81edab112 100644 --- a/packages/cli/src/workflows/workflow.service.ts +++ b/packages/cli/src/workflows/workflow.service.ts @@ -28,7 +28,6 @@ import { validateEntity } from '@/generic-helpers'; import type { ListQuery } from '@/requests'; import { hasSharing } from '@/requests'; import { FolderService } from '@/services/folder.service'; -import { OrchestrationService } from '@/services/orchestration.service'; import { OwnershipService } from '@/services/ownership.service'; import { ProjectService } from '@/services/project.service.ee'; import { RoleService } from '@/services/role.service'; @@ -50,7 +49,6 @@ export class WorkflowService { private readonly ownershipService: OwnershipService, private readonly tagService: TagService, private readonly workflowHistoryService: WorkflowHistoryService, - private readonly orchestrationService: OrchestrationService, private readonly externalHooks: ExternalHooks, private readonly activeWorkflowManager: ActiveWorkflowManager, private readonly roleService: RoleService, @@ -370,8 +368,6 @@ export class WorkflowService { } } - await this.orchestrationService.init(); - return updatedWorkflow; } diff --git a/packages/cli/test/integration/active-workflow-manager.test.ts b/packages/cli/test/integration/active-workflow-manager.test.ts index 5ebe80af08..a1ee102ef1 100644 --- a/packages/cli/test/integration/active-workflow-manager.test.ts +++ b/packages/cli/test/integration/active-workflow-manager.test.ts @@ -1,7 +1,7 @@ import type { WebhookEntity } from '@n8n/db'; import { Container } from '@n8n/di'; import { mock } from 'jest-mock-extended'; -import { Logger } from 'n8n-core'; +import { InstanceSettings, Logger } from 'n8n-core'; import { FormTrigger } from 'n8n-nodes-base/nodes/Form/FormTrigger.node'; import { ScheduleTrigger } from 'n8n-nodes-base/nodes/Schedule/ScheduleTrigger.node'; import { NodeApiError, Workflow } from 'n8n-workflow'; @@ -67,6 +67,7 @@ beforeAll(async () => { const owner = await createOwner(); createActiveWorkflow = async () => await createWorkflow({ active: true }, owner); createInactiveWorkflow = async () => await createWorkflow({ active: false }, owner); + Container.get(InstanceSettings).markAsLeader(); }); afterEach(async () => { diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts index 36d6322ec7..a21da2ac23 100644 --- a/packages/cli/test/integration/commands/worker.cmd.test.ts +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -16,7 +16,6 @@ import { Push } from '@/push'; import { Publisher } from '@/scaling/pubsub/publisher.service'; import { Subscriber } from '@/scaling/pubsub/subscriber.service'; import { ScalingService } from '@/scaling/scaling.service'; -import { OrchestrationService } from '@/services/orchestration.service'; import { TaskBrokerServer } from '@/task-runners/task-broker/task-broker-server'; import { TaskRunnerProcess } from '@/task-runners/task-runner-process'; import { Telemetry } from '@/telemetry'; @@ -35,7 +34,6 @@ const license = mockInstance(License, { loadCertStr: async () => '' }); const messageEventBus = mockInstance(MessageEventBus); const logStreamingEventRelay = mockInstance(LogStreamingEventRelay); const scalingService = mockInstance(ScalingService); -const orchestrationService = mockInstance(OrchestrationService); const taskBrokerServer = mockInstance(TaskBrokerServer); const taskRunnerProcess = mockInstance(TaskRunnerProcess); mockInstance(Publisher); @@ -58,7 +56,6 @@ test('worker initializes all its components', async () => { expect(scalingService.setupQueue).toHaveBeenCalledTimes(1); expect(scalingService.setupWorker).toHaveBeenCalledTimes(1); expect(logStreamingEventRelay.init).toHaveBeenCalledTimes(1); - expect(orchestrationService.init).toHaveBeenCalledTimes(1); expect(messageEventBus.send).toHaveBeenCalledTimes(1); expect(taskBrokerServer.start).toHaveBeenCalledTimes(1); expect(taskRunnerProcess.start).toHaveBeenCalledTimes(1); diff --git a/packages/cli/test/integration/public-api/workflows.test.ts b/packages/cli/test/integration/public-api/workflows.test.ts index edeb83be32..4b2c07a880 100644 --- a/packages/cli/test/integration/public-api/workflows.test.ts +++ b/packages/cli/test/integration/public-api/workflows.test.ts @@ -4,6 +4,7 @@ import type { TagEntity } from '@n8n/db'; import type { User } from '@n8n/db'; import { ProjectRepository } from '@n8n/db'; import { Container } from '@n8n/di'; +import { InstanceSettings } from 'n8n-core'; import type { INode } from 'n8n-workflow'; import { ActiveWorkflowManager } from '@/active-workflow-manager'; @@ -42,6 +43,7 @@ mockInstance(ExecutionService); beforeAll(async () => { owner = await createOwnerWithApiKey(); + Container.get(InstanceSettings).markAsLeader(); ownerPersonalProject = await Container.get(ProjectRepository).getPersonalProjectForUserOrFail( owner.id, ); diff --git a/packages/cli/test/integration/workflows/workflow.service.test.ts b/packages/cli/test/integration/workflows/workflow.service.test.ts index 93042f5dd1..99a9f6fff1 100644 --- a/packages/cli/test/integration/workflows/workflow.service.test.ts +++ b/packages/cli/test/integration/workflows/workflow.service.test.ts @@ -5,7 +5,6 @@ import { ActiveWorkflowManager } from '@/active-workflow-manager'; import { SharedWorkflowRepository } from '@/databases/repositories/shared-workflow.repository'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; -import { OrchestrationService } from '@/services/orchestration.service'; import { Telemetry } from '@/telemetry'; import { WorkflowFinderService } from '@/workflows/workflow-finder.service'; import { WorkflowService } from '@/workflows/workflow.service'; @@ -17,7 +16,6 @@ import * as testDb from '../shared/test-db'; let workflowService: WorkflowService; const activeWorkflowManager = mockInstance(ActiveWorkflowManager); -const orchestrationService = mockInstance(OrchestrationService); mockInstance(MessageEventBus); mockInstance(Telemetry); @@ -33,7 +31,6 @@ beforeAll(async () => { mock(), mock(), mock(), - orchestrationService, mock(), activeWorkflowManager, mock(),