refactor(core): Decouple insights module from multi-main (#14778)

This commit is contained in:
Iván Ovejero
2025-04-22 10:40:53 +02:00
committed by GitHub
parent 88ed7beff2
commit 6f92d26bbc
5 changed files with 34 additions and 36 deletions

View File

@@ -39,6 +39,7 @@ import type { ModulePreInit } from '@/modules/modules.config';
import { ModulesConfig } from '@/modules/modules.config';
import { NodeTypes } from '@/node-types';
import { PostHogClient } from '@/posthog';
import { MultiMainSetup } from '@/scaling/multi-main-setup.ee';
import { ShutdownService } from '@/shutdown/shutdown.service';
import { WorkflowHistoryManager } from '@/workflows/workflow-history.ee/workflow-history-manager.ee';
@@ -96,7 +97,13 @@ export abstract class BaseCommand extends Command {
}
}
Container.get(ModuleRegistry).initializeModules();
const moduleRegistry = Container.get(ModuleRegistry);
moduleRegistry.initializeModules();
if (this.instanceSettings.isMultiMain) {
moduleRegistry.registerMultiMainListeners(Container.get(MultiMainSetup));
}
}
async init(): Promise<void> {

View File

@@ -1,9 +1,12 @@
import { Container, Service, type Constructable } from '@n8n/di';
import type { ExecutionLifecycleHooks } from 'n8n-core';
import type { MultiMainSetup } from '@/scaling/multi-main-setup.ee';
export interface BaseN8nModule {
initialize?(): void;
registerLifecycleHooks?(hooks: ExecutionLifecycleHooks): void;
registerMultiMainListeners?(multiMainSetup: MultiMainSetup): void;
}
type Module = Constructable<BaseN8nModule>;
@@ -36,4 +39,10 @@ export class ModuleRegistry {
}
}
}
registerMultiMainListeners(multiMainSetup: MultiMainSetup) {
for (const ModuleClass of registry.keys()) {
Container.get(ModuleClass).registerMultiMainListeners?.(multiMainSetup);
}
}
}

View File

@@ -30,52 +30,34 @@ describe('InsightsModule', () => {
describe('backgroundProcess', () => {
it('should start background process if instance is main and leader', () => {
instanceSettings = mockInstance(InstanceSettings, { instanceType: 'main', isLeader: true });
const insightsModule = new InsightsModule(
logger,
insightsService,
instanceSettings,
orchestrationService,
);
const insightsModule = new InsightsModule(logger, insightsService, instanceSettings);
insightsModule.initialize();
expect(insightsService.startBackgroundProcess).toHaveBeenCalled();
});
it('should not start background process if instance is main but not leader', () => {
instanceSettings = mockInstance(InstanceSettings, { instanceType: 'main', isLeader: false });
const insightsModule = new InsightsModule(
logger,
insightsService,
instanceSettings,
orchestrationService,
);
const insightsModule = new InsightsModule(logger, insightsService, instanceSettings);
insightsModule.initialize();
expect(insightsService.startBackgroundProcess).not.toHaveBeenCalled();
});
it('should start background process on leader takeover', () => {
instanceSettings = mockInstance(InstanceSettings, { instanceType: 'main', isLeader: false });
const insightsModule = new InsightsModule(
logger,
insightsService,
instanceSettings,
orchestrationService,
);
const insightsModule = new InsightsModule(logger, insightsService, instanceSettings);
insightsModule.initialize();
expect(insightsService.startBackgroundProcess).not.toHaveBeenCalled();
insightsModule.registerMultiMainListeners(orchestrationService.multiMainSetup);
orchestrationService.multiMainSetup.emit('leader-takeover');
expect(insightsService.startBackgroundProcess).toHaveBeenCalled();
});
it('should stop background process on leader stepdown', () => {
instanceSettings = mockInstance(InstanceSettings, { instanceType: 'main', isLeader: true });
const insightsModule = new InsightsModule(
logger,
insightsService,
instanceSettings,
orchestrationService,
);
const insightsModule = new InsightsModule(logger, insightsService, instanceSettings);
insightsModule.initialize();
expect(insightsService.stopBackgroundProcess).not.toHaveBeenCalled();
insightsModule.registerMultiMainListeners(orchestrationService.multiMainSetup);
orchestrationService.multiMainSetup.emit('leader-stepdown');
expect(insightsService.stopBackgroundProcess).toHaveBeenCalled();
});

View File

@@ -3,9 +3,10 @@ import { InstanceSettings, Logger } from 'n8n-core';
import type { BaseN8nModule } from '@/decorators/module';
import { N8nModule } from '@/decorators/module';
import { OrchestrationService } from '@/services/orchestration.service';
import type { MultiMainSetup } from '@/scaling/multi-main-setup.ee';
import { InsightsService } from './insights.service';
import './insights.controller';
@N8nModule()
@@ -14,7 +15,6 @@ export class InsightsModule implements BaseN8nModule {
private readonly logger: Logger,
private readonly insightsService: InsightsService,
private readonly instanceSettings: InstanceSettings,
private readonly orchestrationService: OrchestrationService,
) {
this.logger = this.logger.scoped('insights');
}
@@ -25,15 +25,6 @@ export class InsightsModule implements BaseN8nModule {
if (this.instanceSettings.isLeader) {
this.insightsService.startBackgroundProcess();
}
if (this.instanceSettings.isMultiMain) {
this.orchestrationService.multiMainSetup.on('leader-takeover', () =>
this.insightsService.startBackgroundProcess(),
);
this.orchestrationService.multiMainSetup.on('leader-stepdown', () =>
this.insightsService.stopBackgroundProcess(),
);
}
}
registerLifecycleHooks(hooks: ExecutionLifecycleHooks) {
@@ -43,4 +34,9 @@ export class InsightsModule implements BaseN8nModule {
await insightsService.workflowExecuteAfterHandler(this, fullRunData);
});
}
registerMultiMainListeners(multiMainSetup: MultiMainSetup) {
multiMainSetup.on('leader-takeover', () => this.insightsService.startBackgroundProcess());
multiMainSetup.on('leader-stepdown', () => this.insightsService.stopBackgroundProcess());
}
}

View File

@@ -85,11 +85,15 @@ export class InsightsService {
startBackgroundProcess() {
this.startCompactionScheduler();
this.startFlushingScheduler();
this.logger.debug('Started compaction and flushing schedulers');
}
stopBackgroundProcess() {
this.stopCompactionScheduler();
this.stopFlushingScheduler();
this.logger.debug('Stopped compaction and flushing schedulers');
}
// Initialize regular compaction of insights data