From 9f797b96d818a5ae74ad82917347c99f3c249688 Mon Sep 17 00:00:00 2001 From: Michael Auerswald Date: Sun, 17 Sep 2023 11:05:54 +0200 Subject: [PATCH] feat(core): Add command to trigger license refresh on workers (#7184) This PR implements the updated license SDK so that worker and webhook instances do not auto-renew licenses any more. Instead, they receive a `reloadLicense` command via the Redis client that will fetch the updated license after it was saved on the main instance This also contains some refactoring with moving redis sub and pub clients into the event bus directly, to prevent cyclic dependency issues. --- packages/cli/package.json | 2 +- packages/cli/src/License.ts | 40 ++++++- packages/cli/src/Server.ts | 4 +- packages/cli/src/commands/BaseCommand.ts | 6 +- packages/cli/src/commands/start.ts | 2 +- packages/cli/src/commands/webhook.ts | 2 +- packages/cli/src/commands/worker.ts | 4 +- .../cli/src/controllers/e2e.controller.ts | 2 +- .../controllers/orchestration.controller.ts | 5 +- .../MessageEventBus/MessageEventBus.ts | 104 ++++++++++++++++-- .../cli/src/services/orchestration.service.ts | 87 ++------------- .../orchestration/handleCommandMessage.ts | 29 +++++ .../handleWorkerResponseMessage.ts | 11 ++ .../cli/src/services/orchestration/helpers.ts | 17 +++ .../services/redis/RedisServiceBaseClasses.ts | 1 + .../services/redis/RedisServiceCommands.ts | 9 +- .../redis/RedisServicePubSubSubscriber.ts | 25 +++++ .../cli/src/worker/workerCommandHandler.ts | 10 +- .../cli/test/integration/eventbus.ee.test.ts | 4 +- packages/cli/test/unit/License.test.ts | 18 +++ .../services/orchestration.service.test.ts | 42 ++++--- pnpm-lock.yaml | 8 +- 22 files changed, 293 insertions(+), 139 deletions(-) create mode 100644 packages/cli/src/services/orchestration/handleCommandMessage.ts create mode 100644 packages/cli/src/services/orchestration/handleWorkerResponseMessage.ts create mode 100644 packages/cli/src/services/orchestration/helpers.ts diff --git a/packages/cli/package.json b/packages/cli/package.json index 0d4b714566..5f29c24378 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -100,7 +100,7 @@ }, "dependencies": { "@n8n/client-oauth2": "workspace:*", - "@n8n_io/license-sdk": "~2.5.1", + "@n8n_io/license-sdk": "~2.6.0", "@oclif/command": "^1.8.16", "@oclif/core": "^1.16.4", "@oclif/errors": "^1.3.6", diff --git a/packages/cli/src/License.ts b/packages/cli/src/License.ts index 8b3a62caf5..e9660be3f1 100644 --- a/packages/cli/src/License.ts +++ b/packages/cli/src/License.ts @@ -11,8 +11,10 @@ import { SETTINGS_LICENSE_CERT_KEY, UNLIMITED_LICENSE_QUOTA, } from './constants'; -import { Service } from 'typedi'; -import type { BooleanLicenseFeature, NumericLicenseFeature } from './Interfaces'; +import Container, { Service } from 'typedi'; +import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } from './Interfaces'; +import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher'; +import { RedisService } from './services/redis.service'; type FeatureReturnType = Partial< { @@ -26,18 +28,28 @@ export class License { private manager: LicenseManager | undefined; + instanceId: string | undefined; + + private redisPublisher: RedisServicePubSubPublisher; + constructor() { this.logger = getLogger(); } - async init(instanceId: string) { + async init(instanceId: string, instanceType: N8nInstanceType = 'main') { if (this.manager) { return; } + this.instanceId = instanceId; + const isMainInstance = instanceType === 'main'; const server = config.getEnv('license.serverUrl'); - const autoRenewEnabled = config.getEnv('license.autoRenewEnabled'); + const autoRenewEnabled = isMainInstance && config.getEnv('license.autoRenewEnabled'); + const offlineMode = !isMainInstance; const autoRenewOffset = config.getEnv('license.autoRenewOffset'); + const saveCertStr = isMainInstance + ? async (value: TLicenseBlock) => this.saveCertStr(value) + : async () => {}; try { this.manager = new LicenseManager({ @@ -46,9 +58,10 @@ export class License { productIdentifier: `n8n-${N8N_VERSION}`, autoRenewEnabled, autoRenewOffset, + offlineMode, logger: this.logger, loadCertStr: async () => this.loadCertStr(), - saveCertStr: async (value: TLicenseBlock) => this.saveCertStr(value), + saveCertStr, deviceFingerprint: () => instanceId, }); @@ -86,6 +99,15 @@ export class License { }, ['key'], ); + if (config.getEnv('executions.mode') === 'queue') { + if (!this.redisPublisher) { + this.logger.debug('Initializing Redis publisher for License Service'); + this.redisPublisher = await Container.get(RedisService).getPubSubPublisher(); + } + await this.redisPublisher.publishToCommandChannel({ + command: 'reloadLicense', + }); + } } async activate(activationKey: string): Promise { @@ -96,6 +118,14 @@ export class License { await this.manager.activate(activationKey); } + async reload(): Promise { + if (!this.manager) { + return; + } + this.logger.debug('Reloading license'); + await this.manager.reload(); + } + async renew() { if (!this.manager) { return; diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 61c14fa1b8..a93882f267 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -1470,7 +1470,9 @@ export class Server extends AbstractServer { // ---------------------------------------- if (!eventBus.isInitialized) { - await eventBus.initialize(); + await eventBus.initialize({ + uniqueInstanceId: this.uniqueInstanceId, + }); } if (this.endpointPresetCredentials !== '') { diff --git a/packages/cli/src/commands/BaseCommand.ts b/packages/cli/src/commands/BaseCommand.ts index 008e9f4e93..9b2658ac7f 100644 --- a/packages/cli/src/commands/BaseCommand.ts +++ b/packages/cli/src/commands/BaseCommand.ts @@ -16,7 +16,7 @@ import { initErrorHandling } from '@/ErrorReporting'; import { ExternalHooks } from '@/ExternalHooks'; import { NodeTypes } from '@/NodeTypes'; import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials'; -import type { IExternalHooksClass } from '@/Interfaces'; +import type { IExternalHooksClass, N8nInstanceType } from '@/Interfaces'; import { InternalHooks } from '@/InternalHooks'; import { PostHogClient } from '@/posthog'; import { License } from '@/License'; @@ -113,9 +113,9 @@ export abstract class BaseCommand extends Command { await this.externalHooks.init(); } - async initLicense(): Promise { + async initLicense(instanceType: N8nInstanceType = 'main'): Promise { const license = Container.get(License); - await license.init(this.instanceId); + await license.init(this.instanceId, instanceType); const activationKey = config.getEnv('license.activationKey'); diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 15000ab56a..b0a9b0b739 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -197,7 +197,7 @@ export class Start extends BaseCommand { this.logger.info('Initializing n8n process'); this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner); - await this.initLicense(); + await this.initLicense('main'); await this.initBinaryManager(); await this.initExternalHooks(); await this.initExternalSecrets(); diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index 7d5bf46302..912f8a2276 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -77,7 +77,7 @@ export class Webhook extends BaseCommand { await this.initCrashJournal(); await super.init(); - await this.initLicense(); + await this.initLicense('webhook'); await this.initBinaryManager(); await this.initExternalHooks(); await this.initExternalSecrets(); diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 30cf4be73e..bffaea3e4a 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -256,7 +256,7 @@ export class Worker extends BaseCommand { this.logger.debug(`Worker ID: ${this.uniqueInstanceId}`); this.logger.debug('Starting n8n worker...'); - await this.initLicense(); + await this.initLicense('worker'); await this.initBinaryManager(); await this.initExternalHooks(); await this.initExternalSecrets(); @@ -268,6 +268,7 @@ export class Worker extends BaseCommand { async initEventBus() { await eventBus.initialize({ workerId: this.uniqueInstanceId, + uniqueInstanceId: this.uniqueInstanceId, }); } @@ -295,6 +296,7 @@ export class Worker extends BaseCommand { // eslint-disable-next-line @typescript-eslint/no-unsafe-argument getWorkerCommandReceivedHandler({ uniqueInstanceId: this.uniqueInstanceId, + instanceId: this.instanceId, redisPublisher: this.redisPublisher, getRunningJobIds: () => Object.keys(Worker.runningJobs), }), diff --git a/packages/cli/src/controllers/e2e.controller.ts b/packages/cli/src/controllers/e2e.controller.ts index 0957ea95ee..f4e7a74499 100644 --- a/packages/cli/src/controllers/e2e.controller.ts +++ b/packages/cli/src/controllers/e2e.controller.ts @@ -109,7 +109,7 @@ export class E2EController { private async resetLogStreaming() { for (const id in eventBus.destinations) { - await eventBus.removeDestination(id); + await eventBus.removeDestination(id, false); } } diff --git a/packages/cli/src/controllers/orchestration.controller.ts b/packages/cli/src/controllers/orchestration.controller.ts index fd6b68ad4e..5386b75698 100644 --- a/packages/cli/src/controllers/orchestration.controller.ts +++ b/packages/cli/src/controllers/orchestration.controller.ts @@ -1,15 +1,12 @@ -import config from '@/config'; import { Authorized, Get, RestController } from '@/decorators'; import { OrchestrationRequest } from '@/requests'; import { Service } from 'typedi'; -import { OrchestrationService } from '../services/orchestration.service'; +import { OrchestrationService } from '@/services/orchestration.service'; @Authorized(['global', 'owner']) @RestController('/orchestration') @Service() export class OrchestrationController { - private config = config; - constructor(private readonly orchestrationService: OrchestrationService) {} /** diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 43059bf159..5a6738c5f8 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -1,4 +1,4 @@ -import { LoggerProxy } from 'n8n-workflow'; +import { LoggerProxy, jsonParse } from 'n8n-workflow'; import type { MessageEventBusDestinationOptions } from 'n8n-workflow'; import type { DeleteResult } from 'typeorm'; import type { @@ -27,9 +27,18 @@ import { } from '../EventMessageClasses/EventMessageGeneric'; import { recoverExecutionDataFromEventLogMessages } from './recoverEvents'; import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee'; -import Container from 'typedi'; +import Container, { Service } from 'typedi'; import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories'; -import { OrchestrationService } from '../../services/orchestration.service'; +import { RedisService } from '@/services/redis.service'; +import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; +import type { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber'; +import { + COMMAND_REDIS_CHANNEL, + EVENT_BUS_REDIS_CHANNEL, +} from '@/services/redis/RedisServiceHelper'; +import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions'; +import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers'; +import { messageToRedisServiceCommandObject } from '@/services/orchestration/helpers'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; @@ -41,13 +50,21 @@ export interface MessageWithCallback { export interface MessageEventBusInitializeOptions { skipRecoveryPass?: boolean; workerId?: string; + uniqueInstanceId?: string; } +@Service() export class MessageEventBus extends EventEmitter { private static instance: MessageEventBus; isInitialized: boolean; + uniqueInstanceId: string; + + redisPublisher: RedisServicePubSubPublisher; + + redisSubscriber: RedisServicePubSubSubscriber; + logWriter: MessageEventBusLogWriter; destinations: { @@ -76,11 +93,30 @@ export class MessageEventBus extends EventEmitter { * * Sets `isInitialized` to `true` once finished. */ - async initialize(options?: MessageEventBusInitializeOptions): Promise { + async initialize(options: MessageEventBusInitializeOptions): Promise { if (this.isInitialized) { return; } + this.uniqueInstanceId = options?.uniqueInstanceId ?? ''; + + if (config.getEnv('executions.mode') === 'queue') { + this.redisPublisher = await Container.get(RedisService).getPubSubPublisher(); + this.redisSubscriber = await Container.get(RedisService).getPubSubSubscriber(); + await this.redisSubscriber.subscribeToEventLog(); + await this.redisSubscriber.subscribeToCommandChannel(); + this.redisSubscriber.addMessageHandler( + 'MessageEventBusMessageReceiver', + async (channel: string, messageString: string) => { + if (channel === EVENT_BUS_REDIS_CHANNEL) { + await this.handleRedisEventBusMessage(messageString); + } else if (channel === COMMAND_REDIS_CHANNEL) { + await this.handleRedisCommandMessage(messageString); + } + }, + ); + } + LoggerProxy.debug('Initializing event bus...'); const savedEventDestinations = await Db.collections.EventDestinations.find({}); @@ -89,7 +125,7 @@ export class MessageEventBus extends EventEmitter { try { const destination = messageEventBusDestinationFromDb(this, destinationData); if (destination) { - await this.addDestination(destination); + await this.addDestination(destination, false); } } catch (error) { // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access @@ -182,10 +218,13 @@ export class MessageEventBus extends EventEmitter { this.isInitialized = true; } - async addDestination(destination: MessageEventBusDestination) { - await this.removeDestination(destination.getId()); + async addDestination(destination: MessageEventBusDestination, notifyWorkers: boolean = true) { + await this.removeDestination(destination.getId(), false); this.destinations[destination.getId()] = destination; this.destinations[destination.getId()].startListening(); + if (notifyWorkers) { + await this.broadcastRestartEventbusAfterDestinationUpdate(); + } return destination; } @@ -199,19 +238,62 @@ export class MessageEventBus extends EventEmitter { return result.sort((a, b) => (a.__type ?? '').localeCompare(b.__type ?? '')); } - async removeDestination(id: string): Promise { + async removeDestination( + id: string, + notifyWorkers: boolean = true, + ): Promise { let result; if (Object.keys(this.destinations).includes(id)) { await this.destinations[id].close(); result = await this.destinations[id].deleteFromDb(); delete this.destinations[id]; } + if (notifyWorkers) { + await this.broadcastRestartEventbusAfterDestinationUpdate(); + } return result; } + async handleRedisEventBusMessage(messageString: string) { + const eventData = jsonParse(messageString); + if (eventData) { + const eventMessage = getEventMessageObjectByType(eventData); + if (eventMessage) { + await Container.get(MessageEventBus).send(eventMessage); + } + } + return eventData; + } + + async handleRedisCommandMessage(messageString: string) { + const message = messageToRedisServiceCommandObject(messageString); + if (message) { + if ( + message.senderId === this.uniqueInstanceId || + (message.targets && !message.targets.includes(this.uniqueInstanceId)) + ) { + LoggerProxy.debug( + `Skipping command message ${message.command} because it's not for this instance.`, + ); + return message; + } + switch (message.command) { + case 'restartEventBus': + await this.restart(); + default: + break; + } + return message; + } + return; + } + async broadcastRestartEventbusAfterDestinationUpdate() { if (config.getEnv('executions.mode') === 'queue') { - await Container.get(OrchestrationService).restartEventBus(); + await this.redisPublisher.publishToCommandChannel({ + senderId: this.uniqueInstanceId, + command: 'restartEventBus', + }); } } @@ -235,6 +317,8 @@ export class MessageEventBus extends EventEmitter { ); await this.destinations[destinationName].close(); } + await this.redisSubscriber?.unSubscribeFromCommandChannel(); + await this.redisSubscriber?.unSubscribeFromEventLog(); this.isInitialized = false; LoggerProxy.debug('EventBus shut down.'); } @@ -417,4 +501,4 @@ export class MessageEventBus extends EventEmitter { } } -export const eventBus = MessageEventBus.getInstance(); +export const eventBus = Container.get(MessageEventBus); diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts index d3f8c7e27e..17fcab79dd 100644 --- a/packages/cli/src/services/orchestration.service.ts +++ b/packages/cli/src/services/orchestration.service.ts @@ -2,19 +2,9 @@ import { Service } from 'typedi'; import { RedisService } from './redis.service'; import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher'; import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber'; -import { LoggerProxy, jsonParse } from 'n8n-workflow'; -import { eventBus } from '../eventbus'; -import type { AbstractEventMessageOptions } from '../eventbus/EventMessageClasses/AbstractEventMessageOptions'; -import { getEventMessageObjectByType } from '../eventbus/EventMessageClasses/Helpers'; -import type { - RedisServiceCommandObject, - RedisServiceWorkerResponseObject, -} from './redis/RedisServiceCommands'; -import { - COMMAND_REDIS_CHANNEL, - EVENT_BUS_REDIS_CHANNEL, - WORKER_RESPONSE_REDIS_CHANNEL, -} from './redis/RedisServiceHelper'; +import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from './redis/RedisServiceHelper'; +import { handleWorkerResponseMessage } from './orchestration/handleWorkerResponseMessage'; +import { handleCommandMessage } from './orchestration/handleCommandMessage'; @Service() export class OrchestrationService { @@ -51,81 +41,21 @@ export class OrchestrationService { private async initSubscriber() { this.redisSubscriber = await this.redisService.getPubSubSubscriber(); - // TODO: these are all proof of concept implementations for the moment - // until worker communication is implemented - // #region proof of concept - await this.redisSubscriber.subscribeToEventLog(); await this.redisSubscriber.subscribeToWorkerResponseChannel(); await this.redisSubscriber.subscribeToCommandChannel(); this.redisSubscriber.addMessageHandler( 'OrchestrationMessageReceiver', async (channel: string, messageString: string) => { - // TODO: this is a proof of concept implementation to forward events to the main instance's event bus - // Events are arriving through a pub/sub channel and are forwarded to the eventBus - // In the future, a stream should probably replace this implementation entirely - if (channel === EVENT_BUS_REDIS_CHANNEL) { - await this.handleEventBusMessage(messageString); - } else if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { - await this.handleWorkerResponseMessage(messageString); + if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { + await handleWorkerResponseMessage(messageString); } else if (channel === COMMAND_REDIS_CHANNEL) { - await this.handleCommandMessage(messageString); + await handleCommandMessage(messageString, this.uniqueInstanceId); } }, ); } - async handleWorkerResponseMessage(messageString: string) { - const workerResponse = jsonParse(messageString); - if (workerResponse) { - // TODO: Handle worker response - LoggerProxy.debug('Received worker response', workerResponse); - } - return workerResponse; - } - - async handleEventBusMessage(messageString: string) { - const eventData = jsonParse(messageString); - if (eventData) { - const eventMessage = getEventMessageObjectByType(eventData); - if (eventMessage) { - await eventBus.send(eventMessage); - } - } - return eventData; - } - - async handleCommandMessage(messageString: string) { - if (!messageString) return; - let message: RedisServiceCommandObject; - try { - message = jsonParse(messageString); - } catch { - LoggerProxy.debug( - `Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`, - ); - return; - } - if (message) { - if ( - message.senderId === this.uniqueInstanceId || - (message.targets && !message.targets.includes(this.uniqueInstanceId)) - ) { - LoggerProxy.debug( - `Skipping command message ${message.command} because it's not for this instance.`, - ); - return message; - } - switch (message.command) { - case 'restartEventBus': - await eventBus.restart(); - break; - } - return message; - } - return; - } - async getWorkerStatus(id?: string) { if (!this.initialized) { throw new Error('OrchestrationService not initialized'); @@ -159,13 +89,14 @@ export class OrchestrationService { }); } - async restartEventBus(id?: string) { + // reload the license on workers after it was changed on the main instance + async reloadLicense(id?: string) { if (!this.initialized) { throw new Error('OrchestrationService not initialized'); } await this.redisPublisher.publishToCommandChannel({ senderId: this.uniqueInstanceId, - command: 'restartEventBus', + command: 'reloadLicense', targets: id ? [id] : undefined, }); } diff --git a/packages/cli/src/services/orchestration/handleCommandMessage.ts b/packages/cli/src/services/orchestration/handleCommandMessage.ts new file mode 100644 index 0000000000..8a04cb3ba7 --- /dev/null +++ b/packages/cli/src/services/orchestration/handleCommandMessage.ts @@ -0,0 +1,29 @@ +import { LoggerProxy } from 'n8n-workflow'; +import { messageToRedisServiceCommandObject } from './helpers'; +import Container from 'typedi'; +import { License } from '@/License'; + +// this function handles commands sent to the MAIN instance. the workers handle their own commands +export async function handleCommandMessage(messageString: string, uniqueInstanceId: string) { + const message = messageToRedisServiceCommandObject(messageString); + if (message) { + if ( + message.senderId === uniqueInstanceId || + (message.targets && !message.targets.includes(uniqueInstanceId)) + ) { + LoggerProxy.debug( + `Skipping command message ${message.command} because it's not for this instance.`, + ); + return message; + } + switch (message.command) { + case 'reloadLicense': + await Container.get(License).reload(); + break; + default: + break; + } + return message; + } + return; +} diff --git a/packages/cli/src/services/orchestration/handleWorkerResponseMessage.ts b/packages/cli/src/services/orchestration/handleWorkerResponseMessage.ts new file mode 100644 index 0000000000..ee22318638 --- /dev/null +++ b/packages/cli/src/services/orchestration/handleWorkerResponseMessage.ts @@ -0,0 +1,11 @@ +import { jsonParse, LoggerProxy } from 'n8n-workflow'; +import type { RedisServiceWorkerResponseObject } from '../redis/RedisServiceCommands'; + +export async function handleWorkerResponseMessage(messageString: string) { + const workerResponse = jsonParse(messageString); + if (workerResponse) { + // TODO: Handle worker response + LoggerProxy.debug('Received worker response', workerResponse); + } + return workerResponse; +} diff --git a/packages/cli/src/services/orchestration/helpers.ts b/packages/cli/src/services/orchestration/helpers.ts new file mode 100644 index 0000000000..6996391f40 --- /dev/null +++ b/packages/cli/src/services/orchestration/helpers.ts @@ -0,0 +1,17 @@ +import { LoggerProxy, jsonParse } from 'n8n-workflow'; +import type { RedisServiceCommandObject } from '../redis/RedisServiceCommands'; +import { COMMAND_REDIS_CHANNEL } from '../redis/RedisServiceHelper'; + +export function messageToRedisServiceCommandObject(messageString: string) { + if (!messageString) return; + let message: RedisServiceCommandObject; + try { + message = jsonParse(messageString); + } catch { + LoggerProxy.debug( + `Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`, + ); + return; + } + return message; +} diff --git a/packages/cli/src/services/redis/RedisServiceBaseClasses.ts b/packages/cli/src/services/redis/RedisServiceBaseClasses.ts index 2ed9d94eee..da16aa25e7 100644 --- a/packages/cli/src/services/redis/RedisServiceBaseClasses.ts +++ b/packages/cli/src/services/redis/RedisServiceBaseClasses.ts @@ -49,6 +49,7 @@ class RedisServiceBase { return; } await this.redisClient.quit(); + this.isInitialized = false; this.redisClient = undefined; } } diff --git a/packages/cli/src/services/redis/RedisServiceCommands.ts b/packages/cli/src/services/redis/RedisServiceCommands.ts index 5796560d4b..a57e190047 100644 --- a/packages/cli/src/services/redis/RedisServiceCommands.ts +++ b/packages/cli/src/services/redis/RedisServiceCommands.ts @@ -1,4 +1,9 @@ -export type RedisServiceCommand = 'getStatus' | 'getId' | 'restartEventBus' | 'stopWorker'; // TODO: add more commands +export type RedisServiceCommand = + | 'getStatus' + | 'getId' + | 'restartEventBus' + | 'stopWorker' + | 'reloadLicense'; /** * An object to be sent via Redis pub/sub from the main process to the workers. @@ -7,7 +12,7 @@ export type RedisServiceCommand = 'getStatus' | 'getId' | 'restartEventBus' | 's * @field payload: Optional arguments to be sent with the command. */ type RedisServiceBaseCommand = { - senderId: string; + senderId?: string; command: RedisServiceCommand; payload?: { [key: string]: string | number | boolean | string[] | number[] | boolean[]; diff --git a/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts b/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts index 404544d6f9..240fe7e1af 100644 --- a/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts +++ b/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts @@ -32,6 +32,19 @@ export class RedisServicePubSubSubscriber extends RedisServiceBaseReceiver { }); } + async unsubscribe(channel: string): Promise { + if (!this.redisClient) { + return; + } + await this.redisClient?.unsubscribe(channel, (error, _count: number) => { + if (error) { + Logger.error(`Error unsubscribing from channel ${channel}`); + } else { + Logger.debug(`Unsubscribed Redis PubSub client from channel: ${channel}`); + } + }); + } + async subscribeToEventLog(): Promise { await this.subscribe(EVENT_BUS_REDIS_CHANNEL); } @@ -43,4 +56,16 @@ export class RedisServicePubSubSubscriber extends RedisServiceBaseReceiver { async subscribeToWorkerResponseChannel(): Promise { await this.subscribe(WORKER_RESPONSE_REDIS_CHANNEL); } + + async unSubscribeFromEventLog(): Promise { + await this.unsubscribe(EVENT_BUS_REDIS_CHANNEL); + } + + async unSubscribeFromCommandChannel(): Promise { + await this.unsubscribe(COMMAND_REDIS_CHANNEL); + } + + async unSubscribeFromWorkerResponseChannel(): Promise { + await this.unsubscribe(WORKER_RESPONSE_REDIS_CHANNEL); + } } diff --git a/packages/cli/src/worker/workerCommandHandler.ts b/packages/cli/src/worker/workerCommandHandler.ts index 874ead410c..acd488624a 100644 --- a/packages/cli/src/worker/workerCommandHandler.ts +++ b/packages/cli/src/worker/workerCommandHandler.ts @@ -1,12 +1,14 @@ import { jsonParse, LoggerProxy } from 'n8n-workflow'; -import { eventBus } from '../eventbus'; import type { RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands'; import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper'; -import type { RedisServicePubSubPublisher } from '../services/redis/RedisServicePubSubPublisher'; +import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; import * as os from 'os'; +import Container from 'typedi'; +import { License } from '@/License'; export function getWorkerCommandReceivedHandler(options: { uniqueInstanceId: string; + instanceId: string; redisPublisher: RedisServicePubSubPublisher; getRunningJobIds: () => string[]; }) { @@ -56,7 +58,6 @@ export function getWorkerCommandReceivedHandler(options: { }); break; case 'restartEventBus': - await eventBus.restart(); await options.redisPublisher.publishToWorkerChannel({ workerId: options.uniqueInstanceId, command: message.command, @@ -65,6 +66,9 @@ export function getWorkerCommandReceivedHandler(options: { }, }); break; + case 'reloadLicense': + await Container.get(License).reload(); + break; case 'stopWorker': // TODO: implement proper shutdown // await this.stopProcess(); diff --git a/packages/cli/test/integration/eventbus.ee.test.ts b/packages/cli/test/integration/eventbus.ee.test.ts index e06e10f6c0..264c2a5835 100644 --- a/packages/cli/test/integration/eventbus.ee.test.ts +++ b/packages/cli/test/integration/eventbus.ee.test.ts @@ -91,7 +91,9 @@ beforeAll(async () => { config.set('eventBus.logWriter.logBaseName', 'n8n-test-logwriter'); config.set('eventBus.logWriter.keepLogCount', 1); - await eventBus.initialize(); + await eventBus.initialize({ + uniqueInstanceId: 'test', + }); }); afterAll(async () => { diff --git a/packages/cli/test/unit/License.test.ts b/packages/cli/test/unit/License.test.ts index 395d163545..f035e6d927 100644 --- a/packages/cli/test/unit/License.test.ts +++ b/packages/cli/test/unit/License.test.ts @@ -31,6 +31,24 @@ describe('License', () => { expect(LicenseManager).toHaveBeenCalledWith({ autoRenewEnabled: true, autoRenewOffset: MOCK_RENEW_OFFSET, + offlineMode: false, + deviceFingerprint: expect.any(Function), + productIdentifier: `n8n-${N8N_VERSION}`, + logger: expect.anything(), + loadCertStr: expect.any(Function), + saveCertStr: expect.any(Function), + server: MOCK_SERVER_URL, + tenantId: 1, + }); + }); + + test('initializes license manager for worker', async () => { + license = new License(); + await license.init(MOCK_INSTANCE_ID, 'worker'); + expect(LicenseManager).toHaveBeenCalledWith({ + autoRenewEnabled: false, + autoRenewOffset: MOCK_RENEW_OFFSET, + offlineMode: true, deviceFingerprint: expect.any(Function), productIdentifier: `n8n-${N8N_VERSION}`, logger: expect.anything(), diff --git a/packages/cli/test/unit/services/orchestration.service.test.ts b/packages/cli/test/unit/services/orchestration.service.test.ts index 18204cea2b..127aac690d 100644 --- a/packages/cli/test/unit/services/orchestration.service.test.ts +++ b/packages/cli/test/unit/services/orchestration.service.test.ts @@ -6,9 +6,11 @@ import { OrchestrationService } from '@/services/orchestration.service'; import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands'; import { EventMessageWorkflow } from '@/eventbus/EventMessageClasses/EventMessageWorkflow'; import { eventBus } from '@/eventbus'; -import * as EventHelpers from '@/eventbus/EventMessageClasses/Helpers'; import { RedisService } from '@/services/redis.service'; import { mockInstance } from '../../integration/shared/utils'; +import { handleWorkerResponseMessage } from '../../../src/services/orchestration/handleWorkerResponseMessage'; +import { handleCommandMessage } from '../../../src/services/orchestration/handleCommandMessage'; +import { License } from '../../../src/License'; const os = Container.get(OrchestrationService); @@ -77,6 +79,7 @@ describe('Orchestration Service', () => { afterAll(async () => { jest.mock('../../../src/services/redis/RedisServicePubSubPublisher').restoreAllMocks(); jest.mock('../../../src/services/redis/RedisServicePubSubSubscriber').restoreAllMocks(); + await os.shutdown(); }); test('should initialize', async () => { @@ -87,38 +90,35 @@ describe('Orchestration Service', () => { }); test('should handle worker responses', async () => { - const response = await os.handleWorkerResponseMessage( + const response = await handleWorkerResponseMessage( JSON.stringify(workerRestartEventbusResponse), ); expect(response.command).toEqual('restartEventBus'); }); - test('should handle event messages', async () => { - const response = await os.handleEventBusMessage(JSON.stringify(eventBusMessage)); - jest.spyOn(eventBus, 'send'); - jest.spyOn(EventHelpers, 'getEventMessageObjectByType'); - expect(eventBus.send).toHaveBeenCalled(); - expect(response.eventName).toEqual('n8n.workflow.success'); - jest.spyOn(eventBus, 'send').mockRestore(); - jest.spyOn(EventHelpers, 'getEventMessageObjectByType').mockRestore(); - }); - test('should handle command messages from others', async () => { - jest.spyOn(eventBus, 'restart'); - const responseFalseId = await os.handleCommandMessage( - JSON.stringify(workerRestartEventbusResponse), + const license = Container.get(License); + license.instanceId = 'test'; + jest.spyOn(license, 'reload'); + const responseFalseId = await handleCommandMessage( + JSON.stringify({ + senderId: 'test', + command: 'reloadLicense', + }), + os.uniqueInstanceId, ); expect(responseFalseId).toBeDefined(); - expect(responseFalseId!.command).toEqual('restartEventBus'); + expect(responseFalseId!.command).toEqual('reloadLicense'); expect(responseFalseId!.senderId).toEqual('test'); - expect(eventBus.restart).toHaveBeenCalled(); - jest.spyOn(eventBus, 'restart').mockRestore(); + expect(license.reload).toHaveBeenCalled(); + jest.spyOn(license, 'reload').mockRestore(); }); test('should reject command messages from iteslf', async () => { jest.spyOn(eventBus, 'restart'); - const response = await os.handleCommandMessage( + const response = await handleCommandMessage( JSON.stringify({ ...workerRestartEventbusResponse, senderId: os.uniqueInstanceId }), + os.uniqueInstanceId, ); expect(response).toBeDefined(); expect(response!.command).toEqual('restartEventBus'); @@ -133,8 +133,4 @@ describe('Orchestration Service', () => { expect(os.redisPublisher.publishToCommandChannel).toHaveBeenCalled(); jest.spyOn(os.redisPublisher, 'publishToCommandChannel').mockRestore(); }); - - afterAll(async () => { - await os.shutdown(); - }); }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b7a4dc8180..be5a4146a0 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -195,8 +195,8 @@ importers: specifier: workspace:* version: link:../@n8n/client-oauth2 '@n8n_io/license-sdk': - specifier: ~2.5.1 - version: 2.5.1 + specifier: ~2.6.0 + version: 2.6.0 '@oclif/command': specifier: ^1.8.16 version: 1.8.18(@oclif/config@1.18.5)(supports-color@8.1.1) @@ -4656,8 +4656,8 @@ packages: acorn-walk: 8.2.0 dev: false - /@n8n_io/license-sdk@2.5.1: - resolution: {integrity: sha512-CL4JVJS8nvI8qPFQ1jSG7CiPnNkeKJSgbDxWOLVX4MRjTKrwL8Cpd1LeYMx5g5StmHzkoxz2TDqL8WT6qyMlrQ==} + /@n8n_io/license-sdk@2.6.0: + resolution: {integrity: sha512-jPUn8xKAZMWgFw8w6BwqbdlZ1Et4tZcPUdOfEzxpWxEmgtCEAdbl3V0ygP3pTXyWY0hblvv8QzbHOUrK25hQSA==} engines: {node: '>=14.0.0', npm: '>=7.10.0'} dependencies: crypto-js: 4.1.1