diff --git a/packages/cli/package.json b/packages/cli/package.json index 0d4b714566..5f29c24378 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -100,7 +100,7 @@ }, "dependencies": { "@n8n/client-oauth2": "workspace:*", - "@n8n_io/license-sdk": "~2.5.1", + "@n8n_io/license-sdk": "~2.6.0", "@oclif/command": "^1.8.16", "@oclif/core": "^1.16.4", "@oclif/errors": "^1.3.6", diff --git a/packages/cli/src/License.ts b/packages/cli/src/License.ts index 8b3a62caf5..e9660be3f1 100644 --- a/packages/cli/src/License.ts +++ b/packages/cli/src/License.ts @@ -11,8 +11,10 @@ import { SETTINGS_LICENSE_CERT_KEY, UNLIMITED_LICENSE_QUOTA, } from './constants'; -import { Service } from 'typedi'; -import type { BooleanLicenseFeature, NumericLicenseFeature } from './Interfaces'; +import Container, { Service } from 'typedi'; +import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } from './Interfaces'; +import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher'; +import { RedisService } from './services/redis.service'; type FeatureReturnType = Partial< { @@ -26,18 +28,28 @@ export class License { private manager: LicenseManager | undefined; + instanceId: string | undefined; + + private redisPublisher: RedisServicePubSubPublisher; + constructor() { this.logger = getLogger(); } - async init(instanceId: string) { + async init(instanceId: string, instanceType: N8nInstanceType = 'main') { if (this.manager) { return; } + this.instanceId = instanceId; + const isMainInstance = instanceType === 'main'; const server = config.getEnv('license.serverUrl'); - const autoRenewEnabled = config.getEnv('license.autoRenewEnabled'); + const autoRenewEnabled = isMainInstance && config.getEnv('license.autoRenewEnabled'); + const offlineMode = !isMainInstance; const autoRenewOffset = config.getEnv('license.autoRenewOffset'); + const saveCertStr = isMainInstance + ? async (value: TLicenseBlock) => this.saveCertStr(value) + : async () => {}; try { this.manager = new LicenseManager({ @@ -46,9 +58,10 @@ export class License { productIdentifier: `n8n-${N8N_VERSION}`, autoRenewEnabled, autoRenewOffset, + offlineMode, logger: this.logger, loadCertStr: async () => this.loadCertStr(), - saveCertStr: async (value: TLicenseBlock) => this.saveCertStr(value), + saveCertStr, deviceFingerprint: () => instanceId, }); @@ -86,6 +99,15 @@ export class License { }, ['key'], ); + if (config.getEnv('executions.mode') === 'queue') { + if (!this.redisPublisher) { + this.logger.debug('Initializing Redis publisher for License Service'); + this.redisPublisher = await Container.get(RedisService).getPubSubPublisher(); + } + await this.redisPublisher.publishToCommandChannel({ + command: 'reloadLicense', + }); + } } async activate(activationKey: string): Promise { @@ -96,6 +118,14 @@ export class License { await this.manager.activate(activationKey); } + async reload(): Promise { + if (!this.manager) { + return; + } + this.logger.debug('Reloading license'); + await this.manager.reload(); + } + async renew() { if (!this.manager) { return; diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 61c14fa1b8..a93882f267 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -1470,7 +1470,9 @@ export class Server extends AbstractServer { // ---------------------------------------- if (!eventBus.isInitialized) { - await eventBus.initialize(); + await eventBus.initialize({ + uniqueInstanceId: this.uniqueInstanceId, + }); } if (this.endpointPresetCredentials !== '') { diff --git a/packages/cli/src/commands/BaseCommand.ts b/packages/cli/src/commands/BaseCommand.ts index 008e9f4e93..9b2658ac7f 100644 --- a/packages/cli/src/commands/BaseCommand.ts +++ b/packages/cli/src/commands/BaseCommand.ts @@ -16,7 +16,7 @@ import { initErrorHandling } from '@/ErrorReporting'; import { ExternalHooks } from '@/ExternalHooks'; import { NodeTypes } from '@/NodeTypes'; import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials'; -import type { IExternalHooksClass } from '@/Interfaces'; +import type { IExternalHooksClass, N8nInstanceType } from '@/Interfaces'; import { InternalHooks } from '@/InternalHooks'; import { PostHogClient } from '@/posthog'; import { License } from '@/License'; @@ -113,9 +113,9 @@ export abstract class BaseCommand extends Command { await this.externalHooks.init(); } - async initLicense(): Promise { + async initLicense(instanceType: N8nInstanceType = 'main'): Promise { const license = Container.get(License); - await license.init(this.instanceId); + await license.init(this.instanceId, instanceType); const activationKey = config.getEnv('license.activationKey'); diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 15000ab56a..b0a9b0b739 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -197,7 +197,7 @@ export class Start extends BaseCommand { this.logger.info('Initializing n8n process'); this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner); - await this.initLicense(); + await this.initLicense('main'); await this.initBinaryManager(); await this.initExternalHooks(); await this.initExternalSecrets(); diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index 7d5bf46302..912f8a2276 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -77,7 +77,7 @@ export class Webhook extends BaseCommand { await this.initCrashJournal(); await super.init(); - await this.initLicense(); + await this.initLicense('webhook'); await this.initBinaryManager(); await this.initExternalHooks(); await this.initExternalSecrets(); diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 30cf4be73e..bffaea3e4a 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -256,7 +256,7 @@ export class Worker extends BaseCommand { this.logger.debug(`Worker ID: ${this.uniqueInstanceId}`); this.logger.debug('Starting n8n worker...'); - await this.initLicense(); + await this.initLicense('worker'); await this.initBinaryManager(); await this.initExternalHooks(); await this.initExternalSecrets(); @@ -268,6 +268,7 @@ export class Worker extends BaseCommand { async initEventBus() { await eventBus.initialize({ workerId: this.uniqueInstanceId, + uniqueInstanceId: this.uniqueInstanceId, }); } @@ -295,6 +296,7 @@ export class Worker extends BaseCommand { // eslint-disable-next-line @typescript-eslint/no-unsafe-argument getWorkerCommandReceivedHandler({ uniqueInstanceId: this.uniqueInstanceId, + instanceId: this.instanceId, redisPublisher: this.redisPublisher, getRunningJobIds: () => Object.keys(Worker.runningJobs), }), diff --git a/packages/cli/src/controllers/e2e.controller.ts b/packages/cli/src/controllers/e2e.controller.ts index 0957ea95ee..f4e7a74499 100644 --- a/packages/cli/src/controllers/e2e.controller.ts +++ b/packages/cli/src/controllers/e2e.controller.ts @@ -109,7 +109,7 @@ export class E2EController { private async resetLogStreaming() { for (const id in eventBus.destinations) { - await eventBus.removeDestination(id); + await eventBus.removeDestination(id, false); } } diff --git a/packages/cli/src/controllers/orchestration.controller.ts b/packages/cli/src/controllers/orchestration.controller.ts index fd6b68ad4e..5386b75698 100644 --- a/packages/cli/src/controllers/orchestration.controller.ts +++ b/packages/cli/src/controllers/orchestration.controller.ts @@ -1,15 +1,12 @@ -import config from '@/config'; import { Authorized, Get, RestController } from '@/decorators'; import { OrchestrationRequest } from '@/requests'; import { Service } from 'typedi'; -import { OrchestrationService } from '../services/orchestration.service'; +import { OrchestrationService } from '@/services/orchestration.service'; @Authorized(['global', 'owner']) @RestController('/orchestration') @Service() export class OrchestrationController { - private config = config; - constructor(private readonly orchestrationService: OrchestrationService) {} /** diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 43059bf159..5a6738c5f8 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -1,4 +1,4 @@ -import { LoggerProxy } from 'n8n-workflow'; +import { LoggerProxy, jsonParse } from 'n8n-workflow'; import type { MessageEventBusDestinationOptions } from 'n8n-workflow'; import type { DeleteResult } from 'typeorm'; import type { @@ -27,9 +27,18 @@ import { } from '../EventMessageClasses/EventMessageGeneric'; import { recoverExecutionDataFromEventLogMessages } from './recoverEvents'; import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee'; -import Container from 'typedi'; +import Container, { Service } from 'typedi'; import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories'; -import { OrchestrationService } from '../../services/orchestration.service'; +import { RedisService } from '@/services/redis.service'; +import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; +import type { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber'; +import { + COMMAND_REDIS_CHANNEL, + EVENT_BUS_REDIS_CHANNEL, +} from '@/services/redis/RedisServiceHelper'; +import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions'; +import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers'; +import { messageToRedisServiceCommandObject } from '@/services/orchestration/helpers'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; @@ -41,13 +50,21 @@ export interface MessageWithCallback { export interface MessageEventBusInitializeOptions { skipRecoveryPass?: boolean; workerId?: string; + uniqueInstanceId?: string; } +@Service() export class MessageEventBus extends EventEmitter { private static instance: MessageEventBus; isInitialized: boolean; + uniqueInstanceId: string; + + redisPublisher: RedisServicePubSubPublisher; + + redisSubscriber: RedisServicePubSubSubscriber; + logWriter: MessageEventBusLogWriter; destinations: { @@ -76,11 +93,30 @@ export class MessageEventBus extends EventEmitter { * * Sets `isInitialized` to `true` once finished. */ - async initialize(options?: MessageEventBusInitializeOptions): Promise { + async initialize(options: MessageEventBusInitializeOptions): Promise { if (this.isInitialized) { return; } + this.uniqueInstanceId = options?.uniqueInstanceId ?? ''; + + if (config.getEnv('executions.mode') === 'queue') { + this.redisPublisher = await Container.get(RedisService).getPubSubPublisher(); + this.redisSubscriber = await Container.get(RedisService).getPubSubSubscriber(); + await this.redisSubscriber.subscribeToEventLog(); + await this.redisSubscriber.subscribeToCommandChannel(); + this.redisSubscriber.addMessageHandler( + 'MessageEventBusMessageReceiver', + async (channel: string, messageString: string) => { + if (channel === EVENT_BUS_REDIS_CHANNEL) { + await this.handleRedisEventBusMessage(messageString); + } else if (channel === COMMAND_REDIS_CHANNEL) { + await this.handleRedisCommandMessage(messageString); + } + }, + ); + } + LoggerProxy.debug('Initializing event bus...'); const savedEventDestinations = await Db.collections.EventDestinations.find({}); @@ -89,7 +125,7 @@ export class MessageEventBus extends EventEmitter { try { const destination = messageEventBusDestinationFromDb(this, destinationData); if (destination) { - await this.addDestination(destination); + await this.addDestination(destination, false); } } catch (error) { // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access @@ -182,10 +218,13 @@ export class MessageEventBus extends EventEmitter { this.isInitialized = true; } - async addDestination(destination: MessageEventBusDestination) { - await this.removeDestination(destination.getId()); + async addDestination(destination: MessageEventBusDestination, notifyWorkers: boolean = true) { + await this.removeDestination(destination.getId(), false); this.destinations[destination.getId()] = destination; this.destinations[destination.getId()].startListening(); + if (notifyWorkers) { + await this.broadcastRestartEventbusAfterDestinationUpdate(); + } return destination; } @@ -199,19 +238,62 @@ export class MessageEventBus extends EventEmitter { return result.sort((a, b) => (a.__type ?? '').localeCompare(b.__type ?? '')); } - async removeDestination(id: string): Promise { + async removeDestination( + id: string, + notifyWorkers: boolean = true, + ): Promise { let result; if (Object.keys(this.destinations).includes(id)) { await this.destinations[id].close(); result = await this.destinations[id].deleteFromDb(); delete this.destinations[id]; } + if (notifyWorkers) { + await this.broadcastRestartEventbusAfterDestinationUpdate(); + } return result; } + async handleRedisEventBusMessage(messageString: string) { + const eventData = jsonParse(messageString); + if (eventData) { + const eventMessage = getEventMessageObjectByType(eventData); + if (eventMessage) { + await Container.get(MessageEventBus).send(eventMessage); + } + } + return eventData; + } + + async handleRedisCommandMessage(messageString: string) { + const message = messageToRedisServiceCommandObject(messageString); + if (message) { + if ( + message.senderId === this.uniqueInstanceId || + (message.targets && !message.targets.includes(this.uniqueInstanceId)) + ) { + LoggerProxy.debug( + `Skipping command message ${message.command} because it's not for this instance.`, + ); + return message; + } + switch (message.command) { + case 'restartEventBus': + await this.restart(); + default: + break; + } + return message; + } + return; + } + async broadcastRestartEventbusAfterDestinationUpdate() { if (config.getEnv('executions.mode') === 'queue') { - await Container.get(OrchestrationService).restartEventBus(); + await this.redisPublisher.publishToCommandChannel({ + senderId: this.uniqueInstanceId, + command: 'restartEventBus', + }); } } @@ -235,6 +317,8 @@ export class MessageEventBus extends EventEmitter { ); await this.destinations[destinationName].close(); } + await this.redisSubscriber?.unSubscribeFromCommandChannel(); + await this.redisSubscriber?.unSubscribeFromEventLog(); this.isInitialized = false; LoggerProxy.debug('EventBus shut down.'); } @@ -417,4 +501,4 @@ export class MessageEventBus extends EventEmitter { } } -export const eventBus = MessageEventBus.getInstance(); +export const eventBus = Container.get(MessageEventBus); diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts index d3f8c7e27e..17fcab79dd 100644 --- a/packages/cli/src/services/orchestration.service.ts +++ b/packages/cli/src/services/orchestration.service.ts @@ -2,19 +2,9 @@ import { Service } from 'typedi'; import { RedisService } from './redis.service'; import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher'; import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber'; -import { LoggerProxy, jsonParse } from 'n8n-workflow'; -import { eventBus } from '../eventbus'; -import type { AbstractEventMessageOptions } from '../eventbus/EventMessageClasses/AbstractEventMessageOptions'; -import { getEventMessageObjectByType } from '../eventbus/EventMessageClasses/Helpers'; -import type { - RedisServiceCommandObject, - RedisServiceWorkerResponseObject, -} from './redis/RedisServiceCommands'; -import { - COMMAND_REDIS_CHANNEL, - EVENT_BUS_REDIS_CHANNEL, - WORKER_RESPONSE_REDIS_CHANNEL, -} from './redis/RedisServiceHelper'; +import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from './redis/RedisServiceHelper'; +import { handleWorkerResponseMessage } from './orchestration/handleWorkerResponseMessage'; +import { handleCommandMessage } from './orchestration/handleCommandMessage'; @Service() export class OrchestrationService { @@ -51,81 +41,21 @@ export class OrchestrationService { private async initSubscriber() { this.redisSubscriber = await this.redisService.getPubSubSubscriber(); - // TODO: these are all proof of concept implementations for the moment - // until worker communication is implemented - // #region proof of concept - await this.redisSubscriber.subscribeToEventLog(); await this.redisSubscriber.subscribeToWorkerResponseChannel(); await this.redisSubscriber.subscribeToCommandChannel(); this.redisSubscriber.addMessageHandler( 'OrchestrationMessageReceiver', async (channel: string, messageString: string) => { - // TODO: this is a proof of concept implementation to forward events to the main instance's event bus - // Events are arriving through a pub/sub channel and are forwarded to the eventBus - // In the future, a stream should probably replace this implementation entirely - if (channel === EVENT_BUS_REDIS_CHANNEL) { - await this.handleEventBusMessage(messageString); - } else if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { - await this.handleWorkerResponseMessage(messageString); + if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { + await handleWorkerResponseMessage(messageString); } else if (channel === COMMAND_REDIS_CHANNEL) { - await this.handleCommandMessage(messageString); + await handleCommandMessage(messageString, this.uniqueInstanceId); } }, ); } - async handleWorkerResponseMessage(messageString: string) { - const workerResponse = jsonParse(messageString); - if (workerResponse) { - // TODO: Handle worker response - LoggerProxy.debug('Received worker response', workerResponse); - } - return workerResponse; - } - - async handleEventBusMessage(messageString: string) { - const eventData = jsonParse(messageString); - if (eventData) { - const eventMessage = getEventMessageObjectByType(eventData); - if (eventMessage) { - await eventBus.send(eventMessage); - } - } - return eventData; - } - - async handleCommandMessage(messageString: string) { - if (!messageString) return; - let message: RedisServiceCommandObject; - try { - message = jsonParse(messageString); - } catch { - LoggerProxy.debug( - `Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`, - ); - return; - } - if (message) { - if ( - message.senderId === this.uniqueInstanceId || - (message.targets && !message.targets.includes(this.uniqueInstanceId)) - ) { - LoggerProxy.debug( - `Skipping command message ${message.command} because it's not for this instance.`, - ); - return message; - } - switch (message.command) { - case 'restartEventBus': - await eventBus.restart(); - break; - } - return message; - } - return; - } - async getWorkerStatus(id?: string) { if (!this.initialized) { throw new Error('OrchestrationService not initialized'); @@ -159,13 +89,14 @@ export class OrchestrationService { }); } - async restartEventBus(id?: string) { + // reload the license on workers after it was changed on the main instance + async reloadLicense(id?: string) { if (!this.initialized) { throw new Error('OrchestrationService not initialized'); } await this.redisPublisher.publishToCommandChannel({ senderId: this.uniqueInstanceId, - command: 'restartEventBus', + command: 'reloadLicense', targets: id ? [id] : undefined, }); } diff --git a/packages/cli/src/services/orchestration/handleCommandMessage.ts b/packages/cli/src/services/orchestration/handleCommandMessage.ts new file mode 100644 index 0000000000..8a04cb3ba7 --- /dev/null +++ b/packages/cli/src/services/orchestration/handleCommandMessage.ts @@ -0,0 +1,29 @@ +import { LoggerProxy } from 'n8n-workflow'; +import { messageToRedisServiceCommandObject } from './helpers'; +import Container from 'typedi'; +import { License } from '@/License'; + +// this function handles commands sent to the MAIN instance. the workers handle their own commands +export async function handleCommandMessage(messageString: string, uniqueInstanceId: string) { + const message = messageToRedisServiceCommandObject(messageString); + if (message) { + if ( + message.senderId === uniqueInstanceId || + (message.targets && !message.targets.includes(uniqueInstanceId)) + ) { + LoggerProxy.debug( + `Skipping command message ${message.command} because it's not for this instance.`, + ); + return message; + } + switch (message.command) { + case 'reloadLicense': + await Container.get(License).reload(); + break; + default: + break; + } + return message; + } + return; +} diff --git a/packages/cli/src/services/orchestration/handleWorkerResponseMessage.ts b/packages/cli/src/services/orchestration/handleWorkerResponseMessage.ts new file mode 100644 index 0000000000..ee22318638 --- /dev/null +++ b/packages/cli/src/services/orchestration/handleWorkerResponseMessage.ts @@ -0,0 +1,11 @@ +import { jsonParse, LoggerProxy } from 'n8n-workflow'; +import type { RedisServiceWorkerResponseObject } from '../redis/RedisServiceCommands'; + +export async function handleWorkerResponseMessage(messageString: string) { + const workerResponse = jsonParse(messageString); + if (workerResponse) { + // TODO: Handle worker response + LoggerProxy.debug('Received worker response', workerResponse); + } + return workerResponse; +} diff --git a/packages/cli/src/services/orchestration/helpers.ts b/packages/cli/src/services/orchestration/helpers.ts new file mode 100644 index 0000000000..6996391f40 --- /dev/null +++ b/packages/cli/src/services/orchestration/helpers.ts @@ -0,0 +1,17 @@ +import { LoggerProxy, jsonParse } from 'n8n-workflow'; +import type { RedisServiceCommandObject } from '../redis/RedisServiceCommands'; +import { COMMAND_REDIS_CHANNEL } from '../redis/RedisServiceHelper'; + +export function messageToRedisServiceCommandObject(messageString: string) { + if (!messageString) return; + let message: RedisServiceCommandObject; + try { + message = jsonParse(messageString); + } catch { + LoggerProxy.debug( + `Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`, + ); + return; + } + return message; +} diff --git a/packages/cli/src/services/redis/RedisServiceBaseClasses.ts b/packages/cli/src/services/redis/RedisServiceBaseClasses.ts index 2ed9d94eee..da16aa25e7 100644 --- a/packages/cli/src/services/redis/RedisServiceBaseClasses.ts +++ b/packages/cli/src/services/redis/RedisServiceBaseClasses.ts @@ -49,6 +49,7 @@ class RedisServiceBase { return; } await this.redisClient.quit(); + this.isInitialized = false; this.redisClient = undefined; } } diff --git a/packages/cli/src/services/redis/RedisServiceCommands.ts b/packages/cli/src/services/redis/RedisServiceCommands.ts index 5796560d4b..a57e190047 100644 --- a/packages/cli/src/services/redis/RedisServiceCommands.ts +++ b/packages/cli/src/services/redis/RedisServiceCommands.ts @@ -1,4 +1,9 @@ -export type RedisServiceCommand = 'getStatus' | 'getId' | 'restartEventBus' | 'stopWorker'; // TODO: add more commands +export type RedisServiceCommand = + | 'getStatus' + | 'getId' + | 'restartEventBus' + | 'stopWorker' + | 'reloadLicense'; /** * An object to be sent via Redis pub/sub from the main process to the workers. @@ -7,7 +12,7 @@ export type RedisServiceCommand = 'getStatus' | 'getId' | 'restartEventBus' | 's * @field payload: Optional arguments to be sent with the command. */ type RedisServiceBaseCommand = { - senderId: string; + senderId?: string; command: RedisServiceCommand; payload?: { [key: string]: string | number | boolean | string[] | number[] | boolean[]; diff --git a/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts b/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts index 404544d6f9..240fe7e1af 100644 --- a/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts +++ b/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts @@ -32,6 +32,19 @@ export class RedisServicePubSubSubscriber extends RedisServiceBaseReceiver { }); } + async unsubscribe(channel: string): Promise { + if (!this.redisClient) { + return; + } + await this.redisClient?.unsubscribe(channel, (error, _count: number) => { + if (error) { + Logger.error(`Error unsubscribing from channel ${channel}`); + } else { + Logger.debug(`Unsubscribed Redis PubSub client from channel: ${channel}`); + } + }); + } + async subscribeToEventLog(): Promise { await this.subscribe(EVENT_BUS_REDIS_CHANNEL); } @@ -43,4 +56,16 @@ export class RedisServicePubSubSubscriber extends RedisServiceBaseReceiver { async subscribeToWorkerResponseChannel(): Promise { await this.subscribe(WORKER_RESPONSE_REDIS_CHANNEL); } + + async unSubscribeFromEventLog(): Promise { + await this.unsubscribe(EVENT_BUS_REDIS_CHANNEL); + } + + async unSubscribeFromCommandChannel(): Promise { + await this.unsubscribe(COMMAND_REDIS_CHANNEL); + } + + async unSubscribeFromWorkerResponseChannel(): Promise { + await this.unsubscribe(WORKER_RESPONSE_REDIS_CHANNEL); + } } diff --git a/packages/cli/src/worker/workerCommandHandler.ts b/packages/cli/src/worker/workerCommandHandler.ts index 874ead410c..acd488624a 100644 --- a/packages/cli/src/worker/workerCommandHandler.ts +++ b/packages/cli/src/worker/workerCommandHandler.ts @@ -1,12 +1,14 @@ import { jsonParse, LoggerProxy } from 'n8n-workflow'; -import { eventBus } from '../eventbus'; import type { RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands'; import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper'; -import type { RedisServicePubSubPublisher } from '../services/redis/RedisServicePubSubPublisher'; +import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; import * as os from 'os'; +import Container from 'typedi'; +import { License } from '@/License'; export function getWorkerCommandReceivedHandler(options: { uniqueInstanceId: string; + instanceId: string; redisPublisher: RedisServicePubSubPublisher; getRunningJobIds: () => string[]; }) { @@ -56,7 +58,6 @@ export function getWorkerCommandReceivedHandler(options: { }); break; case 'restartEventBus': - await eventBus.restart(); await options.redisPublisher.publishToWorkerChannel({ workerId: options.uniqueInstanceId, command: message.command, @@ -65,6 +66,9 @@ export function getWorkerCommandReceivedHandler(options: { }, }); break; + case 'reloadLicense': + await Container.get(License).reload(); + break; case 'stopWorker': // TODO: implement proper shutdown // await this.stopProcess(); diff --git a/packages/cli/test/integration/eventbus.ee.test.ts b/packages/cli/test/integration/eventbus.ee.test.ts index e06e10f6c0..264c2a5835 100644 --- a/packages/cli/test/integration/eventbus.ee.test.ts +++ b/packages/cli/test/integration/eventbus.ee.test.ts @@ -91,7 +91,9 @@ beforeAll(async () => { config.set('eventBus.logWriter.logBaseName', 'n8n-test-logwriter'); config.set('eventBus.logWriter.keepLogCount', 1); - await eventBus.initialize(); + await eventBus.initialize({ + uniqueInstanceId: 'test', + }); }); afterAll(async () => { diff --git a/packages/cli/test/unit/License.test.ts b/packages/cli/test/unit/License.test.ts index 395d163545..f035e6d927 100644 --- a/packages/cli/test/unit/License.test.ts +++ b/packages/cli/test/unit/License.test.ts @@ -31,6 +31,24 @@ describe('License', () => { expect(LicenseManager).toHaveBeenCalledWith({ autoRenewEnabled: true, autoRenewOffset: MOCK_RENEW_OFFSET, + offlineMode: false, + deviceFingerprint: expect.any(Function), + productIdentifier: `n8n-${N8N_VERSION}`, + logger: expect.anything(), + loadCertStr: expect.any(Function), + saveCertStr: expect.any(Function), + server: MOCK_SERVER_URL, + tenantId: 1, + }); + }); + + test('initializes license manager for worker', async () => { + license = new License(); + await license.init(MOCK_INSTANCE_ID, 'worker'); + expect(LicenseManager).toHaveBeenCalledWith({ + autoRenewEnabled: false, + autoRenewOffset: MOCK_RENEW_OFFSET, + offlineMode: true, deviceFingerprint: expect.any(Function), productIdentifier: `n8n-${N8N_VERSION}`, logger: expect.anything(), diff --git a/packages/cli/test/unit/services/orchestration.service.test.ts b/packages/cli/test/unit/services/orchestration.service.test.ts index 18204cea2b..127aac690d 100644 --- a/packages/cli/test/unit/services/orchestration.service.test.ts +++ b/packages/cli/test/unit/services/orchestration.service.test.ts @@ -6,9 +6,11 @@ import { OrchestrationService } from '@/services/orchestration.service'; import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands'; import { EventMessageWorkflow } from '@/eventbus/EventMessageClasses/EventMessageWorkflow'; import { eventBus } from '@/eventbus'; -import * as EventHelpers from '@/eventbus/EventMessageClasses/Helpers'; import { RedisService } from '@/services/redis.service'; import { mockInstance } from '../../integration/shared/utils'; +import { handleWorkerResponseMessage } from '../../../src/services/orchestration/handleWorkerResponseMessage'; +import { handleCommandMessage } from '../../../src/services/orchestration/handleCommandMessage'; +import { License } from '../../../src/License'; const os = Container.get(OrchestrationService); @@ -77,6 +79,7 @@ describe('Orchestration Service', () => { afterAll(async () => { jest.mock('../../../src/services/redis/RedisServicePubSubPublisher').restoreAllMocks(); jest.mock('../../../src/services/redis/RedisServicePubSubSubscriber').restoreAllMocks(); + await os.shutdown(); }); test('should initialize', async () => { @@ -87,38 +90,35 @@ describe('Orchestration Service', () => { }); test('should handle worker responses', async () => { - const response = await os.handleWorkerResponseMessage( + const response = await handleWorkerResponseMessage( JSON.stringify(workerRestartEventbusResponse), ); expect(response.command).toEqual('restartEventBus'); }); - test('should handle event messages', async () => { - const response = await os.handleEventBusMessage(JSON.stringify(eventBusMessage)); - jest.spyOn(eventBus, 'send'); - jest.spyOn(EventHelpers, 'getEventMessageObjectByType'); - expect(eventBus.send).toHaveBeenCalled(); - expect(response.eventName).toEqual('n8n.workflow.success'); - jest.spyOn(eventBus, 'send').mockRestore(); - jest.spyOn(EventHelpers, 'getEventMessageObjectByType').mockRestore(); - }); - test('should handle command messages from others', async () => { - jest.spyOn(eventBus, 'restart'); - const responseFalseId = await os.handleCommandMessage( - JSON.stringify(workerRestartEventbusResponse), + const license = Container.get(License); + license.instanceId = 'test'; + jest.spyOn(license, 'reload'); + const responseFalseId = await handleCommandMessage( + JSON.stringify({ + senderId: 'test', + command: 'reloadLicense', + }), + os.uniqueInstanceId, ); expect(responseFalseId).toBeDefined(); - expect(responseFalseId!.command).toEqual('restartEventBus'); + expect(responseFalseId!.command).toEqual('reloadLicense'); expect(responseFalseId!.senderId).toEqual('test'); - expect(eventBus.restart).toHaveBeenCalled(); - jest.spyOn(eventBus, 'restart').mockRestore(); + expect(license.reload).toHaveBeenCalled(); + jest.spyOn(license, 'reload').mockRestore(); }); test('should reject command messages from iteslf', async () => { jest.spyOn(eventBus, 'restart'); - const response = await os.handleCommandMessage( + const response = await handleCommandMessage( JSON.stringify({ ...workerRestartEventbusResponse, senderId: os.uniqueInstanceId }), + os.uniqueInstanceId, ); expect(response).toBeDefined(); expect(response!.command).toEqual('restartEventBus'); @@ -133,8 +133,4 @@ describe('Orchestration Service', () => { expect(os.redisPublisher.publishToCommandChannel).toHaveBeenCalled(); jest.spyOn(os.redisPublisher, 'publishToCommandChannel').mockRestore(); }); - - afterAll(async () => { - await os.shutdown(); - }); }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b7a4dc8180..be5a4146a0 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -195,8 +195,8 @@ importers: specifier: workspace:* version: link:../@n8n/client-oauth2 '@n8n_io/license-sdk': - specifier: ~2.5.1 - version: 2.5.1 + specifier: ~2.6.0 + version: 2.6.0 '@oclif/command': specifier: ^1.8.16 version: 1.8.18(@oclif/config@1.18.5)(supports-color@8.1.1) @@ -4656,8 +4656,8 @@ packages: acorn-walk: 8.2.0 dev: false - /@n8n_io/license-sdk@2.5.1: - resolution: {integrity: sha512-CL4JVJS8nvI8qPFQ1jSG7CiPnNkeKJSgbDxWOLVX4MRjTKrwL8Cpd1LeYMx5g5StmHzkoxz2TDqL8WT6qyMlrQ==} + /@n8n_io/license-sdk@2.6.0: + resolution: {integrity: sha512-jPUn8xKAZMWgFw8w6BwqbdlZ1Et4tZcPUdOfEzxpWxEmgtCEAdbl3V0ygP3pTXyWY0hblvv8QzbHOUrK25hQSA==} engines: {node: '>=14.0.0', npm: '>=7.10.0'} dependencies: crypto-js: 4.1.1