From afa683a06f0e06b0e02b552f46ddd0f6cf041dcd Mon Sep 17 00:00:00 2001 From: Michael Auerswald Date: Fri, 6 Oct 2023 13:58:11 +0200 Subject: [PATCH] refactor(core): Have one orchestration service per instance type (#7303) webhook instances will not listen to either worker or event log messages on the Redis pub/sub channel --- packages/cli/src/AbstractServer.ts | 8 -- .../ExternalSecretsManager.ee.ts | 4 +- packages/cli/src/commands/start.ts | 11 +++ packages/cli/src/commands/webhook.ts | 9 +++ packages/cli/src/commands/worker.ts | 57 ++++++------- .../controllers/orchestration.controller.ts | 4 +- .../MessageEventBus/MessageEventBus.ts | 10 ++- .../services/orchestration.base.service.ts | 50 ++++++++++++ .../orchestration.handler.base.service.ts | 33 ++++++++ .../services/orchestration.handler.service.ts | 47 ----------- .../cli/src/services/orchestration.service.ts | 79 ------------------- .../cli/src/services/orchestration/helpers.ts | 16 ++++ .../handleCommandMessageMain.ts} | 27 +++++-- .../handleWorkerResponseMessageMain.ts} | 4 +- .../orchestration.handler.main.service.ts | 34 ++++++++ .../main/orchestration.main.service.ts | 38 +++++++++ .../webhook/handleCommandMessageWebhook.ts | 6 ++ .../orchestration.handler.webhook.service.ts | 22 ++++++ .../webhook/orchestration.webhook.service.ts | 9 +++ .../worker/handleCommandMessageWorker.ts} | 17 +++- .../orchestration.handler.worker.service.ts | 17 ++++ .../worker/orchestration.worker.service.ts | 15 ++++ .../integration/commands/worker.cmd.test.ts | 29 +++---- .../cli/test/integration/eventbus.ee.test.ts | 4 +- .../services/orchestration.service.test.ts | 45 ++++++++--- 25 files changed, 380 insertions(+), 215 deletions(-) create mode 100644 packages/cli/src/services/orchestration.base.service.ts create mode 100644 packages/cli/src/services/orchestration.handler.base.service.ts delete mode 100644 packages/cli/src/services/orchestration.handler.service.ts delete mode 100644 packages/cli/src/services/orchestration.service.ts rename packages/cli/src/services/orchestration/{handleCommandMessage.ts => main/handleCommandMessageMain.ts} (71%) rename packages/cli/src/services/orchestration/{handleWorkerResponseMessage.ts => main/handleWorkerResponseMessageMain.ts} (63%) create mode 100644 packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts create mode 100644 packages/cli/src/services/orchestration/main/orchestration.main.service.ts create mode 100644 packages/cli/src/services/orchestration/webhook/handleCommandMessageWebhook.ts create mode 100644 packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts create mode 100644 packages/cli/src/services/orchestration/webhook/orchestration.webhook.service.ts rename packages/cli/src/{worker/workerCommandHandler.ts => services/orchestration/worker/handleCommandMessageWorker.ts} (84%) create mode 100644 packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts create mode 100644 packages/cli/src/services/orchestration/worker/orchestration.worker.service.ts diff --git a/packages/cli/src/AbstractServer.ts b/packages/cli/src/AbstractServer.ts index 0170b8a9f9..1002476c5b 100644 --- a/packages/cli/src/AbstractServer.ts +++ b/packages/cli/src/AbstractServer.ts @@ -19,8 +19,6 @@ import { TestWebhooks } from '@/TestWebhooks'; import { WaitingWebhooks } from '@/WaitingWebhooks'; import { webhookRequestHandler } from '@/WebhookHelpers'; import { generateHostInstanceId } from './databases/utils/generators'; -import { OrchestrationService } from './services/orchestration.service'; -import { OrchestrationHandlerService } from './services/orchestration.handler.service'; export abstract class AbstractServer { protected server: Server; @@ -115,12 +113,6 @@ export abstract class AbstractServer { else res.send('n8n is starting up. Please wait'); } else sendErrorResponse(res, new ServiceUnavailableError('Database is not ready!')); }); - - if (config.getEnv('executions.mode') === 'queue') { - // will start the redis connections - await Container.get(OrchestrationService).init(); - await Container.get(OrchestrationHandlerService).init(); - } } async init(): Promise { diff --git a/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts b/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts index e8b0e00589..d9cdab44be 100644 --- a/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts +++ b/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts @@ -20,7 +20,7 @@ import { import { License } from '@/License'; import { InternalHooks } from '@/InternalHooks'; import { ExternalSecretsProviders } from './ExternalSecretsProviders.ee'; -import { OrchestrationService } from '@/services/orchestration.service'; +import { OrchestrationMainService } from '@/services/orchestration/main/orchestration.main.service'; const logger = getLogger(); @@ -83,7 +83,7 @@ export class ExternalSecretsManager { } async broadcastReloadExternalSecretsProviders() { - await Container.get(OrchestrationService).broadcastReloadExternalSecretsProviders(); + await Container.get(OrchestrationMainService).broadcastReloadExternalSecretsProviders(); } private async getEncryptionKey(): Promise { diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index df3f42faa9..98326e81db 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -31,6 +31,8 @@ import { InternalHooks } from '@/InternalHooks'; import { License } from '@/License'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { IConfig } from '@oclif/config'; +import { OrchestrationMainService } from '@/services/orchestration/main/orchestration.main.service'; +import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service'; // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires const open = require('open'); @@ -214,6 +216,8 @@ export class Start extends BaseCommand { await this.initLicense(); this.logger.debug('License init complete'); + await this.initOrchestration(); + this.logger.debug('Orchestration init complete'); await this.initBinaryDataService(); this.logger.debug('Binary data service init complete'); await this.initExternalHooks(); @@ -228,6 +232,13 @@ export class Start extends BaseCommand { } } + async initOrchestration() { + if (config.get('executions.mode') === 'queue') { + await Container.get(OrchestrationMainService).init(); + await Container.get(OrchestrationHandlerMainService).init(); + } + } + async run() { // eslint-disable-next-line @typescript-eslint/no-shadow const { flags } = this.parse(Start); diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index bc33a80531..c99de60e9f 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -7,6 +7,8 @@ import { Queue } from '@/Queue'; import { BaseCommand } from './BaseCommand'; import { Container } from 'typedi'; import { IConfig } from '@oclif/config'; +import { OrchestrationWebhookService } from '@/services/orchestration/webhook/orchestration.webhook.service'; +import { OrchestrationHandlerWebhookService } from '@/services/orchestration/webhook/orchestration.handler.webhook.service'; export class Webhook extends BaseCommand { static description = 'Starts n8n webhook process. Intercepts only production URLs.'; @@ -94,6 +96,8 @@ export class Webhook extends BaseCommand { await this.initLicense(); this.logger.debug('License init complete'); + await this.initOrchestration(); + this.logger.debug('Orchestration init complete'); await this.initBinaryDataService(); this.logger.debug('Binary data service init complete'); await this.initExternalHooks(); @@ -115,4 +119,9 @@ export class Webhook extends BaseCommand { async catch(error: Error) { await this.exitWithCrash('Exiting due to an error.', error); } + + async initOrchestration() { + await Container.get(OrchestrationWebhookService).init(); + await Container.get(OrchestrationHandlerWebhookService).init(); + } } diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index b784adc1ef..0ea21c18b3 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -32,12 +32,12 @@ import { OwnershipService } from '@/services/ownership.service'; import type { ICredentialsOverwrite } from '@/Interfaces'; import { CredentialsOverwrites } from '@/CredentialsOverwrites'; import { rawBodyReader, bodyParser } from '@/middlewares'; -import { eventBus } from '../eventbus'; -import { RedisServicePubSubPublisher } from '../services/redis/RedisServicePubSubPublisher'; -import { RedisServicePubSubSubscriber } from '../services/redis/RedisServicePubSubSubscriber'; -import { EventMessageGeneric } from '../eventbus/EventMessageClasses/EventMessageGeneric'; -import { getWorkerCommandReceivedHandler } from '../worker/workerCommandHandler'; +import { eventBus } from '@/eventbus'; +import type { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber'; +import { EventMessageGeneric } from '@/eventbus/EventMessageClasses/EventMessageGeneric'; import { IConfig } from '@oclif/config'; +import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service'; +import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; export class Worker extends BaseCommand { static description = '\nStarts a n8n worker'; @@ -58,8 +58,6 @@ export class Worker extends BaseCommand { static jobQueue: JobQueue; - redisPublisher: RedisServicePubSubPublisher; - redisSubscriber: RedisServicePubSubSubscriber; /** @@ -272,10 +270,20 @@ export class Worker extends BaseCommand { this.logger.debug('External secrets init complete'); await this.initEventBus(); this.logger.debug('Event bus init complete'); - await this.initRedis(); - this.logger.debug('Redis init complete'); await this.initQueue(); this.logger.debug('Queue init complete'); + await this.initOrchestration(); + this.logger.debug('Orchestration init complete'); + await this.initQueue(); + + await Container.get(OrchestrationWorkerService).publishToEventLog( + new EventMessageGeneric({ + eventName: 'n8n.worker.started', + payload: { + workerId: this.queueModeId, + }, + }), + ); } async initEventBus() { @@ -290,29 +298,14 @@ export class Worker extends BaseCommand { * A subscription connection to redis is created to subscribe to commands from the main process * The subscription connection adds a handler to handle the command messages */ - async initRedis() { - this.redisPublisher = Container.get(RedisServicePubSubPublisher); - this.redisSubscriber = Container.get(RedisServicePubSubSubscriber); - await this.redisPublisher.init(); - await this.redisPublisher.publishToEventLog( - new EventMessageGeneric({ - eventName: 'n8n.worker.started', - payload: { - workerId: this.queueModeId, - }, - }), - ); - await this.redisSubscriber.subscribeToCommandChannel(); - this.redisSubscriber.addMessageHandler( - 'WorkerCommandReceivedHandler', - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - getWorkerCommandReceivedHandler({ - queueModeId: this.queueModeId, - instanceId: this.instanceId, - redisPublisher: this.redisPublisher, - getRunningJobIds: () => Object.keys(Worker.runningJobs), - }), - ); + async initOrchestration() { + await Container.get(OrchestrationWorkerService).init(); + await Container.get(OrchestrationHandlerWorkerService).initWithOptions({ + queueModeId: this.queueModeId, + instanceId: this.instanceId, + redisPublisher: Container.get(OrchestrationWorkerService).redisPublisher, + getRunningJobIds: () => Object.keys(Worker.runningJobs), + }); } async initQueue() { diff --git a/packages/cli/src/controllers/orchestration.controller.ts b/packages/cli/src/controllers/orchestration.controller.ts index 5386b75698..4d997107cc 100644 --- a/packages/cli/src/controllers/orchestration.controller.ts +++ b/packages/cli/src/controllers/orchestration.controller.ts @@ -1,13 +1,13 @@ import { Authorized, Get, RestController } from '@/decorators'; import { OrchestrationRequest } from '@/requests'; import { Service } from 'typedi'; -import { OrchestrationService } from '@/services/orchestration.service'; +import { OrchestrationMainService } from '@/services/orchestration/main/orchestration.main.service'; @Authorized(['global', 'owner']) @RestController('/orchestration') @Service() export class OrchestrationController { - constructor(private readonly orchestrationService: OrchestrationService) {} + constructor(private readonly orchestrationService: OrchestrationMainService) {} /** * These endpoint currently do not return anything, they just trigger the messsage to diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 05c0f7fee3..7f230602bd 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -31,7 +31,7 @@ import Container, { Service } from 'typedi'; import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories'; import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions'; import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers'; -import { OrchestrationService } from '../../services/orchestration.service'; +import { OrchestrationMainService } from '@/services/orchestration/main/orchestration.main.service'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; @@ -190,7 +190,9 @@ export class MessageEventBus extends EventEmitter { this.destinations[destination.getId()] = destination; this.destinations[destination.getId()].startListening(); if (notifyWorkers) { - await Container.get(OrchestrationService).broadcastRestartEventbusAfterDestinationUpdate(); + await Container.get( + OrchestrationMainService, + ).broadcastRestartEventbusAfterDestinationUpdate(); } return destination; } @@ -216,7 +218,9 @@ export class MessageEventBus extends EventEmitter { delete this.destinations[id]; } if (notifyWorkers) { - await Container.get(OrchestrationService).broadcastRestartEventbusAfterDestinationUpdate(); + await Container.get( + OrchestrationMainService, + ).broadcastRestartEventbusAfterDestinationUpdate(); } return result; } diff --git a/packages/cli/src/services/orchestration.base.service.ts b/packages/cli/src/services/orchestration.base.service.ts new file mode 100644 index 0000000000..8440407283 --- /dev/null +++ b/packages/cli/src/services/orchestration.base.service.ts @@ -0,0 +1,50 @@ +import Container from 'typedi'; +import { RedisService } from './redis.service'; +import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher'; +import config from '@/config'; + +export abstract class OrchestrationService { + protected initialized = false; + + redisPublisher: RedisServicePubSubPublisher; + + readonly redisService: RedisService; + + get isQueueMode(): boolean { + return config.get('executions.mode') === 'queue'; + } + + get isMainInstance(): boolean { + return config.get('generic.instanceType') === 'main'; + } + + get isWebhookInstance(): boolean { + return config.get('generic.instanceType') === 'webhook'; + } + + get isWorkerInstance(): boolean { + return config.get('generic.instanceType') === 'worker'; + } + + constructor() { + this.redisService = Container.get(RedisService); + } + + sanityCheck(): boolean { + return this.initialized && this.isQueueMode; + } + + async init() { + await this.initPublisher(); + this.initialized = true; + } + + async shutdown() { + await this.redisPublisher?.destroy(); + this.initialized = false; + } + + private async initPublisher() { + this.redisPublisher = await this.redisService.getPubSubPublisher(); + } +} diff --git a/packages/cli/src/services/orchestration.handler.base.service.ts b/packages/cli/src/services/orchestration.handler.base.service.ts new file mode 100644 index 0000000000..933ac0dd40 --- /dev/null +++ b/packages/cli/src/services/orchestration.handler.base.service.ts @@ -0,0 +1,33 @@ +import Container from 'typedi'; +import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/handleCommandMessageWorker'; +import { RedisService } from './redis.service'; +import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber'; + +export abstract class OrchestrationHandlerService { + protected initialized = false; + + redisSubscriber: RedisServicePubSubSubscriber; + + readonly redisService: RedisService; + + constructor() { + this.redisService = Container.get(RedisService); + } + + async init() { + await this.initSubscriber(); + this.initialized = true; + } + + async initWithOptions(options: WorkerCommandReceivedHandlerOptions) { + await this.initSubscriber(options); + this.initialized = true; + } + + async shutdown() { + await this.redisSubscriber?.destroy(); + this.initialized = false; + } + + protected abstract initSubscriber(options?: WorkerCommandReceivedHandlerOptions): Promise; +} diff --git a/packages/cli/src/services/orchestration.handler.service.ts b/packages/cli/src/services/orchestration.handler.service.ts deleted file mode 100644 index aa64926068..0000000000 --- a/packages/cli/src/services/orchestration.handler.service.ts +++ /dev/null @@ -1,47 +0,0 @@ -import Container, { Service } from 'typedi'; -import { RedisService } from './redis.service'; -import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber'; -import { - COMMAND_REDIS_CHANNEL, - EVENT_BUS_REDIS_CHANNEL, - WORKER_RESPONSE_REDIS_CHANNEL, -} from './redis/RedisServiceHelper'; -import { handleWorkerResponseMessage } from './orchestration/handleWorkerResponseMessage'; -import { handleCommandMessage } from './orchestration/handleCommandMessage'; -import { MessageEventBus } from '../eventbus/MessageEventBus/MessageEventBus'; - -@Service() -export class OrchestrationHandlerService { - redisSubscriber: RedisServicePubSubSubscriber; - - constructor(readonly redisService: RedisService) {} - - async init() { - await this.initSubscriber(); - } - - async shutdown() { - await this.redisSubscriber?.destroy(); - } - - private async initSubscriber() { - this.redisSubscriber = await this.redisService.getPubSubSubscriber(); - - await this.redisSubscriber.subscribeToWorkerResponseChannel(); - await this.redisSubscriber.subscribeToCommandChannel(); - await this.redisSubscriber.subscribeToEventLog(); - - this.redisSubscriber.addMessageHandler( - 'OrchestrationMessageReceiver', - async (channel: string, messageString: string) => { - if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { - await handleWorkerResponseMessage(messageString); - } else if (channel === COMMAND_REDIS_CHANNEL) { - await handleCommandMessage(messageString); - } else if (channel === EVENT_BUS_REDIS_CHANNEL) { - await Container.get(MessageEventBus).handleRedisEventBusMessage(messageString); - } - }, - ); - } -} diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts deleted file mode 100644 index c81874ad9d..0000000000 --- a/packages/cli/src/services/orchestration.service.ts +++ /dev/null @@ -1,79 +0,0 @@ -import { Service } from 'typedi'; -import { RedisService } from './redis.service'; -import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher'; -import config from '@/config'; - -@Service() -export class OrchestrationService { - private initialized = false; - - redisPublisher: RedisServicePubSubPublisher; - - get isQueueMode() { - return config.getEnv('executions.mode') === 'queue'; - } - - constructor(readonly redisService: RedisService) {} - - async init() { - await this.initPublisher(); - this.initialized = true; - } - - async shutdown() { - await this.redisPublisher?.destroy(); - } - - private async initPublisher() { - this.redisPublisher = await this.redisService.getPubSubPublisher(); - } - - async getWorkerStatus(id?: string) { - if (!this.isQueueMode) { - return; - } - if (!this.initialized) { - throw new Error('OrchestrationService not initialized'); - } - await this.redisPublisher.publishToCommandChannel({ - command: 'getStatus', - targets: id ? [id] : undefined, - }); - } - - async getWorkerIds() { - if (!this.isQueueMode) { - return; - } - if (!this.initialized) { - throw new Error('OrchestrationService not initialized'); - } - await this.redisPublisher.publishToCommandChannel({ - command: 'getId', - }); - } - - async broadcastRestartEventbusAfterDestinationUpdate() { - if (!this.isQueueMode) { - return; - } - if (!this.initialized) { - throw new Error('OrchestrationService not initialized'); - } - await this.redisPublisher.publishToCommandChannel({ - command: 'restartEventBus', - }); - } - - async broadcastReloadExternalSecretsProviders() { - if (!this.isQueueMode) { - return; - } - if (!this.initialized) { - throw new Error('OrchestrationService not initialized'); - } - await this.redisPublisher.publishToCommandChannel({ - command: 'reloadExternalSecretsProviders', - }); - } -} diff --git a/packages/cli/src/services/orchestration/helpers.ts b/packages/cli/src/services/orchestration/helpers.ts index 6996391f40..4928a6feb1 100644 --- a/packages/cli/src/services/orchestration/helpers.ts +++ b/packages/cli/src/services/orchestration/helpers.ts @@ -2,6 +2,10 @@ import { LoggerProxy, jsonParse } from 'n8n-workflow'; import type { RedisServiceCommandObject } from '../redis/RedisServiceCommands'; import { COMMAND_REDIS_CHANNEL } from '../redis/RedisServiceHelper'; +export interface RedisServiceCommandLastReceived { + [date: string]: Date; +} + export function messageToRedisServiceCommandObject(messageString: string) { if (!messageString) return; let message: RedisServiceCommandObject; @@ -15,3 +19,15 @@ export function messageToRedisServiceCommandObject(messageString: string) { } return message; } + +const lastReceived: RedisServiceCommandLastReceived = {}; + +export function debounceMessageReceiver(message: RedisServiceCommandObject, timeout: number = 100) { + const now = new Date(); + const lastReceivedDate = lastReceived[message.command]; + if (lastReceivedDate && now.getTime() - lastReceivedDate.getTime() < timeout) { + return false; + } + lastReceived[message.command] = now; + return true; +} diff --git a/packages/cli/src/services/orchestration/handleCommandMessage.ts b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts similarity index 71% rename from packages/cli/src/services/orchestration/handleCommandMessage.ts rename to packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts index 6939555cdc..f4fa25e54c 100644 --- a/packages/cli/src/services/orchestration/handleCommandMessage.ts +++ b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts @@ -1,17 +1,14 @@ import { LoggerProxy } from 'n8n-workflow'; -import { messageToRedisServiceCommandObject } from './helpers'; +import { debounceMessageReceiver, messageToRedisServiceCommandObject } from '../helpers'; import config from '@/config'; import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; import Container from 'typedi'; import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee'; -import type { N8nInstanceType } from '@/Interfaces'; import { License } from '@/License'; -// this function handles commands sent to the MAIN instance. the workers handle their own commands -export async function handleCommandMessage(messageString: string) { +export async function handleCommandMessageMain(messageString: string) { const queueModeId = config.get('redis.queueModeId'); - const instanceType = config.get('generic.instanceType') as N8nInstanceType; - const isMainInstance = instanceType === 'main'; + const isMainInstance = config.get('generic.instanceType') === 'main'; const message = messageToRedisServiceCommandObject(messageString); if (message) { @@ -30,6 +27,12 @@ export async function handleCommandMessage(messageString: string) { } switch (message.command) { case 'reloadLicense': + if (!debounceMessageReceiver(message, 500)) { + message.payload = { + result: 'debounced', + }; + return message; + } if (isMainInstance) { // at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently LoggerProxy.error( @@ -40,8 +43,20 @@ export async function handleCommandMessage(messageString: string) { await Container.get(License).reload(); break; case 'restartEventBus': + if (!debounceMessageReceiver(message, 200)) { + message.payload = { + result: 'debounced', + }; + return message; + } await Container.get(MessageEventBus).restart(); case 'reloadExternalSecretsProviders': + if (!debounceMessageReceiver(message, 200)) { + message.payload = { + result: 'debounced', + }; + return message; + } await Container.get(ExternalSecretsManager).reloadAllProviders(); default: break; diff --git a/packages/cli/src/services/orchestration/handleWorkerResponseMessage.ts b/packages/cli/src/services/orchestration/main/handleWorkerResponseMessageMain.ts similarity index 63% rename from packages/cli/src/services/orchestration/handleWorkerResponseMessage.ts rename to packages/cli/src/services/orchestration/main/handleWorkerResponseMessageMain.ts index ee22318638..2d5251dfb5 100644 --- a/packages/cli/src/services/orchestration/handleWorkerResponseMessage.ts +++ b/packages/cli/src/services/orchestration/main/handleWorkerResponseMessageMain.ts @@ -1,7 +1,7 @@ import { jsonParse, LoggerProxy } from 'n8n-workflow'; -import type { RedisServiceWorkerResponseObject } from '../redis/RedisServiceCommands'; +import type { RedisServiceWorkerResponseObject } from '../../redis/RedisServiceCommands'; -export async function handleWorkerResponseMessage(messageString: string) { +export async function handleWorkerResponseMessageMain(messageString: string) { const workerResponse = jsonParse(messageString); if (workerResponse) { // TODO: Handle worker response diff --git a/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts b/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts new file mode 100644 index 0000000000..4a57d140d7 --- /dev/null +++ b/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts @@ -0,0 +1,34 @@ +import Container, { Service } from 'typedi'; +import { + COMMAND_REDIS_CHANNEL, + EVENT_BUS_REDIS_CHANNEL, + WORKER_RESPONSE_REDIS_CHANNEL, +} from '../../redis/RedisServiceHelper'; +import { handleWorkerResponseMessageMain } from './handleWorkerResponseMessageMain'; +import { handleCommandMessageMain } from './handleCommandMessageMain'; +import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; +import { OrchestrationHandlerService } from '../../orchestration.handler.base.service'; + +@Service() +export class OrchestrationHandlerMainService extends OrchestrationHandlerService { + async initSubscriber() { + this.redisSubscriber = await this.redisService.getPubSubSubscriber(); + + await this.redisSubscriber.subscribeToCommandChannel(); + await this.redisSubscriber.subscribeToWorkerResponseChannel(); + await this.redisSubscriber.subscribeToEventLog(); + + this.redisSubscriber.addMessageHandler( + 'OrchestrationMessageReceiver', + async (channel: string, messageString: string) => { + if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { + await handleWorkerResponseMessageMain(messageString); + } else if (channel === COMMAND_REDIS_CHANNEL) { + await handleCommandMessageMain(messageString); + } else if (channel === EVENT_BUS_REDIS_CHANNEL) { + await Container.get(MessageEventBus).handleRedisEventBusMessage(messageString); + } + }, + ); + } +} diff --git a/packages/cli/src/services/orchestration/main/orchestration.main.service.ts b/packages/cli/src/services/orchestration/main/orchestration.main.service.ts new file mode 100644 index 0000000000..c8105dbe17 --- /dev/null +++ b/packages/cli/src/services/orchestration/main/orchestration.main.service.ts @@ -0,0 +1,38 @@ +import { Service } from 'typedi'; +import { OrchestrationService } from '../../orchestration.base.service'; + +@Service() +export class OrchestrationMainService extends OrchestrationService { + sanityCheck(): boolean { + return this.initialized && this.isQueueMode && this.isMainInstance; + } + + async getWorkerStatus(id?: string) { + if (!this.sanityCheck()) return; + await this.redisPublisher.publishToCommandChannel({ + command: 'getStatus', + targets: id ? [id] : undefined, + }); + } + + async getWorkerIds() { + if (!this.sanityCheck()) return; + await this.redisPublisher.publishToCommandChannel({ + command: 'getId', + }); + } + + async broadcastRestartEventbusAfterDestinationUpdate() { + if (!this.sanityCheck()) return; + await this.redisPublisher.publishToCommandChannel({ + command: 'restartEventBus', + }); + } + + async broadcastReloadExternalSecretsProviders() { + if (!this.sanityCheck()) return; + await this.redisPublisher.publishToCommandChannel({ + command: 'reloadExternalSecretsProviders', + }); + } +} diff --git a/packages/cli/src/services/orchestration/webhook/handleCommandMessageWebhook.ts b/packages/cli/src/services/orchestration/webhook/handleCommandMessageWebhook.ts new file mode 100644 index 0000000000..f93474e043 --- /dev/null +++ b/packages/cli/src/services/orchestration/webhook/handleCommandMessageWebhook.ts @@ -0,0 +1,6 @@ +import { handleCommandMessageMain } from '../main/handleCommandMessageMain'; + +export async function handleCommandMessageWebhook(messageString: string) { + // currently webhooks handle commands the same way as the main instance + return handleCommandMessageMain(messageString); +} diff --git a/packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts b/packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts new file mode 100644 index 0000000000..548086c040 --- /dev/null +++ b/packages/cli/src/services/orchestration/webhook/orchestration.handler.webhook.service.ts @@ -0,0 +1,22 @@ +import { Service } from 'typedi'; +import { COMMAND_REDIS_CHANNEL } from '../../redis/RedisServiceHelper'; +import { OrchestrationHandlerService } from '../../orchestration.handler.base.service'; +import { handleCommandMessageWebhook } from './handleCommandMessageWebhook'; + +@Service() +export class OrchestrationHandlerWebhookService extends OrchestrationHandlerService { + async initSubscriber() { + this.redisSubscriber = await this.redisService.getPubSubSubscriber(); + + await this.redisSubscriber.subscribeToCommandChannel(); + + this.redisSubscriber.addMessageHandler( + 'OrchestrationMessageReceiver', + async (channel: string, messageString: string) => { + if (channel === COMMAND_REDIS_CHANNEL) { + await handleCommandMessageWebhook(messageString); + } + }, + ); + } +} diff --git a/packages/cli/src/services/orchestration/webhook/orchestration.webhook.service.ts b/packages/cli/src/services/orchestration/webhook/orchestration.webhook.service.ts new file mode 100644 index 0000000000..dc9dc8172e --- /dev/null +++ b/packages/cli/src/services/orchestration/webhook/orchestration.webhook.service.ts @@ -0,0 +1,9 @@ +import { Service } from 'typedi'; +import { OrchestrationService } from '../../orchestration.base.service'; + +@Service() +export class OrchestrationWebhookService extends OrchestrationService { + sanityCheck(): boolean { + return this.initialized && this.isQueueMode && this.isWebhookInstance; + } +} diff --git a/packages/cli/src/worker/workerCommandHandler.ts b/packages/cli/src/services/orchestration/worker/handleCommandMessageWorker.ts similarity index 84% rename from packages/cli/src/worker/workerCommandHandler.ts rename to packages/cli/src/services/orchestration/worker/handleCommandMessageWorker.ts index 63866fda78..9fa68311f6 100644 --- a/packages/cli/src/worker/workerCommandHandler.ts +++ b/packages/cli/src/services/orchestration/worker/handleCommandMessageWorker.ts @@ -5,15 +5,18 @@ import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServiceP import * as os from 'os'; import Container from 'typedi'; import { License } from '@/License'; -import { MessageEventBus } from '../eventbus/MessageEventBus/MessageEventBus'; -import { ExternalSecretsManager } from '../ExternalSecrets/ExternalSecretsManager.ee'; +import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; +import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee'; +import { debounceMessageReceiver } from '../helpers'; -export function getWorkerCommandReceivedHandler(options: { +export interface WorkerCommandReceivedHandlerOptions { queueModeId: string; instanceId: string; redisPublisher: RedisServicePubSubPublisher; getRunningJobIds: () => string[]; -}) { +} + +export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHandlerOptions) { return async (channel: string, messageString: string) => { if (channel === COMMAND_REDIS_CHANNEL) { if (!messageString) return; @@ -35,6 +38,7 @@ export function getWorkerCommandReceivedHandler(options: { } switch (message.command) { case 'getStatus': + if (!debounceMessageReceiver(message, 200)) return; await options.redisPublisher.publishToWorkerChannel({ workerId: options.queueModeId, command: message.command, @@ -57,12 +61,14 @@ export function getWorkerCommandReceivedHandler(options: { }); break; case 'getId': + if (!debounceMessageReceiver(message, 200)) return; await options.redisPublisher.publishToWorkerChannel({ workerId: options.queueModeId, command: message.command, }); break; case 'restartEventBus': + if (!debounceMessageReceiver(message, 100)) return; try { await Container.get(MessageEventBus).restart(); await options.redisPublisher.publishToWorkerChannel({ @@ -84,6 +90,7 @@ export function getWorkerCommandReceivedHandler(options: { } break; case 'reloadExternalSecretsProviders': + if (!debounceMessageReceiver(message, 200)) return; try { await Container.get(ExternalSecretsManager).reloadAllProviders(); await options.redisPublisher.publishToWorkerChannel({ @@ -105,9 +112,11 @@ export function getWorkerCommandReceivedHandler(options: { } break; case 'reloadLicense': + if (!debounceMessageReceiver(message, 500)) return; await Container.get(License).reload(); break; case 'stopWorker': + if (!debounceMessageReceiver(message, 500)) return; // TODO: implement proper shutdown // await this.stopProcess(); break; diff --git a/packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts b/packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts new file mode 100644 index 0000000000..111bdb97de --- /dev/null +++ b/packages/cli/src/services/orchestration/worker/orchestration.handler.worker.service.ts @@ -0,0 +1,17 @@ +import { Service } from 'typedi'; +import { OrchestrationHandlerService } from '../../orchestration.handler.base.service'; +import type { WorkerCommandReceivedHandlerOptions } from './handleCommandMessageWorker'; +import { getWorkerCommandReceivedHandler } from './handleCommandMessageWorker'; + +@Service() +export class OrchestrationHandlerWorkerService extends OrchestrationHandlerService { + async initSubscriber(options: WorkerCommandReceivedHandlerOptions) { + this.redisSubscriber = await this.redisService.getPubSubSubscriber(); + + await this.redisSubscriber.subscribeToCommandChannel(); + this.redisSubscriber.addMessageHandler( + 'WorkerCommandReceivedHandler', + getWorkerCommandReceivedHandler(options), + ); + } +} diff --git a/packages/cli/src/services/orchestration/worker/orchestration.worker.service.ts b/packages/cli/src/services/orchestration/worker/orchestration.worker.service.ts new file mode 100644 index 0000000000..0a6fb9cff2 --- /dev/null +++ b/packages/cli/src/services/orchestration/worker/orchestration.worker.service.ts @@ -0,0 +1,15 @@ +import { Service } from 'typedi'; +import type { AbstractEventMessage } from '@/eventbus/EventMessageClasses/AbstractEventMessage'; +import { OrchestrationService } from '../../orchestration.base.service'; + +@Service() +export class OrchestrationWorkerService extends OrchestrationService { + sanityCheck(): boolean { + return this.initialized && this.isQueueMode && this.isWorkerInstance; + } + + async publishToEventLog(message: AbstractEventMessage) { + if (!this.sanityCheck()) return; + await this.redisPublisher.publishToEventLog(message); + } +} diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts index e171864158..462ea1f7ff 100644 --- a/packages/cli/test/integration/commands/worker.cmd.test.ts +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -17,6 +17,8 @@ import { NodeTypes } from '@/NodeTypes'; import { InternalHooks } from '@/InternalHooks'; import { PostHogClient } from '@/posthog'; import { RedisService } from '@/services/redis.service'; +import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service'; +import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; const oclifConfig: Config.IConfig = new Config.Config({ root: __dirname }); @@ -48,17 +50,14 @@ test('worker initializes all its components', async () => { jest.spyOn(worker, 'initExternalHooks').mockImplementation(async () => {}); jest.spyOn(worker, 'initExternalSecrets').mockImplementation(async () => {}); jest.spyOn(worker, 'initEventBus').mockImplementation(async () => {}); - jest.spyOn(worker, 'initRedis'); + jest.spyOn(worker, 'initOrchestration'); + jest + .spyOn(OrchestrationWorkerService.prototype, 'publishToEventLog') + .mockImplementation(async () => {}); + jest + .spyOn(OrchestrationHandlerWorkerService.prototype, 'initSubscriber') + .mockImplementation(async () => {}); jest.spyOn(RedisServicePubSubPublisher.prototype, 'init').mockImplementation(async () => {}); - jest - .spyOn(RedisServicePubSubPublisher.prototype, 'publishToEventLog') - .mockImplementation(async () => {}); - jest - .spyOn(RedisServicePubSubSubscriber.prototype, 'subscribeToCommandChannel') - .mockImplementation(async () => {}); - jest - .spyOn(RedisServicePubSubSubscriber.prototype, 'addMessageHandler') - .mockImplementation(async () => {}); jest.spyOn(worker, 'initQueue').mockImplementation(async () => {}); await worker.init(); @@ -71,13 +70,9 @@ test('worker initializes all its components', async () => { expect(worker.initExternalHooks).toHaveBeenCalled(); expect(worker.initExternalSecrets).toHaveBeenCalled(); expect(worker.initEventBus).toHaveBeenCalled(); - expect(worker.initRedis).toHaveBeenCalled(); - expect(worker.redisPublisher).toBeDefined(); - expect(worker.redisPublisher.init).toHaveBeenCalled(); - expect(worker.redisPublisher.publishToEventLog).toHaveBeenCalled(); - expect(worker.redisSubscriber).toBeDefined(); - expect(worker.redisSubscriber.subscribeToCommandChannel).toHaveBeenCalled(); - expect(worker.redisSubscriber.addMessageHandler).toHaveBeenCalled(); + expect(worker.initOrchestration).toHaveBeenCalled(); + expect(OrchestrationHandlerWorkerService.prototype.initSubscriber).toHaveBeenCalled(); + expect(OrchestrationWorkerService.prototype.publishToEventLog).toHaveBeenCalled(); expect(worker.initQueue).toHaveBeenCalled(); jest.restoreAllMocks(); diff --git a/packages/cli/test/integration/eventbus.ee.test.ts b/packages/cli/test/integration/eventbus.ee.test.ts index 264c2a5835..1000613848 100644 --- a/packages/cli/test/integration/eventbus.ee.test.ts +++ b/packages/cli/test/integration/eventbus.ee.test.ts @@ -91,9 +91,7 @@ beforeAll(async () => { config.set('eventBus.logWriter.logBaseName', 'n8n-test-logwriter'); config.set('eventBus.logWriter.keepLogCount', 1); - await eventBus.initialize({ - uniqueInstanceId: 'test', - }); + await eventBus.initialize({}); }); afterAll(async () => { diff --git a/packages/cli/test/unit/services/orchestration.service.test.ts b/packages/cli/test/unit/services/orchestration.service.test.ts index a39c4bd789..ff4d410c4b 100644 --- a/packages/cli/test/unit/services/orchestration.service.test.ts +++ b/packages/cli/test/unit/services/orchestration.service.test.ts @@ -2,22 +2,25 @@ import Container from 'typedi'; import config from '@/config'; import { LoggerProxy } from 'n8n-workflow'; import { getLogger } from '@/Logger'; -import { OrchestrationService } from '@/services/orchestration.service'; +import { OrchestrationMainService } from '@/services/orchestration/main/orchestration.main.service'; import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands'; import { eventBus } from '@/eventbus'; import { RedisService } from '@/services/redis.service'; import { mockInstance } from '../../integration/shared/utils'; -import { handleWorkerResponseMessage } from '../../../src/services/orchestration/handleWorkerResponseMessage'; -import { handleCommandMessage } from '../../../src/services/orchestration/handleCommandMessage'; -import { OrchestrationHandlerService } from '../../../src/services/orchestration.handler.service'; +import { handleWorkerResponseMessageMain } from '@/services/orchestration/main/handleWorkerResponseMessageMain'; +import { handleCommandMessageMain } from '@/services/orchestration/main/handleCommandMessageMain'; +import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service'; +import * as helpers from '@/services/orchestration/helpers'; +import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee'; -const os = Container.get(OrchestrationService); -const handler = Container.get(OrchestrationHandlerService); +const os = Container.get(OrchestrationMainService); +const handler = Container.get(OrchestrationHandlerMainService); let queueModeId: string; function setDefaultConfig() { config.set('executions.mode', 'queue'); + config.set('generic.instanceType', 'main'); } const workerRestartEventbusResponse: RedisServiceWorkerResponseObject = { @@ -32,6 +35,7 @@ const workerRestartEventbusResponse: RedisServiceWorkerResponseObject = { describe('Orchestration Service', () => { beforeAll(async () => { mockInstance(RedisService); + mockInstance(ExternalSecretsManager); LoggerProxy.init(getLogger()); jest.mock('ioredis', () => { const Redis = require('ioredis-mock'); @@ -85,7 +89,7 @@ describe('Orchestration Service', () => { }); test('should handle worker responses', async () => { - const response = await handleWorkerResponseMessage( + const response = await handleWorkerResponseMessageMain( JSON.stringify(workerRestartEventbusResponse), ); expect(response.command).toEqual('restartEventBus'); @@ -93,7 +97,7 @@ describe('Orchestration Service', () => { test('should handle command messages from others', async () => { jest.spyOn(LoggerProxy, 'error'); - const responseFalseId = await handleCommandMessage( + const responseFalseId = await handleCommandMessageMain( JSON.stringify({ senderId: 'test', command: 'reloadLicense', @@ -108,7 +112,7 @@ describe('Orchestration Service', () => { test('should reject command messages from iteslf', async () => { jest.spyOn(eventBus, 'restart'); - const response = await handleCommandMessage( + const response = await handleCommandMessageMain( JSON.stringify({ ...workerRestartEventbusResponse, senderId: queueModeId }), ); expect(response).toBeDefined(); @@ -119,9 +123,30 @@ describe('Orchestration Service', () => { }); test('should send command messages', async () => { - jest.spyOn(os.redisPublisher, 'publishToCommandChannel'); + setDefaultConfig(); + jest.spyOn(os.redisPublisher, 'publishToCommandChannel').mockImplementation(async () => {}); await os.getWorkerIds(); expect(os.redisPublisher.publishToCommandChannel).toHaveBeenCalled(); jest.spyOn(os.redisPublisher, 'publishToCommandChannel').mockRestore(); }); + + test('should prevent receiving commands too often', async () => { + setDefaultConfig(); + jest.spyOn(helpers, 'debounceMessageReceiver'); + const res1 = await handleCommandMessageMain( + JSON.stringify({ + senderId: 'test', + command: 'reloadExternalSecretsProviders', + }), + ); + const res2 = await handleCommandMessageMain( + JSON.stringify({ + senderId: 'test', + command: 'reloadExternalSecretsProviders', + }), + ); + expect(helpers.debounceMessageReceiver).toHaveBeenCalledTimes(2); + expect(res1!.payload).toBeUndefined(); + expect(res2!.payload!.result).toEqual('debounced'); + }); });