refactor(core): Simplify main pubsub message handler (#11156)

This commit is contained in:
Iván Ovejero
2024-10-11 10:31:33 +02:00
committed by GitHub
parent 4aaebfd435
commit 0820cb5ab9
21 changed files with 813 additions and 531 deletions

View File

@@ -61,7 +61,7 @@ describe('Publisher', () => {
it('should publish worker response into `n8n.worker-response` pubsub channel', async () => {
const publisher = new Publisher(mock(), redisClientService);
const msg = mock<PubSub.WorkerResponse>({
command: 'get-worker-status',
response: 'response-to-get-worker-status',
});
await publisher.publishWorkerResponse(msg);

View File

@@ -1,15 +1,25 @@
import type { WorkerStatus } from '@n8n/api-types';
import { mock } from 'jest-mock-extended';
import type { InstanceSettings } from 'n8n-core';
import type { Workflow } from 'n8n-workflow';
import type { ActiveWorkflowManager } from '@/active-workflow-manager';
import type { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import type { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { EventService } from '@/events/event.service';
import type { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee';
import type { IWorkflowDb } from '@/interfaces';
import type { License } from '@/license';
import type { Push } from '@/push';
import type { WebSocketPush } from '@/push/websocket.push';
import type { CommunityPackagesService } from '@/services/community-packages.service';
import type { TestWebhooks } from '@/webhooks/test-webhooks';
import type { Publisher } from '../pubsub/publisher.service';
import { PubSubHandler } from '../pubsub/pubsub-handler';
import type { WorkerStatus } from '../worker-status';
import type { WorkerStatusService } from '../worker-status.service';
const flushPromises = async () => await new Promise((resolve) => setImmediate(resolve));
describe('PubSubHandler', () => {
const eventService = new EventService();
@@ -18,7 +28,11 @@ describe('PubSubHandler', () => {
const externalSecretsManager = mock<ExternalSecretsManager>();
const communityPackagesService = mock<CommunityPackagesService>();
const publisher = mock<Publisher>();
const workerStatus = mock<WorkerStatus>();
const workerStatusService = mock<WorkerStatusService>();
const activeWorkflowManager = mock<ActiveWorkflowManager>();
const push = mock<Push>();
const workflowRepository = mock<WorkflowRepository>();
const testWebhooks = mock<TestWebhooks>();
afterEach(() => {
eventService.removeAllListeners();
@@ -29,7 +43,7 @@ describe('PubSubHandler', () => {
it('should set up handlers in webhook process', () => {
// @ts-expect-error Spying on private method
const setupHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupHandlers');
const setupHandlers = jest.spyOn(PubSubHandler.prototype, 'setupHandlers');
new PubSubHandler(
eventService,
@@ -39,10 +53,14 @@ describe('PubSubHandler', () => {
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
expect(setupHandlersSpy).toHaveBeenCalledWith({
expect(setupHandlers).toHaveBeenCalledWith({
'reload-license': expect.any(Function),
'restart-event-bus': expect.any(Function),
'reload-external-secrets-providers': expect.any(Function),
@@ -61,7 +79,11 @@ describe('PubSubHandler', () => {
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
eventService.emit('reload-license');
@@ -78,7 +100,11 @@ describe('PubSubHandler', () => {
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
eventService.emit('restart-event-bus');
@@ -95,7 +121,11 @@ describe('PubSubHandler', () => {
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
eventService.emit('reload-external-secrets-providers');
@@ -112,7 +142,11 @@ describe('PubSubHandler', () => {
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
eventService.emit('community-package-install', {
@@ -135,7 +169,11 @@ describe('PubSubHandler', () => {
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
eventService.emit('community-package-update', {
@@ -158,7 +196,11 @@ describe('PubSubHandler', () => {
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
eventService.emit('community-package-uninstall', {
@@ -184,7 +226,11 @@ describe('PubSubHandler', () => {
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
expect(setupHandlersSpy).toHaveBeenCalledWith({
@@ -207,7 +253,11 @@ describe('PubSubHandler', () => {
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
eventService.emit('reload-license');
@@ -224,7 +274,11 @@ describe('PubSubHandler', () => {
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
eventService.emit('restart-event-bus');
@@ -241,7 +295,11 @@ describe('PubSubHandler', () => {
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
eventService.emit('reload-external-secrets-providers');
@@ -249,6 +307,83 @@ describe('PubSubHandler', () => {
expect(externalSecretsManager.reloadAllProviders).toHaveBeenCalled();
});
it('should install community package on `community-package-install` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
eventService.emit('community-package-install', {
packageName: 'test-package',
packageVersion: '1.0.0',
});
expect(communityPackagesService.installOrUpdateNpmPackage).toHaveBeenCalledWith(
'test-package',
'1.0.0',
);
});
it('should update community package on `community-package-update` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
eventService.emit('community-package-update', {
packageName: 'test-package',
packageVersion: '1.0.0',
});
expect(communityPackagesService.installOrUpdateNpmPackage).toHaveBeenCalledWith(
'test-package',
'1.0.0',
);
});
it('should uninstall community package on `community-package-uninstall` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
eventService.emit('community-package-uninstall', {
packageName: 'test-package',
});
expect(communityPackagesService.removeNpmPackage).toHaveBeenCalledWith('test-package');
});
it('should generate status on `get-worker-status` event', () => {
new PubSubHandler(
eventService,
@@ -258,12 +393,486 @@ describe('PubSubHandler', () => {
externalSecretsManager,
communityPackagesService,
publisher,
workerStatus,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
eventService.emit('get-worker-status');
expect(workerStatus.generateStatus).toHaveBeenCalled();
expect(workerStatusService.generateStatus).toHaveBeenCalled();
});
});
describe('in main process', () => {
const instanceSettings = mock<InstanceSettings>({
instanceType: 'main',
isLeader: true,
isFollower: false,
});
afterEach(() => {
jest.clearAllMocks();
});
it('should set up command and worker response handlers in main process', () => {
// @ts-expect-error Spying on private method
const setupHandlersSpy = jest.spyOn(PubSubHandler.prototype, 'setupHandlers');
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
expect(setupHandlersSpy).toHaveBeenCalledWith({
'reload-license': expect.any(Function),
'restart-event-bus': expect.any(Function),
'reload-external-secrets-providers': expect.any(Function),
'community-package-install': expect.any(Function),
'community-package-update': expect.any(Function),
'community-package-uninstall': expect.any(Function),
'add-webhooks-triggers-and-pollers': expect.any(Function),
'remove-triggers-and-pollers': expect.any(Function),
'display-workflow-activation': expect.any(Function),
'display-workflow-deactivation': expect.any(Function),
'display-workflow-activation-error': expect.any(Function),
'relay-execution-lifecycle-event': expect.any(Function),
'clear-test-webhooks': expect.any(Function),
'response-to-get-worker-status': expect.any(Function),
});
});
it('should reload license on `reload-license` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
eventService.emit('reload-license');
expect(license.reload).toHaveBeenCalled();
});
it('should restart event bus on `restart-event-bus` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
eventService.emit('restart-event-bus');
expect(eventbus.restart).toHaveBeenCalled();
});
it('should reload providers on `reload-external-secrets-providers` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
eventService.emit('reload-external-secrets-providers');
expect(externalSecretsManager.reloadAllProviders).toHaveBeenCalled();
});
it('should install community package on `community-package-install` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
eventService.emit('community-package-install', {
packageName: 'test-package',
packageVersion: '1.0.0',
});
expect(communityPackagesService.installOrUpdateNpmPackage).toHaveBeenCalledWith(
'test-package',
'1.0.0',
);
});
it('should update community package on `community-package-update` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
eventService.emit('community-package-update', {
packageName: 'test-package',
packageVersion: '1.0.0',
});
expect(communityPackagesService.installOrUpdateNpmPackage).toHaveBeenCalledWith(
'test-package',
'1.0.0',
);
});
it('should uninstall community package on `community-package-uninstall` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
eventService.emit('community-package-uninstall', {
packageName: 'test-package',
});
expect(communityPackagesService.removeNpmPackage).toHaveBeenCalledWith('test-package');
});
describe('multi-main setup', () => {
it('if leader, should handle `add-webhooks-triggers-and-pollers` event', async () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
const workflowId = 'test-workflow-id';
eventService.emit('add-webhooks-triggers-and-pollers', { workflowId });
await flushPromises();
expect(activeWorkflowManager.add).toHaveBeenCalledWith(workflowId, 'activate', undefined, {
shouldPublish: false,
});
expect(push.broadcast).toHaveBeenCalledWith('workflowActivated', { workflowId });
expect(publisher.publishCommand).toHaveBeenCalledWith({
command: 'display-workflow-activation',
payload: { workflowId },
});
});
it('if follower, should skip `add-webhooks-triggers-and-pollers` event', async () => {
new PubSubHandler(
eventService,
mock<InstanceSettings>({ instanceType: 'main', isLeader: false, isFollower: true }),
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
const workflowId = 'test-workflow-id';
eventService.emit('add-webhooks-triggers-and-pollers', { workflowId });
await flushPromises();
expect(activeWorkflowManager.add).not.toHaveBeenCalled();
expect(push.broadcast).not.toHaveBeenCalled();
expect(publisher.publishCommand).not.toHaveBeenCalled();
});
it('if leader, should handle `remove-triggers-and-pollers` event', async () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
const workflowId = 'test-workflow-id';
eventService.emit('remove-triggers-and-pollers', { workflowId });
await flushPromises();
expect(activeWorkflowManager.removeActivationError).toHaveBeenCalledWith(workflowId);
expect(activeWorkflowManager.removeWorkflowTriggersAndPollers).toHaveBeenCalledWith(
workflowId,
);
expect(push.broadcast).toHaveBeenCalledWith('workflowDeactivated', { workflowId });
expect(publisher.publishCommand).toHaveBeenCalledWith({
command: 'display-workflow-deactivation',
payload: { workflowId },
});
});
it('if follower, should skip `remove-triggers-and-pollers` event', async () => {
new PubSubHandler(
eventService,
mock<InstanceSettings>({ instanceType: 'main', isLeader: false, isFollower: true }),
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
const workflowId = 'test-workflow-id';
eventService.emit('remove-triggers-and-pollers', { workflowId });
await flushPromises();
expect(activeWorkflowManager.removeActivationError).not.toHaveBeenCalled();
expect(activeWorkflowManager.removeWorkflowTriggersAndPollers).not.toHaveBeenCalled();
expect(push.broadcast).not.toHaveBeenCalled();
expect(publisher.publishCommand).not.toHaveBeenCalled();
});
it('should handle `display-workflow-activation` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
const workflowId = 'test-workflow-id';
eventService.emit('display-workflow-activation', { workflowId });
expect(push.broadcast).toHaveBeenCalledWith('workflowActivated', { workflowId });
});
it('should handle `display-workflow-deactivation` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
const workflowId = 'test-workflow-id';
eventService.emit('display-workflow-deactivation', { workflowId });
expect(push.broadcast).toHaveBeenCalledWith('workflowDeactivated', { workflowId });
});
it('should handle `display-workflow-activation-error` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
const workflowId = 'test-workflow-id';
const errorMessage = 'Test error message';
eventService.emit('display-workflow-activation-error', { workflowId, errorMessage });
expect(push.broadcast).toHaveBeenCalledWith('workflowFailedToActivate', {
workflowId,
errorMessage,
});
});
it('should handle `relay-execution-lifecycle-event` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
const pushRef = 'test-push-ref';
const type = 'executionStarted';
const args = { testArg: 'value' };
push.getBackend.mockReturnValue(
mock<WebSocketPush>({ hasPushRef: jest.fn().mockReturnValue(true) }),
);
eventService.emit('relay-execution-lifecycle-event', { type, args, pushRef });
expect(push.send).toHaveBeenCalledWith(type, args, pushRef);
});
it('should handle `clear-test-webhooks` event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
const webhookKey = 'test-webhook-key';
const workflowEntity = mock<IWorkflowDb>({ id: 'test-workflow-id' });
const pushRef = 'test-push-ref';
push.getBackend.mockReturnValue(
mock<WebSocketPush>({ hasPushRef: jest.fn().mockReturnValue(true) }),
);
testWebhooks.toWorkflow.mockReturnValue(mock<Workflow>({ id: 'test-workflow-id' }));
eventService.emit('clear-test-webhooks', { webhookKey, workflowEntity, pushRef });
expect(testWebhooks.clearTimeout).toHaveBeenCalledWith(webhookKey);
expect(testWebhooks.deactivateWebhooks).toHaveBeenCalled();
});
it('should handle `response-to-get-worker-status event', () => {
new PubSubHandler(
eventService,
instanceSettings,
license,
eventbus,
externalSecretsManager,
communityPackagesService,
publisher,
workerStatusService,
activeWorkflowManager,
push,
workflowRepository,
testWebhooks,
).init();
const workerStatus = mock<WorkerStatus>({ senderId: 'worker-1', loadAvg: [123] });
eventService.emit('response-to-get-worker-status', workerStatus);
expect(push.broadcast).toHaveBeenCalledWith('sendWorkerStatusMessage', {
workerId: workerStatus.senderId,
status: workerStatus,
});
});
});
});
});

View File

@@ -47,17 +47,4 @@ describe('Subscriber', () => {
expect(client.subscribe).toHaveBeenCalledWith('n8n.commands', expect.any(Function));
});
});
describe('setMessageHandler', () => {
it('should set message handler function for channel', () => {
const subscriber = new Subscriber(mock(), redisClientService, mock());
const channel = 'n8n.commands';
const handlerFn = jest.fn();
subscriber.setMessageHandler(channel, handlerFn);
// @ts-expect-error Private field
expect(subscriber.handlers).toEqual(new Map([[channel, handlerFn]]));
});
});
});

View File

@@ -59,7 +59,7 @@ export class Publisher {
async publishWorkerResponse(msg: PubSub.WorkerResponse) {
await this.client.publish('n8n.worker-response', JSON.stringify(msg));
this.logger.debug(`Published response for ${msg.command} to worker response channel`);
this.logger.debug(`Published response ${msg.response} to worker response channel`);
}
// #endregion

View File

@@ -1,17 +1,23 @@
import { InstanceSettings } from 'n8n-core';
import { ensureError } from 'n8n-workflow';
import { Service } from 'typedi';
import { ActiveWorkflowManager } from '@/active-workflow-manager';
import config from '@/config';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { EventService } from '@/events/event.service';
import type { PubSubEventMap } from '@/events/maps/pub-sub.event-map';
import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee';
import { License } from '@/license';
import { Push } from '@/push';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { CommunityPackagesService } from '@/services/community-packages.service';
import { assertNever } from '@/utils';
import { TestWebhooks } from '@/webhooks/test-webhooks';
import { WorkerStatus } from '../worker-status';
import type { PubSub } from './pubsub.types';
import { WorkerStatusService } from '../worker-status.service';
/**
* Responsible for handling events emitted from messages received via a pubsub channel.
@@ -26,7 +32,11 @@ export class PubSubHandler {
private readonly externalSecretsManager: ExternalSecretsManager,
private readonly communityPackagesService: CommunityPackagesService,
private readonly publisher: Publisher,
private readonly workerStatus: WorkerStatus,
private readonly workerStatusService: WorkerStatusService,
private readonly activeWorkflowManager: ActiveWorkflowManager,
private readonly push: Push,
private readonly workflowRepository: WorkflowRepository,
private readonly testWebhooks: TestWebhooks,
) {}
init() {
@@ -39,14 +49,23 @@ export class PubSubHandler {
...this.commonHandlers,
'get-worker-status': async () =>
await this.publisher.publishWorkerResponse({
workerId: config.getEnv('redis.queueModeId'),
command: 'get-worker-status',
payload: this.workerStatus.generateStatus(),
senderId: config.getEnv('redis.queueModeId'),
response: 'response-to-get-worker-status',
payload: this.workerStatusService.generateStatus(),
}),
});
break;
case 'main':
// TODO
this.setupHandlers({
...this.commonHandlers,
...this.multiMainHandlers,
'response-to-get-worker-status': async (payload) =>
this.push.broadcast('sendWorkerStatusMessage', {
workerId: payload.senderId,
status: payload,
}),
});
break;
default:
assertNever(this.instanceSettings.instanceType);
@@ -67,17 +86,8 @@ export class PubSubHandler {
}
}
/** Handlers shared by webhook and worker processes. */
private commonHandlers: {
[K in keyof Pick<
PubSubEventMap,
| 'reload-license'
| 'restart-event-bus'
| 'reload-external-secrets-providers'
| 'community-package-install'
| 'community-package-update'
| 'community-package-uninstall'
>]: (event: PubSubEventMap[K]) => Promise<void>;
[EventName in keyof PubSub.CommonEvents]: (event: PubSubEventMap[EventName]) => Promise<void>;
} = {
'reload-license': async () => await this.license.reload(),
'restart-event-bus': async () => await this.eventbus.restart(),
@@ -90,4 +100,73 @@ export class PubSubHandler {
'community-package-uninstall': async ({ packageName }) =>
await this.communityPackagesService.removeNpmPackage(packageName),
};
private multiMainHandlers: {
[EventName in keyof PubSub.MultiMainEvents]: (
event: PubSubEventMap[EventName],
) => Promise<void>;
} = {
'add-webhooks-triggers-and-pollers': async ({ workflowId }) => {
if (this.instanceSettings.isFollower) return;
try {
await this.activeWorkflowManager.add(workflowId, 'activate', undefined, {
shouldPublish: false, // prevent leader from re-publishing message
});
this.push.broadcast('workflowActivated', { workflowId });
await this.publisher.publishCommand({
command: 'display-workflow-activation',
payload: { workflowId },
}); // instruct followers to show activation in UI
} catch (e) {
const error = ensureError(e);
const { message } = error;
await this.workflowRepository.update(workflowId, { active: false });
this.push.broadcast('workflowFailedToActivate', { workflowId, errorMessage: message });
await this.publisher.publishCommand({
command: 'display-workflow-activation-error',
payload: { workflowId, errorMessage: message },
}); // instruct followers to show activation error in UI
}
},
'remove-triggers-and-pollers': async ({ workflowId }) => {
if (this.instanceSettings.isFollower) return;
await this.activeWorkflowManager.removeActivationError(workflowId);
await this.activeWorkflowManager.removeWorkflowTriggersAndPollers(workflowId);
this.push.broadcast('workflowDeactivated', { workflowId });
// instruct followers to show workflow deactivation in UI
await this.publisher.publishCommand({
command: 'display-workflow-deactivation',
payload: { workflowId },
});
},
'display-workflow-activation': async ({ workflowId }) =>
this.push.broadcast('workflowActivated', { workflowId }),
'display-workflow-deactivation': async ({ workflowId }) =>
this.push.broadcast('workflowDeactivated', { workflowId }),
'display-workflow-activation-error': async ({ workflowId, errorMessage }) =>
this.push.broadcast('workflowFailedToActivate', { workflowId, errorMessage }),
'relay-execution-lifecycle-event': async ({ type, args, pushRef }) => {
if (!this.push.getBackend().hasPushRef(pushRef)) return;
this.push.send(type, args, pushRef);
},
'clear-test-webhooks': async ({ webhookKey, workflowEntity, pushRef }) => {
if (!this.push.getBackend().hasPushRef(pushRef)) return;
this.testWebhooks.clearTimeout(webhookKey);
const workflow = this.testWebhooks.toWorkflow(workflowEntity);
await this.testWebhooks.deactivateWebhooks(workflow);
},
};
}

View File

@@ -1,4 +1,8 @@
import type { PubSubCommandMap, PubSubWorkerResponseMap } from '@/events/maps/pub-sub.event-map';
import type {
PubSubCommandMap,
PubSubEventMap,
PubSubWorkerResponseMap,
} from '@/events/maps/pub-sub.event-map';
import type { Resolve } from '@/utlity.types';
import type { COMMAND_PUBSUB_CHANNEL, WORKER_RESPONSE_PUBSUB_CHANNEL } from '../constants';
@@ -75,9 +79,17 @@ export namespace PubSub {
// ----------------------------------
type _ToWorkerResponse<WorkerResponseKey extends keyof PubSubWorkerResponseMap> = {
workerId: string;
/** ID of worker sending the response. */
senderId: string;
/** IDs of processes to send the response to. */
targets?: string[];
command: WorkerResponseKey;
/** Content of worker response. */
response: WorkerResponseKey;
/** Whether the command should be debounced when received. */
debounce?: boolean;
} & (PubSubWorkerResponseMap[WorkerResponseKey] extends never
? { payload?: never } // some responses carry no payload
: { payload: PubSubWorkerResponseMap[WorkerResponseKey] });
@@ -87,5 +99,31 @@ export namespace PubSub {
>;
/** Response sent via the `n8n.worker-response` pubsub channel. */
export type WorkerResponse = ToWorkerResponse<'get-worker-status'>;
export type WorkerResponse = ToWorkerResponse<'response-to-get-worker-status'>;
/**
* Of all events emitted from pubsub messages, those whose handlers
* are all present in main, worker, and webhook processes.
*/
export type CommonEvents = Pick<
PubSubEventMap,
| 'reload-license'
| 'restart-event-bus'
| 'reload-external-secrets-providers'
| 'community-package-install'
| 'community-package-update'
| 'community-package-uninstall'
>;
/** Multi-main events emitted from pubsub messages. */
export type MultiMainEvents = Pick<
PubSubEventMap,
| 'add-webhooks-triggers-and-pollers'
| 'remove-triggers-and-pollers'
| 'display-workflow-activation'
| 'display-workflow-deactivation'
| 'display-workflow-activation-error'
| 'relay-execution-lifecycle-event'
| 'clear-test-webhooks'
>;
}

View File

@@ -17,8 +17,6 @@ import type { PubSub } from './pubsub.types';
export class Subscriber {
private readonly client: SingleNodeClient | MultiNodeClient;
private readonly handlers: Map<PubSub.Channel, PubSub.HandlerFn> = new Map();
// #region Lifecycle
constructor(
@@ -31,8 +29,18 @@ export class Subscriber {
this.client = this.redisClientService.createClient({ type: 'subscriber(n8n)' });
this.client.on('message', (channel: PubSub.Channel, message) => {
this.handlers.get(channel)?.(message);
const handlerFn = (msg: PubSub.Command | PubSub.WorkerResponse) => {
const eventName = 'command' in msg ? msg.command : msg.response;
this.eventService.emit(eventName, msg.payload);
};
const debouncedHandlerFn = debounce(handlerFn, 300);
this.client.on('message', (_channel: PubSub.Channel, str) => {
const msg = this.parseMessage(str);
if (!msg) return;
if (msg.debounce) debouncedHandlerFn(msg);
else handlerFn(msg);
});
}
@@ -60,49 +68,31 @@ export class Subscriber {
});
}
/** Set the message handler function for a channel. */
setMessageHandler(channel: PubSub.Channel, handlerFn: PubSub.HandlerFn) {
this.handlers.set(channel, handlerFn);
}
// #endregion
// #region Commands
setCommandMessageHandler() {
const handlerFn = (msg: PubSub.Command) => this.eventService.emit(msg.command, msg.payload);
const debouncedHandlerFn = debounce(handlerFn, 300);
this.setMessageHandler('n8n.commands', (str: string) => {
const msg = this.parseCommandMessage(str);
if (!msg) return;
if (msg.debounce) debouncedHandlerFn(msg);
else handlerFn(msg);
private parseMessage(str: string) {
const msg = jsonParse<PubSub.Command | PubSub.WorkerResponse | null>(str, {
fallbackValue: null,
});
}
private parseCommandMessage(str: string) {
const msg = jsonParse<PubSub.Command | null>(str, { fallbackValue: null });
if (!msg) {
this.logger.debug('Received invalid string via command channel', { message: str });
this.logger.debug('Received invalid string via pubsub channel', { message: str });
return null;
}
this.logger.debug('Received message via command channel', msg);
const queueModeId = config.getEnv('redis.queueModeId');
if (
'command' in msg &&
!msg.selfSend &&
(msg.senderId === queueModeId || (msg.targets && !msg.targets.includes(queueModeId)))
) {
this.logger.debug('Disregarding message - not for this instance', msg);
return null;
}
this.logger.debug('Received message via pubsub channel', msg);
return msg;
}

View File

@@ -1,3 +1,4 @@
import type { WorkerStatus } from '@n8n/api-types';
import os from 'node:os';
import { Service } from 'typedi';
@@ -7,12 +8,12 @@ import { N8N_VERSION } from '@/constants';
import { JobProcessor } from './job-processor';
@Service()
export class WorkerStatus {
export class WorkerStatusService {
constructor(private readonly jobProcessor: JobProcessor) {}
generateStatus() {
generateStatus(): WorkerStatus {
return {
workerId: config.getEnv('redis.queueModeId'),
senderId: config.getEnv('redis.queueModeId'),
runningJobsSummary: this.jobProcessor.getRunningJobsSummary(),
freeMem: os.freemem(),
totalMem: os.totalmem(),