diff --git a/packages/cli/src/commands/base-command.ts b/packages/cli/src/commands/base-command.ts index 376c78dcc1..ea57dc6abd 100644 --- a/packages/cli/src/commands/base-command.ts +++ b/packages/cli/src/commands/base-command.ts @@ -39,6 +39,7 @@ import type { ModulePreInit } from '@/modules/modules.config'; import { ModulesConfig } from '@/modules/modules.config'; import { NodeTypes } from '@/node-types'; import { PostHogClient } from '@/posthog'; +import { MultiMainSetup } from '@/scaling/multi-main-setup.ee'; import { ShutdownService } from '@/shutdown/shutdown.service'; import { WorkflowHistoryManager } from '@/workflows/workflow-history.ee/workflow-history-manager.ee'; @@ -96,7 +97,13 @@ export abstract class BaseCommand extends Command { } } - Container.get(ModuleRegistry).initializeModules(); + const moduleRegistry = Container.get(ModuleRegistry); + + moduleRegistry.initializeModules(); + + if (this.instanceSettings.isMultiMain) { + moduleRegistry.registerMultiMainListeners(Container.get(MultiMainSetup)); + } } async init(): Promise { diff --git a/packages/cli/src/decorators/module.ts b/packages/cli/src/decorators/module.ts index 2afac7a07d..4b4358c548 100644 --- a/packages/cli/src/decorators/module.ts +++ b/packages/cli/src/decorators/module.ts @@ -1,9 +1,12 @@ import { Container, Service, type Constructable } from '@n8n/di'; import type { ExecutionLifecycleHooks } from 'n8n-core'; +import type { MultiMainSetup } from '@/scaling/multi-main-setup.ee'; + export interface BaseN8nModule { initialize?(): void; registerLifecycleHooks?(hooks: ExecutionLifecycleHooks): void; + registerMultiMainListeners?(multiMainSetup: MultiMainSetup): void; } type Module = Constructable; @@ -36,4 +39,10 @@ export class ModuleRegistry { } } } + + registerMultiMainListeners(multiMainSetup: MultiMainSetup) { + for (const ModuleClass of registry.keys()) { + Container.get(ModuleClass).registerMultiMainListeners?.(multiMainSetup); + } + } } diff --git a/packages/cli/src/modules/insights/__tests__/insights.module.test.ts b/packages/cli/src/modules/insights/__tests__/insights.module.test.ts index b0b2b8ce7e..53592cfded 100644 --- a/packages/cli/src/modules/insights/__tests__/insights.module.test.ts +++ b/packages/cli/src/modules/insights/__tests__/insights.module.test.ts @@ -30,52 +30,34 @@ describe('InsightsModule', () => { describe('backgroundProcess', () => { it('should start background process if instance is main and leader', () => { instanceSettings = mockInstance(InstanceSettings, { instanceType: 'main', isLeader: true }); - const insightsModule = new InsightsModule( - logger, - insightsService, - instanceSettings, - orchestrationService, - ); + const insightsModule = new InsightsModule(logger, insightsService, instanceSettings); insightsModule.initialize(); expect(insightsService.startBackgroundProcess).toHaveBeenCalled(); }); it('should not start background process if instance is main but not leader', () => { instanceSettings = mockInstance(InstanceSettings, { instanceType: 'main', isLeader: false }); - const insightsModule = new InsightsModule( - logger, - insightsService, - instanceSettings, - orchestrationService, - ); + const insightsModule = new InsightsModule(logger, insightsService, instanceSettings); insightsModule.initialize(); expect(insightsService.startBackgroundProcess).not.toHaveBeenCalled(); }); it('should start background process on leader takeover', () => { instanceSettings = mockInstance(InstanceSettings, { instanceType: 'main', isLeader: false }); - const insightsModule = new InsightsModule( - logger, - insightsService, - instanceSettings, - orchestrationService, - ); + const insightsModule = new InsightsModule(logger, insightsService, instanceSettings); insightsModule.initialize(); expect(insightsService.startBackgroundProcess).not.toHaveBeenCalled(); + insightsModule.registerMultiMainListeners(orchestrationService.multiMainSetup); orchestrationService.multiMainSetup.emit('leader-takeover'); expect(insightsService.startBackgroundProcess).toHaveBeenCalled(); }); it('should stop background process on leader stepdown', () => { instanceSettings = mockInstance(InstanceSettings, { instanceType: 'main', isLeader: true }); - const insightsModule = new InsightsModule( - logger, - insightsService, - instanceSettings, - orchestrationService, - ); + const insightsModule = new InsightsModule(logger, insightsService, instanceSettings); insightsModule.initialize(); expect(insightsService.stopBackgroundProcess).not.toHaveBeenCalled(); + insightsModule.registerMultiMainListeners(orchestrationService.multiMainSetup); orchestrationService.multiMainSetup.emit('leader-stepdown'); expect(insightsService.stopBackgroundProcess).toHaveBeenCalled(); }); diff --git a/packages/cli/src/modules/insights/insights.module.ts b/packages/cli/src/modules/insights/insights.module.ts index 063ac1659e..e751e15383 100644 --- a/packages/cli/src/modules/insights/insights.module.ts +++ b/packages/cli/src/modules/insights/insights.module.ts @@ -3,9 +3,10 @@ import { InstanceSettings, Logger } from 'n8n-core'; import type { BaseN8nModule } from '@/decorators/module'; import { N8nModule } from '@/decorators/module'; -import { OrchestrationService } from '@/services/orchestration.service'; +import type { MultiMainSetup } from '@/scaling/multi-main-setup.ee'; import { InsightsService } from './insights.service'; + import './insights.controller'; @N8nModule() @@ -14,7 +15,6 @@ export class InsightsModule implements BaseN8nModule { private readonly logger: Logger, private readonly insightsService: InsightsService, private readonly instanceSettings: InstanceSettings, - private readonly orchestrationService: OrchestrationService, ) { this.logger = this.logger.scoped('insights'); } @@ -25,15 +25,6 @@ export class InsightsModule implements BaseN8nModule { if (this.instanceSettings.isLeader) { this.insightsService.startBackgroundProcess(); } - - if (this.instanceSettings.isMultiMain) { - this.orchestrationService.multiMainSetup.on('leader-takeover', () => - this.insightsService.startBackgroundProcess(), - ); - this.orchestrationService.multiMainSetup.on('leader-stepdown', () => - this.insightsService.stopBackgroundProcess(), - ); - } } registerLifecycleHooks(hooks: ExecutionLifecycleHooks) { @@ -43,4 +34,9 @@ export class InsightsModule implements BaseN8nModule { await insightsService.workflowExecuteAfterHandler(this, fullRunData); }); } + + registerMultiMainListeners(multiMainSetup: MultiMainSetup) { + multiMainSetup.on('leader-takeover', () => this.insightsService.startBackgroundProcess()); + multiMainSetup.on('leader-stepdown', () => this.insightsService.stopBackgroundProcess()); + } } diff --git a/packages/cli/src/modules/insights/insights.service.ts b/packages/cli/src/modules/insights/insights.service.ts index da97477dbd..82a88505bb 100644 --- a/packages/cli/src/modules/insights/insights.service.ts +++ b/packages/cli/src/modules/insights/insights.service.ts @@ -85,11 +85,15 @@ export class InsightsService { startBackgroundProcess() { this.startCompactionScheduler(); this.startFlushingScheduler(); + + this.logger.debug('Started compaction and flushing schedulers'); } stopBackgroundProcess() { this.stopCompactionScheduler(); this.stopFlushingScheduler(); + + this.logger.debug('Stopped compaction and flushing schedulers'); } // Initialize regular compaction of insights data