refactor(core): Introduce @OnLeaderTakeover and @OnLeaderStepdown (#14940)

This commit is contained in:
Iván Ovejero
2025-04-28 10:53:01 +02:00
committed by GitHub
parent 9f2182568a
commit 2d60e469f3
10 changed files with 326 additions and 51 deletions

View File

@@ -0,0 +1,193 @@
import { Container } from '@n8n/di';
import { Service } from '@n8n/di';
import { EventEmitter } from 'node:events';
import { MultiMainMetadata } from '../multi-main-metadata';
import { LEADER_TAKEOVER_EVENT_NAME, LEADER_STEPDOWN_EVENT_NAME } from '../multi-main-metadata';
import { NonMethodError, OnLeaderStepdown, OnLeaderTakeover } from '../on-multi-main-event';
class MockMultiMainSetup extends EventEmitter {
registerEventHandlers() {
const handlers = Container.get(MultiMainMetadata).getHandlers();
for (const { eventHandlerClass, methodName, eventName } of handlers) {
const instance = Container.get(eventHandlerClass);
this.on(eventName, async () => {
return await instance[methodName].call(instance);
});
}
}
}
let multiMainSetup: MockMultiMainSetup;
let metadata: MultiMainMetadata;
beforeEach(() => {
Container.reset();
metadata = new MultiMainMetadata();
Container.set(MultiMainMetadata, metadata);
multiMainSetup = new MockMultiMainSetup();
});
it('should register methods decorated with @OnLeaderTakeover', () => {
jest.spyOn(metadata, 'register');
@Service()
class TestService {
@OnLeaderTakeover()
async handleLeaderTakeover() {}
}
expect(metadata.register).toHaveBeenCalledWith({
eventName: LEADER_TAKEOVER_EVENT_NAME,
methodName: 'handleLeaderTakeover',
eventHandlerClass: TestService,
});
});
it('should register methods decorated with @OnLeaderStepdown', () => {
jest.spyOn(metadata, 'register');
@Service()
class TestService {
@OnLeaderStepdown()
async handleLeaderStepdown() {}
}
expect(metadata.register).toHaveBeenCalledTimes(1);
expect(metadata.register).toHaveBeenCalledWith({
eventName: LEADER_STEPDOWN_EVENT_NAME,
methodName: 'handleLeaderStepdown',
eventHandlerClass: TestService,
});
});
it('should throw an error if the decorated target is not a method', () => {
expect(() => {
@Service()
class TestService {
// @ts-expect-error Testing invalid code
@OnLeaderTakeover()
notAFunction = 'string';
}
new TestService();
}).toThrowError(NonMethodError);
});
it('should call decorated methods when events are emitted', async () => {
@Service()
class TestService {
takeoverCalled = false;
stepdownCalled = false;
@OnLeaderTakeover()
async handleLeaderTakeover() {
this.takeoverCalled = true;
}
@OnLeaderStepdown()
async handleLeaderStepdown() {
this.stepdownCalled = true;
}
}
const testService = Container.get(TestService);
jest.spyOn(testService, 'handleLeaderTakeover');
jest.spyOn(testService, 'handleLeaderStepdown');
multiMainSetup.registerEventHandlers();
multiMainSetup.emit(LEADER_TAKEOVER_EVENT_NAME);
multiMainSetup.emit(LEADER_STEPDOWN_EVENT_NAME);
expect(testService.handleLeaderTakeover).toHaveBeenCalledTimes(1);
expect(testService.handleLeaderStepdown).toHaveBeenCalledTimes(1);
expect(testService.takeoverCalled).toBe(true);
expect(testService.stepdownCalled).toBe(true);
});
it('should register multiple handlers for the same event', async () => {
@Service()
class TestService {
firstHandlerCalled = false;
secondHandlerCalled = false;
@OnLeaderTakeover()
async firstHandler() {
this.firstHandlerCalled = true;
}
@OnLeaderTakeover()
async secondHandler() {
this.secondHandlerCalled = true;
}
}
const testService = Container.get(TestService);
multiMainSetup.registerEventHandlers();
multiMainSetup.emit(LEADER_TAKEOVER_EVENT_NAME);
expect(testService.firstHandlerCalled).toBe(true);
expect(testService.secondHandlerCalled).toBe(true);
});
it('should register handlers from multiple service classes', async () => {
@Service()
class FirstService {
handlerCalled = false;
@OnLeaderTakeover()
async handleTakeover() {
this.handlerCalled = true;
}
}
@Service()
class SecondService {
handlerCalled = false;
@OnLeaderTakeover()
async handleTakeover() {
this.handlerCalled = true;
}
}
const firstService = Container.get(FirstService);
const secondService = Container.get(SecondService);
multiMainSetup.registerEventHandlers();
multiMainSetup.emit(LEADER_TAKEOVER_EVENT_NAME);
expect(firstService.handlerCalled).toBe(true);
expect(secondService.handlerCalled).toBe(true);
});
it('should handle async methods correctly', async () => {
@Service()
class TestService {
result = '';
@OnLeaderTakeover()
async handleLeaderTakeover() {
await new Promise((resolve) => setTimeout(resolve, 10));
this.result = 'completed';
}
}
const testService = Container.get(TestService);
multiMainSetup.registerEventHandlers();
multiMainSetup.emit(LEADER_TAKEOVER_EVENT_NAME);
await new Promise((resolve) => setTimeout(resolve, 20));
expect(testService.result).toBe('completed');
});

View File

@@ -18,3 +18,5 @@ export { BaseN8nModule, N8nModule } from './module';
export { Debounce } from './debounce';
export type { AccessScope, Controller, RateLimit } from './types';
export type { ShutdownHandler } from './types';
export { MultiMainMetadata } from './multi-main-metadata';
export { OnLeaderTakeover, OnLeaderStepdown } from './on-multi-main-event';

View File

@@ -1,10 +1,4 @@
import { Container, Service, type Constructable } from '@n8n/di';
import type EventEmitter from 'node:events';
/**
* @TODO Temporary dummy type until `MultiMainSetup` registers listeners via decorators.
*/
type MultiMainSetup = EventEmitter;
/**
* @TODO Temporary dummy type until `ExecutionLifecycleHooks` registers hooks via decorators.
@@ -14,7 +8,6 @@ export type ExecutionLifecycleHooks = object;
export interface BaseN8nModule {
initialize?(): void;
registerLifecycleHooks?(hooks: ExecutionLifecycleHooks): void;
registerMultiMainListeners?(multiMainSetup: MultiMainSetup): void;
}
type Module = Constructable<BaseN8nModule>;
@@ -47,10 +40,4 @@ export class ModuleRegistry {
}
}
}
registerMultiMainListeners(multiMainSetup: MultiMainSetup) {
for (const ModuleClass of registry.keys()) {
Container.get(ModuleClass).registerMultiMainListeners?.(multiMainSetup);
}
}
}

View File

@@ -0,0 +1,36 @@
import { Service } from '@n8n/di';
import type { Class } from './types';
export const LEADER_TAKEOVER_EVENT_NAME = 'leader-takeover';
export const LEADER_STEPDOWN_EVENT_NAME = 'leader-stepdown';
export type MultiMainEvent = typeof LEADER_TAKEOVER_EVENT_NAME | typeof LEADER_STEPDOWN_EVENT_NAME;
type EventHandlerFn = () => Promise<void> | void;
export type EventHandlerClass = Class<Record<string, EventHandlerFn>>;
type EventHandler = {
/** Class holding the method to call on a multi-main event. */
eventHandlerClass: EventHandlerClass;
/** Name of the method to call on a multi-main event. */
methodName: string;
/** Name of the multi-main event to listen to. */
eventName: MultiMainEvent;
};
@Service()
export class MultiMainMetadata {
private readonly handlers: EventHandler[] = [];
register(handler: EventHandler) {
this.handlers.push(handler);
}
getHandlers(): EventHandler[] {
return this.handlers;
}
}

View File

@@ -0,0 +1,66 @@
import { Container } from '@n8n/di';
import { UnexpectedError } from 'n8n-workflow';
import type { EventHandlerClass, MultiMainEvent } from './multi-main-metadata';
import {
LEADER_TAKEOVER_EVENT_NAME,
LEADER_STEPDOWN_EVENT_NAME,
MultiMainMetadata,
} from './multi-main-metadata';
export class NonMethodError extends UnexpectedError {
constructor(name: string) {
super(`${name} must be a method on a class to use this decorator`);
}
}
const OnMultiMainEvent =
(eventName: MultiMainEvent): MethodDecorator =>
(prototype, propertyKey, descriptor) => {
const eventHandlerClass = prototype.constructor as EventHandlerClass;
const methodName = String(propertyKey);
if (typeof descriptor?.value !== 'function') {
throw new NonMethodError(`${eventHandlerClass.name}.${methodName}()`);
}
Container.get(MultiMainMetadata).register({
eventHandlerClass,
methodName,
eventName,
});
};
/**
* Decorator that registers a method to be called when this main instance becomes the leader.
*
* @example
*
* ```ts
* @Service()
* class MyService {
* @OnLeaderTakeover()
* async startDoingThings() {
* // ...
* }
* }
* ```
*/
export const OnLeaderTakeover = () => OnMultiMainEvent(LEADER_TAKEOVER_EVENT_NAME);
/**
* Decorator that registers a method to be called when this main instance stops being the leader.
*
* @example
*
* ```ts
* @Service()
* class MyService {
* @OnLeaderStepdown()
* async stopDoingThings() {
* // ...
* }
* }
* ```
*/
export const OnLeaderStepdown = () => OnMultiMainEvent(LEADER_STEPDOWN_EVENT_NAME);

View File

@@ -50,7 +50,7 @@ export type Controller = Constructable<object> &
type RouteHandlerFn = () => Promise<void> | void;
type Class<T = object, A extends unknown[] = unknown[]> = new (...args: A) => T;
export type Class<T = object, A extends unknown[] = unknown[]> = new (...args: A) => T;
export type ServiceClass = Class<Record<string, RouteHandlerFn>>;

View File

@@ -92,12 +92,10 @@ export abstract class BaseCommand extends Command {
}
}
const moduleRegistry = Container.get(ModuleRegistry);
moduleRegistry.initializeModules();
Container.get(ModuleRegistry).initializeModules();
if (this.instanceSettings.isMultiMain) {
moduleRegistry.registerMultiMainListeners(Container.get(MultiMainSetup));
Container.get(MultiMainSetup).registerEventHandlers();
}
}

View File

@@ -1,9 +1,7 @@
import { Container } from '@n8n/di';
import { mock } from 'jest-mock-extended';
import { InstanceSettings } from 'n8n-core';
import type { Logger } from 'n8n-core';
import { OrchestrationService } from '@/services/orchestration.service';
import { mockInstance } from '@test/mocking';
import { InsightsModule } from '../insights.module';
@@ -13,7 +11,6 @@ describe('InsightsModule', () => {
let logger: Logger;
let insightsService: InsightsService;
let instanceSettings: InstanceSettings;
let orchestrationService: OrchestrationService;
beforeEach(() => {
logger = mock<Logger>({
@@ -24,7 +21,6 @@ describe('InsightsModule', () => {
),
});
insightsService = mockInstance(InsightsService);
orchestrationService = Container.get(OrchestrationService);
});
describe('backgroundProcess', () => {
@@ -41,25 +37,5 @@ describe('InsightsModule', () => {
insightsModule.initialize();
expect(insightsService.startBackgroundProcess).not.toHaveBeenCalled();
});
it('should start background process on leader takeover', () => {
instanceSettings = mockInstance(InstanceSettings, { instanceType: 'main', isLeader: false });
const insightsModule = new InsightsModule(logger, insightsService, instanceSettings);
insightsModule.initialize();
expect(insightsService.startBackgroundProcess).not.toHaveBeenCalled();
insightsModule.registerMultiMainListeners(orchestrationService.multiMainSetup);
orchestrationService.multiMainSetup.emit('leader-takeover');
expect(insightsService.startBackgroundProcess).toHaveBeenCalled();
});
it('should stop background process on leader stepdown', () => {
instanceSettings = mockInstance(InstanceSettings, { instanceType: 'main', isLeader: true });
const insightsModule = new InsightsModule(logger, insightsService, instanceSettings);
insightsModule.initialize();
expect(insightsService.stopBackgroundProcess).not.toHaveBeenCalled();
insightsModule.registerMultiMainListeners(orchestrationService.multiMainSetup);
orchestrationService.multiMainSetup.emit('leader-stepdown');
expect(insightsService.stopBackgroundProcess).toHaveBeenCalled();
});
});
});

View File

@@ -1,10 +1,8 @@
import type { BaseN8nModule } from '@n8n/decorators';
import { N8nModule } from '@n8n/decorators';
import { N8nModule, OnLeaderStepdown, OnLeaderTakeover } from '@n8n/decorators';
import type { ExecutionLifecycleHooks } from 'n8n-core';
import { InstanceSettings, Logger } from 'n8n-core';
import type { MultiMainSetup } from '@/scaling/multi-main-setup.ee';
import { InsightsService } from './insights.service';
import './insights.controller';
@@ -35,8 +33,13 @@ export class InsightsModule implements BaseN8nModule {
});
}
registerMultiMainListeners(multiMainSetup: MultiMainSetup) {
multiMainSetup.on('leader-takeover', () => this.insightsService.startBackgroundProcess());
multiMainSetup.on('leader-stepdown', () => this.insightsService.stopBackgroundProcess());
@OnLeaderTakeover()
startBackgroundProcess() {
this.insightsService.startBackgroundProcess();
}
@OnLeaderStepdown()
stopBackgroundProcess() {
this.insightsService.stopBackgroundProcess();
}
}

View File

@@ -1,5 +1,6 @@
import { GlobalConfig } from '@n8n/config';
import { Service } from '@n8n/di';
import { MultiMainMetadata } from '@n8n/decorators';
import { Container, Service } from '@n8n/di';
import { InstanceSettings, Logger } from 'n8n-core';
import config from '@/config';
@@ -12,14 +13,14 @@ type MultiMainEvents = {
/**
* Emitted when this instance loses leadership. In response, its various
* services will stop triggers, pollers, pruning, wait-tracking, license
* renewal, queue recovery, etc.
* renewal, queue recovery, insights, etc.
*/
'leader-stepdown': never;
/**
* Emitted when this instance gains leadership. In response, its various
* services will start triggers, pollers, pruning, wait-tracking, license
* renewal, queue recovery, etc.
* renewal, queue recovery, insights, etc.
*/
'leader-takeover': never;
};
@@ -33,6 +34,7 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
private readonly publisher: Publisher,
private readonly redisClientService: RedisClientService,
private readonly globalConfig: GlobalConfig,
private readonly metadata: MultiMainMetadata,
) {
super();
this.logger = this.logger.scoped(['scaling', 'multi-main-setup']);
@@ -128,4 +130,16 @@ export class MultiMainSetup extends TypedEmitter<MultiMainEvents> {
async fetchLeaderKey() {
return await this.publisher.get(this.leaderKey);
}
registerEventHandlers() {
const handlers = this.metadata.getHandlers();
for (const { eventHandlerClass, methodName, eventName } of handlers) {
const instance = Container.get(eventHandlerClass);
this.on(eventName, async () => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return instance[methodName].call(instance);
});
}
}
}