refactor(core): Simplify worker pubsub message handler (#11086)

This commit is contained in:
Iván Ovejero
2024-10-07 16:19:58 +02:00
committed by GitHub
parent 2343634c64
commit 383b4765d2
8 changed files with 259 additions and 209 deletions

View File

@@ -7,7 +7,9 @@ import type { ExternalSecretsManager } from '@/external-secrets/external-secrets
import type { License } from '@/license';
import type { CommunityPackagesService } from '@/services/community-packages.service';
import type { Publisher } from '../pubsub/publisher.service';
import { PubSubHandler } from '../pubsub/pubsub-handler';
import type { WorkerStatus } from '../worker-status';
describe('PubSubHandler', () => {
const eventService = new EventService();
@@ -15,13 +17,19 @@ describe('PubSubHandler', () => {
const eventbus = mock<MessageEventBus>();
const externalSecretsManager = mock<ExternalSecretsManager>();
const communityPackagesService = mock<CommunityPackagesService>();
const publisher = mock<Publisher>();
const workerStatus = mock<WorkerStatus>();
afterEach(() => {
eventService.removeAllListeners();
});
describe('in webhook process', () => {
const instanceSettings = mock<InstanceSettings>({ instanceType: 'webhook' });
it('should set up handlers in webhook process', () => {
// @ts-expect-error Spying on private method
const setupWebhookHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupWebhookHandlers');
const setupHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupHandlers');
new PubSubHandler(
eventService,
@@ -30,9 +38,18 @@ describe('PubSubHandler', () => {
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
expect(setupWebhookHandlersSpy).toHaveBeenCalled();
expect(setupHandlersSpy).toHaveBeenCalledWith({
'reload-license': expect.any(Function),
'restart-event-bus': expect.any(Function),
'reload-external-secrets-providers': expect.any(Function),
'community-package-install': expect.any(Function),
'community-package-update': expect.any(Function),
'community-package-uninstall': expect.any(Function),
});
});
it('should reload license on `reload-license` event', () => {
@@ -43,6 +60,8 @@ describe('PubSubHandler', () => {
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
eventService.emit('reload-license');
@@ -58,6 +77,8 @@ describe('PubSubHandler', () => {
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
eventService.emit('restart-event-bus');
@@ -73,6 +94,8 @@ describe('PubSubHandler', () => {
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
eventService.emit('reload-external-secrets-providers');
@@ -88,6 +111,8 @@ describe('PubSubHandler', () => {
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
eventService.emit('community-package-install', {
@@ -109,6 +134,8 @@ describe('PubSubHandler', () => {
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
eventService.emit('community-package-update', {
@@ -130,6 +157,8 @@ describe('PubSubHandler', () => {
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
eventService.emit('community-package-uninstall', {
@@ -139,4 +168,123 @@ describe('PubSubHandler', () => {
expect(communityPackagesService.removeNpmPackage).toHaveBeenCalledWith('test-package');
});
});
describe('in worker process', () => {
const instanceSettings = mock<InstanceSettings>({ instanceType: 'worker' });
it('should set up handlers in worker process', () => {
// @ts-expect-error Spying on private method
const setupHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupHandlers');
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
expect(setupHandlersSpy).toHaveBeenCalledWith({
'reload-license': expect.any(Function),
'restart-event-bus': expect.any(Function),
'reload-external-secrets-providers': expect.any(Function),
'community-package-install': expect.any(Function),
'community-package-update': expect.any(Function),
'community-package-uninstall': expect.any(Function),
'get-worker-status': expect.any(Function),
'get-worker-id': expect.any(Function),
});
});
it('should reload license on `reload-license` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
eventService.emit('reload-license');
expect(license.reload).toHaveBeenCalled();
});
it('should restart event bus on `restart-event-bus` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
eventService.emit('restart-event-bus');
expect(eventbus.restart).toHaveBeenCalled();
});
it('should reload providers on `reload-external-secrets-providers` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
eventService.emit('reload-external-secrets-providers');
expect(externalSecretsManager.reloadAllProviders).toHaveBeenCalled();
});
it('should generate status on `get-worker-status` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
eventService.emit('get-worker-status');
expect(workerStatus.generateStatus).toHaveBeenCalled();
});
it('should get worker ID on `get-worker-id` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
).init();
eventService.emit('get-worker-id');
expect(publisher.publishWorkerResponse).toHaveBeenCalledWith({
workerId: expect.any(String),
command: 'get-worker-id',
});
});
});
});

View File

@@ -1,12 +1,17 @@
import { InstanceSettings } from 'n8n-core';
import { Service } from 'typedi';
import config from '@/config';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { EventService } from '@/events/event.service';
import type { PubSubEventMap } from '@/events/maps/pub-sub.event-map';
import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee';
import { License } from '@/license';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { CommunityPackagesService } from '@/services/community-packages.service';
import { assertNever } from '@/utils';
import { WorkerStatus } from '../worker-status';
/**
* Responsible for handling events emitted from messages received via a pubsub channel.
@@ -20,10 +25,37 @@ export class PubSubHandler {
private readonly eventbus: MessageEventBus,
private readonly externalSecretsManager: ExternalSecretsManager,
private readonly communityPackagesService: CommunityPackagesService,
private readonly publisher: Publisher,
private readonly workerStatus: WorkerStatus,
) {}
init() {
if (this.instanceSettings.instanceType === 'webhook') this.setupWebhookHandlers();
switch (this.instanceSettings.instanceType) {
case 'webhook':
this.setupHandlers(this.commonHandlers);
break;
case 'worker':
this.setupHandlers({
...this.commonHandlers,
'get-worker-status': async () =>
await this.publisher.publishWorkerResponse({
workerId: config.getEnv('redis.queueModeId'),
command: 'get-worker-status',
payload: this.workerStatus.generateStatus(),
}),
'get-worker-id': async () =>
await this.publisher.publishWorkerResponse({
workerId: config.getEnv('redis.queueModeId'),
command: 'get-worker-id',
}),
});
break;
case 'main':
// TODO
break;
default:
assertNever(this.instanceSettings.instanceType);
}
}
private setupHandlers<EventNames extends keyof PubSubEventMap>(
@@ -40,22 +72,27 @@ export class PubSubHandler {
}
}
// #region Webhook process
private setupWebhookHandlers() {
this.setupHandlers({
'reload-license': async () => await this.license.reload(),
'restart-event-bus': async () => await this.eventbus.restart(),
'reload-external-secrets-providers': async () =>
await this.externalSecretsManager.reloadAllProviders(),
'community-package-install': async ({ packageName, packageVersion }) =>
await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion),
'community-package-update': async ({ packageName, packageVersion }) =>
await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion),
'community-package-uninstall': async ({ packageName }) =>
await this.communityPackagesService.removeNpmPackage(packageName),
});
}
// #endregion
/** Handlers shared by webhook and worker processes. */
private commonHandlers: {
[K in keyof Pick<
PubSubEventMap,
| 'reload-license'
| 'restart-event-bus'
| 'reload-external-secrets-providers'
| 'community-package-install'
| 'community-package-update'
| 'community-package-uninstall'
>]: (event: PubSubEventMap[K]) => Promise<void>;
} = {
'reload-license': async () => await this.license.reload(),
'restart-event-bus': async () => await this.eventbus.restart(),
'reload-external-secrets-providers': async () =>
await this.externalSecretsManager.reloadAllProviders(),
'community-package-install': async ({ packageName, packageVersion }) =>
await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion),
'community-package-update': async ({ packageName, packageVersion }) =>
await this.communityPackagesService.installOrUpdateNpmPackage(packageName, packageVersion),
'community-package-uninstall': async ({ packageName }) =>
await this.communityPackagesService.removeNpmPackage(packageName),
};
}

View File

@@ -0,0 +1,43 @@
import os from 'node:os';
import { Service } from 'typedi';
import config from '@/config';
import { N8N_VERSION } from '@/constants';
import { JobProcessor } from './job-processor';
@Service()
export class WorkerStatus {
constructor(private readonly jobProcessor: JobProcessor) {}
generateStatus() {
return {
workerId: config.getEnv('redis.queueModeId'),
runningJobsSummary: this.jobProcessor.getRunningJobsSummary(),
freeMem: os.freemem(),
totalMem: os.totalmem(),
uptime: process.uptime(),
loadAvg: os.loadavg(),
cpus: this.getOsCpuString(),
arch: os.arch(),
platform: os.platform(),
hostname: os.hostname(),
interfaces: Object.values(os.networkInterfaces()).flatMap((interfaces) =>
(interfaces ?? [])?.map((net) => ({
family: net.family,
address: net.address,
internal: net.internal,
})),
),
version: N8N_VERSION,
};
}
private getOsCpuString() {
const cpus = os.cpus();
if (cpus.length === 0) return 'no CPU info';
return `${cpus.length}x ${cpus[0].model} - speed: ${cpus[0].speed}`;
}
}