diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 320c08aa9c..8985d3addb 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -2,12 +2,13 @@ import express from 'express'; import http from 'http'; import type PCancelable from 'p-cancelable'; import { Container } from 'typedi'; +import * as os from 'os'; import { flags } from '@oclif/command'; import { WorkflowExecute } from 'n8n-core'; import type { ExecutionStatus, IExecuteResponsePromiseData, INodeTypes, IRun } from 'n8n-workflow'; -import { Workflow, NodeOperationError, LoggerProxy, sleep } from 'n8n-workflow'; +import { Workflow, NodeOperationError, LoggerProxy, sleep, jsonParse } from 'n8n-workflow'; import * as Db from '@/Db'; import * as ResponseHelper from '@/ResponseHelper'; @@ -31,7 +32,8 @@ import { eventBus } from '../eventbus'; import { RedisServicePubSubPublisher } from '../services/redis/RedisServicePubSubPublisher'; import { RedisServicePubSubSubscriber } from '../services/redis/RedisServicePubSubSubscriber'; import { EventMessageGeneric } from '../eventbus/EventMessageClasses/EventMessageGeneric'; -import { getWorkerCommandReceivedHandler } from './workerCommandHandler'; +import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper'; +import { type RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands'; export class Worker extends BaseCommand { static description = '\nStarts a n8n worker'; @@ -281,12 +283,7 @@ export class Worker extends BaseCommand { await this.redisSubscriber.subscribeToCommandChannel(); this.redisSubscriber.addMessageHandler( 'WorkerCommandReceivedHandler', - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - getWorkerCommandReceivedHandler({ - uniqueInstanceId: this.uniqueInstanceId, - redisPublisher: this.redisPublisher, - getRunningJobIds: () => Object.keys(Worker.runningJobs), - }), + this.getWorkerCommandReceivedHandler(), ); } @@ -467,4 +464,78 @@ export class Worker extends BaseCommand { async catch(error: Error) { await this.exitWithCrash('Worker exiting due to an error.', error); } + + private getWorkerCommandReceivedHandler() { + const { uniqueInstanceId, redisPublisher } = this; + const getRunningJobIds = () => Object.keys(Worker.runningJobs); + return async (channel: string, messageString: string) => { + if (channel === COMMAND_REDIS_CHANNEL) { + if (!messageString) return; + let message: RedisServiceCommandObject; + try { + message = jsonParse(messageString); + } catch { + LoggerProxy.debug( + `Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`, + ); + return; + } + if (message) { + if (message.targets && !message.targets.includes(uniqueInstanceId)) { + return; // early return if the message is not for this worker + } + switch (message.command) { + case 'getStatus': + await redisPublisher.publishToWorkerChannel({ + workerId: uniqueInstanceId, + command: message.command, + payload: { + workerId: uniqueInstanceId, + runningJobs: getRunningJobIds(), + freeMem: os.freemem(), + totalMem: os.totalmem(), + uptime: process.uptime(), + loadAvg: os.loadavg(), + cpus: os.cpus().map((cpu) => `${cpu.model} - speed: ${cpu.speed}`), + arch: os.arch(), + platform: os.platform(), + hostname: os.hostname(), + net: Object.values(os.networkInterfaces()).flatMap( + (interfaces) => + interfaces?.map((net) => `${net.family} - address: ${net.address}`) ?? '', + ), + }, + }); + break; + case 'getId': + await redisPublisher.publishToWorkerChannel({ + workerId: uniqueInstanceId, + command: message.command, + }); + break; + case 'restartEventBus': + await eventBus.restart(); + await redisPublisher.publishToWorkerChannel({ + workerId: uniqueInstanceId, + command: message.command, + payload: { + result: 'success', + }, + }); + break; + case 'stopWorker': + // TODO: implement proper shutdown + // await this.stopProcess(); + break; + default: + LoggerProxy.debug( + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + `Received unknown command via channel ${COMMAND_REDIS_CHANNEL}: "${message.command}"`, + ); + break; + } + } + } + }; + } } diff --git a/packages/cli/src/commands/workerCommandHandler.ts b/packages/cli/src/commands/workerCommandHandler.ts deleted file mode 100644 index 874ead410c..0000000000 --- a/packages/cli/src/commands/workerCommandHandler.ts +++ /dev/null @@ -1,82 +0,0 @@ -import { jsonParse, LoggerProxy } from 'n8n-workflow'; -import { eventBus } from '../eventbus'; -import type { RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands'; -import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper'; -import type { RedisServicePubSubPublisher } from '../services/redis/RedisServicePubSubPublisher'; -import * as os from 'os'; - -export function getWorkerCommandReceivedHandler(options: { - uniqueInstanceId: string; - redisPublisher: RedisServicePubSubPublisher; - getRunningJobIds: () => string[]; -}) { - return async (channel: string, messageString: string) => { - if (channel === COMMAND_REDIS_CHANNEL) { - if (!messageString) return; - let message: RedisServiceCommandObject; - try { - message = jsonParse(messageString); - } catch { - LoggerProxy.debug( - `Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`, - ); - return; - } - if (message) { - if (message.targets && !message.targets.includes(options.uniqueInstanceId)) { - return; // early return if the message is not for this worker - } - switch (message.command) { - case 'getStatus': - await options.redisPublisher.publishToWorkerChannel({ - workerId: options.uniqueInstanceId, - command: message.command, - payload: { - workerId: options.uniqueInstanceId, - runningJobs: options.getRunningJobIds(), - freeMem: os.freemem(), - totalMem: os.totalmem(), - uptime: process.uptime(), - loadAvg: os.loadavg(), - cpus: os.cpus().map((cpu) => `${cpu.model} - speed: ${cpu.speed}`), - arch: os.arch(), - platform: os.platform(), - hostname: os.hostname(), - net: Object.values(os.networkInterfaces()).flatMap( - (interfaces) => - interfaces?.map((net) => `${net.family} - address: ${net.address}`) ?? '', - ), - }, - }); - break; - case 'getId': - await options.redisPublisher.publishToWorkerChannel({ - workerId: options.uniqueInstanceId, - command: message.command, - }); - break; - case 'restartEventBus': - await eventBus.restart(); - await options.redisPublisher.publishToWorkerChannel({ - workerId: options.uniqueInstanceId, - command: message.command, - payload: { - result: 'success', - }, - }); - break; - case 'stopWorker': - // TODO: implement proper shutdown - // await this.stopProcess(); - break; - default: - LoggerProxy.debug( - // eslint-disable-next-line @typescript-eslint/restrict-template-expressions - `Received unknown command via channel ${COMMAND_REDIS_CHANNEL}: "${message.command}"`, - ); - break; - } - } - } - }; -}