diff --git a/packages/cli/src/AbstractServer.ts b/packages/cli/src/AbstractServer.ts index fa321a6724..2e25342bc1 100644 --- a/packages/cli/src/AbstractServer.ts +++ b/packages/cli/src/AbstractServer.ts @@ -4,7 +4,7 @@ import type { Server } from 'http'; import express from 'express'; import compression from 'compression'; import isbot from 'isbot'; -import { jsonParse, LoggerProxy as Logger } from 'n8n-workflow'; +import { LoggerProxy as Logger } from 'n8n-workflow'; import config from '@/config'; import { N8N_VERSION, inDevelopment, inTest } from '@/constants'; @@ -18,16 +18,8 @@ import { rawBodyReader, bodyParser, corsMiddleware } from '@/middlewares'; import { TestWebhooks } from '@/TestWebhooks'; import { WaitingWebhooks } from '@/WaitingWebhooks'; import { webhookRequestHandler } from '@/WebhookHelpers'; -import { RedisService } from '@/services/redis.service'; -import { eventBus } from './eventbus'; -import type { AbstractEventMessageOptions } from './eventbus/EventMessageClasses/AbstractEventMessageOptions'; -import { getEventMessageObjectByType } from './eventbus/EventMessageClasses/Helpers'; -import type { RedisServiceWorkerResponseObject } from './services/redis/RedisServiceCommands'; -import { - EVENT_BUS_REDIS_CHANNEL, - WORKER_RESPONSE_REDIS_CHANNEL, -} from './services/redis/RedisServiceHelper'; import { generateHostInstanceId } from './databases/utils/generators'; +import { OrchestrationService } from './services/orchestration.service'; export abstract class AbstractServer { protected server: Server; @@ -124,78 +116,11 @@ export abstract class AbstractServer { }); if (config.getEnv('executions.mode') === 'queue') { - await this.setupRedis(); + // will start the redis connections + await Container.get(OrchestrationService).init(this.uniqueInstanceId); } } - // This connection is going to be our heartbeat - // IORedis automatically pings redis and tries to reconnect - // We will be using a retryStrategy to control how and when to exit. - // We are also subscribing to the event log channel to receive events from workers - private async setupRedis() { - const redisService = Container.get(RedisService); - const redisSubscriber = await redisService.getPubSubSubscriber(); - - // TODO: these are all proof of concept implementations for the moment - // until worker communication is implemented - // #region proof of concept - await redisSubscriber.subscribeToEventLog(); - await redisSubscriber.subscribeToWorkerResponseChannel(); - redisSubscriber.addMessageHandler( - 'AbstractServerReceiver', - async (channel: string, message: string) => { - // TODO: this is a proof of concept implementation to forward events to the main instance's event bus - // Events are arriving through a pub/sub channel and are forwarded to the eventBus - // In the future, a stream should probably replace this implementation entirely - if (channel === EVENT_BUS_REDIS_CHANNEL) { - const eventData = jsonParse(message); - if (eventData) { - const eventMessage = getEventMessageObjectByType(eventData); - if (eventMessage) { - await eventBus.send(eventMessage); - } - } - } else if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { - // The back channel from the workers as a pub/sub channel - const workerResponse = jsonParse(message); - if (workerResponse) { - // TODO: Handle worker response - console.log('Received worker response', workerResponse); - } - } - }, - ); - // TODO: Leave comments for now as implementation example - // const redisStreamListener = await redisService.getStreamConsumer(); - // void redisStreamListener.listenToStream('teststream'); - // redisStreamListener.addMessageHandler( - // 'MessageLogger', - // async (stream: string, id: string, message: string[]) => { - // // TODO: this is a proof of concept implementation of a stream consumer - // switch (stream) { - // case EVENT_BUS_REDIS_STREAM: - // case COMMAND_REDIS_STREAM: - // case WORKER_RESPONSE_REDIS_STREAM: - // default: - // LoggerProxy.debug( - // `Received message from stream ${stream} with id ${id} and message ${message.join( - // ',', - // )}`, - // ); - // break; - // } - // }, - // ); - - // const redisListReceiver = await redisService.getListReceiver(); - // await redisListReceiver.init(); - - // setInterval(async () => { - // await redisListReceiver.popLatestWorkerResponse(); - // }, 1000); - // #endregion - } - async init(): Promise { const { app, protocol, sslKey, sslCert } = this; diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 3d8f8de2ae..3903373c71 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -177,6 +177,7 @@ import { handleMfaDisable, isMfaFeatureEnabled } from './Mfa/helpers'; import { JwtService } from './services/jwt.service'; import { RoleService } from './services/role.service'; import { UserService } from './services/user.service'; +import { OrchestrationController } from './controllers/orchestration.controller'; const exec = promisify(callbackExec); @@ -551,6 +552,7 @@ export class Server extends AbstractServer { Container.get(SourceControlController), Container.get(WorkflowStatisticsController), Container.get(ExternalSecretsController), + Container.get(OrchestrationController), ]; if (isLdapEnabled()) { diff --git a/packages/cli/src/commands/BaseCommand.ts b/packages/cli/src/commands/BaseCommand.ts index 78be7cc23f..008e9f4e93 100644 --- a/packages/cli/src/commands/BaseCommand.ts +++ b/packages/cli/src/commands/BaseCommand.ts @@ -103,12 +103,12 @@ export abstract class BaseCommand extends Command { process.exit(1); } - protected async initBinaryManager() { + async initBinaryManager() { const binaryDataConfig = config.getEnv('binaryDataManager'); await BinaryDataManager.init(binaryDataConfig, true); } - protected async initExternalHooks() { + async initExternalHooks() { this.externalHooks = Container.get(ExternalHooks); await this.externalHooks.init(); } diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index dfaf0e34c0..320c08aa9c 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -27,6 +27,11 @@ import { generateHostInstanceId } from '@/databases/utils/generators'; import type { ICredentialsOverwrite } from '@/Interfaces'; import { CredentialsOverwrites } from '@/CredentialsOverwrites'; import { rawBodyReader, bodyParser } from '@/middlewares'; +import { eventBus } from '../eventbus'; +import { RedisServicePubSubPublisher } from '../services/redis/RedisServicePubSubPublisher'; +import { RedisServicePubSubSubscriber } from '../services/redis/RedisServicePubSubSubscriber'; +import { EventMessageGeneric } from '../eventbus/EventMessageClasses/EventMessageGeneric'; +import { getWorkerCommandReceivedHandler } from './workerCommandHandler'; export class Worker extends BaseCommand { static description = '\nStarts a n8n worker'; @@ -49,6 +54,10 @@ export class Worker extends BaseCommand { readonly uniqueInstanceId = generateHostInstanceId('worker'); + redisPublisher: RedisServicePubSubPublisher; + + redisSubscriber: RedisServicePubSubSubscriber; + /** * Stop n8n in a graceful way. * Make for example sure that all the webhooks from third party services @@ -240,9 +249,48 @@ export class Worker extends BaseCommand { await this.initBinaryManager(); await this.initExternalHooks(); await this.initExternalSecrets(); + await this.initEventBus(); + await this.initRedis(); + await this.initQueue(); } - async run() { + async initEventBus() { + await eventBus.initialize({ + workerId: this.uniqueInstanceId, + }); + } + + /** + * Initializes the redis connection + * A publishing connection to redis is created to publish events to the event log + * A subscription connection to redis is created to subscribe to commands from the main process + * The subscription connection adds a handler to handle the command messages + */ + async initRedis() { + this.redisPublisher = Container.get(RedisServicePubSubPublisher); + this.redisSubscriber = Container.get(RedisServicePubSubSubscriber); + await this.redisPublisher.init(); + await this.redisPublisher.publishToEventLog( + new EventMessageGeneric({ + eventName: 'n8n.worker.started', + payload: { + workerId: this.uniqueInstanceId, + }, + }), + ); + await this.redisSubscriber.subscribeToCommandChannel(); + this.redisSubscriber.addMessageHandler( + 'WorkerCommandReceivedHandler', + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + getWorkerCommandReceivedHandler({ + uniqueInstanceId: this.uniqueInstanceId, + redisPublisher: this.redisPublisher, + getRunningJobIds: () => Object.keys(Worker.runningJobs), + }), + ); + } + + async initQueue() { // eslint-disable-next-line @typescript-eslint/no-shadow const { flags } = this.parse(Worker); @@ -255,11 +303,6 @@ export class Worker extends BaseCommand { this.runJob(job, this.nodeTypes), ); - this.logger.info('\nn8n worker is now ready'); - this.logger.info(` * Version: ${N8N_VERSION}`); - this.logger.info(` * Concurrency: ${flags.concurrency}`); - this.logger.info(''); - Worker.jobQueue.on('global:progress', (jobId: JobId, progress) => { // Progress of a job got updated which does get used // to communicate that a job got canceled. @@ -305,105 +348,116 @@ export class Worker extends BaseCommand { throw error; } }); + } - if (config.getEnv('queue.health.active')) { - const port = config.getEnv('queue.health.port'); + async setupHealthMonitor() { + const port = config.getEnv('queue.health.port'); - const app = express(); - app.disable('x-powered-by'); + const app = express(); + app.disable('x-powered-by'); - const server = http.createServer(app); + const server = http.createServer(app); - app.get( - '/healthz', + app.get( + '/healthz', + async (req: express.Request, res: express.Response) => { + LoggerProxy.debug('Health check started!'); + + const connection = Db.getConnection(); + + try { + if (!connection.isInitialized) { + // Connection is not active + throw new Error('No active database connection!'); + } + // DB ping + await connection.query('SELECT 1'); + } catch (e) { + LoggerProxy.error('No Database connection!', e as Error); + const error = new ResponseHelper.ServiceUnavailableError('No Database connection!'); + return ResponseHelper.sendErrorResponse(res, error); + } + + // Just to be complete, generally will the worker stop automatically + // if it loses the connection to redis + try { + // Redis ping + await Worker.jobQueue.client.ping(); + } catch (e) { + LoggerProxy.error('No Redis connection!', e as Error); + const error = new ResponseHelper.ServiceUnavailableError('No Redis connection!'); + return ResponseHelper.sendErrorResponse(res, error); + } + + // Everything fine + const responseData = { + status: 'ok', + }; + + LoggerProxy.debug('Health check completed successfully!'); + + ResponseHelper.sendSuccessResponse(res, responseData, true, 200); + }, + ); + + let presetCredentialsLoaded = false; + const endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint'); + if (endpointPresetCredentials !== '') { + // POST endpoint to set preset credentials + app.post( + `/${endpointPresetCredentials}`, + rawBodyReader, + bodyParser, async (req: express.Request, res: express.Response) => { - LoggerProxy.debug('Health check started!'); + if (!presetCredentialsLoaded) { + const body = req.body as ICredentialsOverwrite; - const connection = Db.getConnection(); - - try { - if (!connection.isInitialized) { - // Connection is not active - throw new Error('No active database connection!'); - } - // DB ping - await connection.query('SELECT 1'); - } catch (e) { - LoggerProxy.error('No Database connection!', e as Error); - const error = new ResponseHelper.ServiceUnavailableError('No Database connection!'); - return ResponseHelper.sendErrorResponse(res, error); - } - - // Just to be complete, generally will the worker stop automatically - // if it loses the connection to redis - try { - // Redis ping - await Worker.jobQueue.client.ping(); - } catch (e) { - LoggerProxy.error('No Redis connection!', e as Error); - const error = new ResponseHelper.ServiceUnavailableError('No Redis connection!'); - return ResponseHelper.sendErrorResponse(res, error); - } - - // Everything fine - const responseData = { - status: 'ok', - }; - - LoggerProxy.debug('Health check completed successfully!'); - - ResponseHelper.sendSuccessResponse(res, responseData, true, 200); - }, - ); - - let presetCredentialsLoaded = false; - const endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint'); - if (endpointPresetCredentials !== '') { - // POST endpoint to set preset credentials - app.post( - `/${endpointPresetCredentials}`, - rawBodyReader, - bodyParser, - async (req: express.Request, res: express.Response) => { - if (!presetCredentialsLoaded) { - const body = req.body as ICredentialsOverwrite; - - if (req.contentType !== 'application/json') { - ResponseHelper.sendErrorResponse( - res, - new Error( - 'Body must be a valid JSON, make sure the content-type is application/json', - ), - ); - return; - } - - CredentialsOverwrites().setData(body); - presetCredentialsLoaded = true; - ResponseHelper.sendSuccessResponse(res, { success: true }, true, 200); - } else { + if (req.contentType !== 'application/json') { ResponseHelper.sendErrorResponse( res, - new Error('Preset credentials can be set once'), + new Error( + 'Body must be a valid JSON, make sure the content-type is application/json', + ), ); + return; } - }, + + CredentialsOverwrites().setData(body); + presetCredentialsLoaded = true; + ResponseHelper.sendSuccessResponse(res, { success: true }, true, 200); + } else { + ResponseHelper.sendErrorResponse(res, new Error('Preset credentials can be set once')); + } + }, + ); + } + + server.on('error', (error: Error & { code: string }) => { + if (error.code === 'EADDRINUSE') { + this.logger.error( + `n8n's port ${port} is already in use. Do you have the n8n main process running on that port?`, ); + process.exit(1); } + }); - server.on('error', (error: Error & { code: string }) => { - if (error.code === 'EADDRINUSE') { - this.logger.error( - `n8n's port ${port} is already in use. Do you have the n8n main process running on that port?`, - ); - process.exit(1); - } - }); + await new Promise((resolve) => server.listen(port, () => resolve())); + await this.externalHooks.run('worker.ready'); + this.logger.info(`\nn8n worker health check via, port ${port}`); + } - await new Promise((resolve) => server.listen(port, () => resolve())); - await this.externalHooks.run('worker.ready'); - this.logger.info(`\nn8n worker health check via, port ${port}`); + async run() { + // eslint-disable-next-line @typescript-eslint/no-shadow + const { flags } = this.parse(Worker); + + this.logger.info('\nn8n worker is now ready'); + this.logger.info(` * Version: ${N8N_VERSION}`); + this.logger.info(` * Concurrency: ${flags.concurrency}`); + this.logger.info(''); + + if (config.getEnv('queue.health.active')) { + await this.setupHealthMonitor(); } // Make sure that the process does not close diff --git a/packages/cli/src/commands/workerCommandHandler.ts b/packages/cli/src/commands/workerCommandHandler.ts new file mode 100644 index 0000000000..874ead410c --- /dev/null +++ b/packages/cli/src/commands/workerCommandHandler.ts @@ -0,0 +1,82 @@ +import { jsonParse, LoggerProxy } from 'n8n-workflow'; +import { eventBus } from '../eventbus'; +import type { RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands'; +import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper'; +import type { RedisServicePubSubPublisher } from '../services/redis/RedisServicePubSubPublisher'; +import * as os from 'os'; + +export function getWorkerCommandReceivedHandler(options: { + uniqueInstanceId: string; + redisPublisher: RedisServicePubSubPublisher; + getRunningJobIds: () => string[]; +}) { + return async (channel: string, messageString: string) => { + if (channel === COMMAND_REDIS_CHANNEL) { + if (!messageString) return; + let message: RedisServiceCommandObject; + try { + message = jsonParse(messageString); + } catch { + LoggerProxy.debug( + `Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`, + ); + return; + } + if (message) { + if (message.targets && !message.targets.includes(options.uniqueInstanceId)) { + return; // early return if the message is not for this worker + } + switch (message.command) { + case 'getStatus': + await options.redisPublisher.publishToWorkerChannel({ + workerId: options.uniqueInstanceId, + command: message.command, + payload: { + workerId: options.uniqueInstanceId, + runningJobs: options.getRunningJobIds(), + freeMem: os.freemem(), + totalMem: os.totalmem(), + uptime: process.uptime(), + loadAvg: os.loadavg(), + cpus: os.cpus().map((cpu) => `${cpu.model} - speed: ${cpu.speed}`), + arch: os.arch(), + platform: os.platform(), + hostname: os.hostname(), + net: Object.values(os.networkInterfaces()).flatMap( + (interfaces) => + interfaces?.map((net) => `${net.family} - address: ${net.address}`) ?? '', + ), + }, + }); + break; + case 'getId': + await options.redisPublisher.publishToWorkerChannel({ + workerId: options.uniqueInstanceId, + command: message.command, + }); + break; + case 'restartEventBus': + await eventBus.restart(); + await options.redisPublisher.publishToWorkerChannel({ + workerId: options.uniqueInstanceId, + command: message.command, + payload: { + result: 'success', + }, + }); + break; + case 'stopWorker': + // TODO: implement proper shutdown + // await this.stopProcess(); + break; + default: + LoggerProxy.debug( + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + `Received unknown command via channel ${COMMAND_REDIS_CHANNEL}: "${message.command}"`, + ); + break; + } + } + } + }; +} diff --git a/packages/cli/src/controllers/orchestration.controller.ts b/packages/cli/src/controllers/orchestration.controller.ts new file mode 100644 index 0000000000..fd6b68ad4e --- /dev/null +++ b/packages/cli/src/controllers/orchestration.controller.ts @@ -0,0 +1,35 @@ +import config from '@/config'; +import { Authorized, Get, RestController } from '@/decorators'; +import { OrchestrationRequest } from '@/requests'; +import { Service } from 'typedi'; +import { OrchestrationService } from '../services/orchestration.service'; + +@Authorized(['global', 'owner']) +@RestController('/orchestration') +@Service() +export class OrchestrationController { + private config = config; + + constructor(private readonly orchestrationService: OrchestrationService) {} + + /** + * These endpoint currently do not return anything, they just trigger the messsage to + * the workers to respond on Redis with their status. + * TODO: these responses need to be forwarded to and handled by the frontend + */ + @Get('/worker/status/:id') + async getWorkersStatus(req: OrchestrationRequest.Get) { + const id = req.params.id; + return this.orchestrationService.getWorkerStatus(id); + } + + @Get('/worker/status') + async getWorkersStatusAll() { + return this.orchestrationService.getWorkerStatus(); + } + + @Get('/worker/ids') + async getWorkerIdsAll() { + return this.orchestrationService.getWorkerIds(); + } +} diff --git a/packages/cli/src/eventbus/EventMessageClasses/index.ts b/packages/cli/src/eventbus/EventMessageClasses/index.ts index 28da7c5ecc..c6a0f85bd9 100644 --- a/packages/cli/src/eventbus/EventMessageClasses/index.ts +++ b/packages/cli/src/eventbus/EventMessageClasses/index.ts @@ -9,6 +9,7 @@ export const eventNamesWorkflow = [ 'n8n.workflow.failed', 'n8n.workflow.crashed', ] as const; +export const eventNamesGeneric = ['n8n.worker.started', 'n8n.worker.stopped'] as const; export const eventNamesNode = ['n8n.node.started', 'n8n.node.finished'] as const; export const eventNamesAudit = [ 'n8n.audit.user.login.success', @@ -37,14 +38,21 @@ export const eventNamesAudit = [ export type EventNamesWorkflowType = (typeof eventNamesWorkflow)[number]; export type EventNamesAuditType = (typeof eventNamesAudit)[number]; export type EventNamesNodeType = (typeof eventNamesNode)[number]; +export type EventNamesGenericType = (typeof eventNamesGeneric)[number]; export type EventNamesTypes = | EventNamesAuditType | EventNamesWorkflowType | EventNamesNodeType + | EventNamesGenericType | 'n8n.destination.test'; -export const eventNamesAll = [...eventNamesAudit, ...eventNamesWorkflow, ...eventNamesNode]; +export const eventNamesAll = [ + ...eventNamesAudit, + ...eventNamesWorkflow, + ...eventNamesNode, + ...eventNamesGeneric, +]; export type EventMessageTypes = | EventMessageGeneric diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index c6d14bf9a3..43059bf159 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -29,6 +29,7 @@ import { recoverExecutionDataFromEventLogMessages } from './recoverEvents'; import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee'; import Container from 'typedi'; import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories'; +import { OrchestrationService } from '../../services/orchestration.service'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; @@ -37,6 +38,11 @@ export interface MessageWithCallback { confirmCallback: (message: EventMessageTypes, src: EventMessageConfirmSource) => void; } +export interface MessageEventBusInitializeOptions { + skipRecoveryPass?: boolean; + workerId?: string; +} + export class MessageEventBus extends EventEmitter { private static instance: MessageEventBus; @@ -70,7 +76,7 @@ export class MessageEventBus extends EventEmitter { * * Sets `isInitialized` to `true` once finished. */ - async initialize() { + async initialize(options?: MessageEventBusInitializeOptions): Promise { if (this.isInitialized) { return; } @@ -93,64 +99,75 @@ export class MessageEventBus extends EventEmitter { } LoggerProxy.debug('Initializing event writer'); - this.logWriter = await MessageEventBusLogWriter.getInstance(); + if (options?.workerId) { + // only add 'worker' to log file name since the ID changes on every start and we + // would not be able to recover the log files from the previous run not knowing it + const logBaseName = config.getEnv('eventBus.logWriter.logBaseName') + '-worker'; + this.logWriter = await MessageEventBusLogWriter.getInstance({ + logBaseName, + }); + } else { + this.logWriter = await MessageEventBusLogWriter.getInstance(); + } if (!this.logWriter) { LoggerProxy.warn('Could not initialize event writer'); } - // unsent event check: - // - find unsent messages in current event log(s) - // - cycle event logs and start the logging to a fresh file - // - retry sending events - LoggerProxy.debug('Checking for unsent event messages'); - const unsentAndUnfinished = await this.getUnsentAndUnfinishedExecutions(); - LoggerProxy.debug( - `Start logging into ${this.logWriter?.getLogFileName() ?? 'unknown filename'} `, - ); - this.logWriter?.startLogging(); - await this.send(unsentAndUnfinished.unsentMessages); + if (options?.skipRecoveryPass) { + LoggerProxy.debug('Skipping unsent event check'); + } else { + // unsent event check: + // - find unsent messages in current event log(s) + // - cycle event logs and start the logging to a fresh file + // - retry sending events + LoggerProxy.debug('Checking for unsent event messages'); + const unsentAndUnfinished = await this.getUnsentAndUnfinishedExecutions(); + LoggerProxy.debug( + `Start logging into ${this.logWriter?.getLogFileName() ?? 'unknown filename'} `, + ); + this.logWriter?.startLogging(); + await this.send(unsentAndUnfinished.unsentMessages); - const unfinishedExecutionIds = Object.keys(unsentAndUnfinished.unfinishedExecutions); + const unfinishedExecutionIds = Object.keys(unsentAndUnfinished.unfinishedExecutions); - if (unfinishedExecutionIds.length > 0) { - LoggerProxy.warn(`Found unfinished executions: ${unfinishedExecutionIds.join(', ')}`); - LoggerProxy.info('This could be due to a crash of an active workflow or a restart of n8n.'); - const activeWorkflows = await Container.get(WorkflowRepository).find({ - where: { active: true }, - select: ['id', 'name'], - }); - if (activeWorkflows.length > 0) { - LoggerProxy.info('Currently active workflows:'); - for (const workflowData of activeWorkflows) { - LoggerProxy.info(` - ${workflowData.name} (ID: ${workflowData.id})`); + if (unfinishedExecutionIds.length > 0) { + LoggerProxy.warn(`Found unfinished executions: ${unfinishedExecutionIds.join(', ')}`); + LoggerProxy.info('This could be due to a crash of an active workflow or a restart of n8n.'); + const activeWorkflows = await Container.get(WorkflowRepository).find({ + where: { active: true }, + select: ['id', 'name'], + }); + if (activeWorkflows.length > 0) { + LoggerProxy.info('Currently active workflows:'); + for (const workflowData of activeWorkflows) { + LoggerProxy.info(` - ${workflowData.name} (ID: ${workflowData.id})`); + } } - } - - const recoveryAlreadyAttempted = this.logWriter?.isRecoveryProcessRunning(); - if (recoveryAlreadyAttempted || config.getEnv('eventBus.crashRecoveryMode') === 'simple') { - await Container.get(ExecutionRepository).markAsCrashed(unfinishedExecutionIds); - // if we end up here, it means that the previous recovery process did not finish - // a possible reason would be that recreating the workflow data itself caused e.g an OOM error - // in that case, we do not want to retry the recovery process, but rather mark the executions as crashed - if (recoveryAlreadyAttempted) - LoggerProxy.warn('Skipped recovery process since it previously failed.'); - } else { - // start actual recovery process and write recovery process flag file - this.logWriter?.startRecoveryProcess(); - for (const executionId of unfinishedExecutionIds) { - LoggerProxy.warn(`Attempting to recover execution ${executionId}`); - await recoverExecutionDataFromEventLogMessages( - executionId, - unsentAndUnfinished.unfinishedExecutions[executionId], - true, - ); + const recoveryAlreadyAttempted = this.logWriter?.isRecoveryProcessRunning(); + if (recoveryAlreadyAttempted || config.getEnv('eventBus.crashRecoveryMode') === 'simple') { + await Container.get(ExecutionRepository).markAsCrashed(unfinishedExecutionIds); + // if we end up here, it means that the previous recovery process did not finish + // a possible reason would be that recreating the workflow data itself caused e.g an OOM error + // in that case, we do not want to retry the recovery process, but rather mark the executions as crashed + if (recoveryAlreadyAttempted) + LoggerProxy.warn('Skipped recovery process since it previously failed.'); + } else { + // start actual recovery process and write recovery process flag file + this.logWriter?.startRecoveryProcess(); + for (const executionId of unfinishedExecutionIds) { + LoggerProxy.warn(`Attempting to recover execution ${executionId}`); + await recoverExecutionDataFromEventLogMessages( + executionId, + unsentAndUnfinished.unfinishedExecutions[executionId], + true, + ); + } } + // remove the recovery process flag file + this.logWriter?.endRecoveryProcess(); } - // remove the recovery process flag file - this.logWriter?.endRecoveryProcess(); } - // if configured, run this test every n ms if (config.getEnv('eventBus.checkUnsentInterval') > 0) { if (this.pushIntervalTimer) { @@ -192,6 +209,12 @@ export class MessageEventBus extends EventEmitter { return result; } + async broadcastRestartEventbusAfterDestinationUpdate() { + if (config.getEnv('executions.mode') === 'queue') { + await Container.get(OrchestrationService).restartEventBus(); + } + } + private async trySendingUnsent(msgs?: EventMessageTypes[]) { const unsentMessages = msgs ?? (await this.getEventsUnsent()); if (unsentMessages.length > 0) { @@ -212,9 +235,15 @@ export class MessageEventBus extends EventEmitter { ); await this.destinations[destinationName].close(); } + this.isInitialized = false; LoggerProxy.debug('EventBus shut down.'); } + async restart() { + await this.close(); + await this.initialize({ skipRecoveryPass: true }); + } + async send(msgs: EventMessageTypes | EventMessageTypes[]) { if (!Array.isArray(msgs)) { msgs = [msgs]; diff --git a/packages/cli/src/requests.ts b/packages/cli/src/requests.ts index 7a63bc889f..82f2c6a7a1 100644 --- a/packages/cli/src/requests.ts +++ b/packages/cli/src/requests.ts @@ -535,3 +535,12 @@ export declare namespace ExternalSecretsRequest { type UpdateProvider = AuthenticatedRequest<{ provider: string }>; } + +// ---------------------------------- +// /orchestration +// ---------------------------------- +// +export declare namespace OrchestrationRequest { + type GetAll = AuthenticatedRequest; + type Get = AuthenticatedRequest<{ id: string }, {}, {}, {}>; +} diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts new file mode 100644 index 0000000000..d3f8c7e27e --- /dev/null +++ b/packages/cli/src/services/orchestration.service.ts @@ -0,0 +1,172 @@ +import { Service } from 'typedi'; +import { RedisService } from './redis.service'; +import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher'; +import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber'; +import { LoggerProxy, jsonParse } from 'n8n-workflow'; +import { eventBus } from '../eventbus'; +import type { AbstractEventMessageOptions } from '../eventbus/EventMessageClasses/AbstractEventMessageOptions'; +import { getEventMessageObjectByType } from '../eventbus/EventMessageClasses/Helpers'; +import type { + RedisServiceCommandObject, + RedisServiceWorkerResponseObject, +} from './redis/RedisServiceCommands'; +import { + COMMAND_REDIS_CHANNEL, + EVENT_BUS_REDIS_CHANNEL, + WORKER_RESPONSE_REDIS_CHANNEL, +} from './redis/RedisServiceHelper'; + +@Service() +export class OrchestrationService { + private initialized = false; + + private _uniqueInstanceId = ''; + + get uniqueInstanceId(): string { + return this._uniqueInstanceId; + } + + redisPublisher: RedisServicePubSubPublisher; + + redisSubscriber: RedisServicePubSubSubscriber; + + constructor(readonly redisService: RedisService) {} + + async init(uniqueInstanceId: string) { + this._uniqueInstanceId = uniqueInstanceId; + await this.initPublisher(); + await this.initSubscriber(); + this.initialized = true; + } + + async shutdown() { + await this.redisPublisher?.destroy(); + await this.redisSubscriber?.destroy(); + } + + private async initPublisher() { + this.redisPublisher = await this.redisService.getPubSubPublisher(); + } + + private async initSubscriber() { + this.redisSubscriber = await this.redisService.getPubSubSubscriber(); + + // TODO: these are all proof of concept implementations for the moment + // until worker communication is implemented + // #region proof of concept + await this.redisSubscriber.subscribeToEventLog(); + await this.redisSubscriber.subscribeToWorkerResponseChannel(); + await this.redisSubscriber.subscribeToCommandChannel(); + + this.redisSubscriber.addMessageHandler( + 'OrchestrationMessageReceiver', + async (channel: string, messageString: string) => { + // TODO: this is a proof of concept implementation to forward events to the main instance's event bus + // Events are arriving through a pub/sub channel and are forwarded to the eventBus + // In the future, a stream should probably replace this implementation entirely + if (channel === EVENT_BUS_REDIS_CHANNEL) { + await this.handleEventBusMessage(messageString); + } else if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { + await this.handleWorkerResponseMessage(messageString); + } else if (channel === COMMAND_REDIS_CHANNEL) { + await this.handleCommandMessage(messageString); + } + }, + ); + } + + async handleWorkerResponseMessage(messageString: string) { + const workerResponse = jsonParse(messageString); + if (workerResponse) { + // TODO: Handle worker response + LoggerProxy.debug('Received worker response', workerResponse); + } + return workerResponse; + } + + async handleEventBusMessage(messageString: string) { + const eventData = jsonParse(messageString); + if (eventData) { + const eventMessage = getEventMessageObjectByType(eventData); + if (eventMessage) { + await eventBus.send(eventMessage); + } + } + return eventData; + } + + async handleCommandMessage(messageString: string) { + if (!messageString) return; + let message: RedisServiceCommandObject; + try { + message = jsonParse(messageString); + } catch { + LoggerProxy.debug( + `Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`, + ); + return; + } + if (message) { + if ( + message.senderId === this.uniqueInstanceId || + (message.targets && !message.targets.includes(this.uniqueInstanceId)) + ) { + LoggerProxy.debug( + `Skipping command message ${message.command} because it's not for this instance.`, + ); + return message; + } + switch (message.command) { + case 'restartEventBus': + await eventBus.restart(); + break; + } + return message; + } + return; + } + + async getWorkerStatus(id?: string) { + if (!this.initialized) { + throw new Error('OrchestrationService not initialized'); + } + await this.redisPublisher.publishToCommandChannel({ + senderId: this.uniqueInstanceId, + command: 'getStatus', + targets: id ? [id] : undefined, + }); + } + + async getWorkerIds() { + if (!this.initialized) { + throw new Error('OrchestrationService not initialized'); + } + await this.redisPublisher.publishToCommandChannel({ + senderId: this.uniqueInstanceId, + command: 'getId', + }); + } + + // TODO: not implemented yet on worker side + async stopWorker(id?: string) { + if (!this.initialized) { + throw new Error('OrchestrationService not initialized'); + } + await this.redisPublisher.publishToCommandChannel({ + senderId: this.uniqueInstanceId, + command: 'stopWorker', + targets: id ? [id] : undefined, + }); + } + + async restartEventBus(id?: string) { + if (!this.initialized) { + throw new Error('OrchestrationService not initialized'); + } + await this.redisPublisher.publishToCommandChannel({ + senderId: this.uniqueInstanceId, + command: 'restartEventBus', + targets: id ? [id] : undefined, + }); + } +} diff --git a/packages/cli/src/services/redis/RedisServiceCommands.ts b/packages/cli/src/services/redis/RedisServiceCommands.ts index cd70a32d6e..5796560d4b 100644 --- a/packages/cli/src/services/redis/RedisServiceCommands.ts +++ b/packages/cli/src/services/redis/RedisServiceCommands.ts @@ -1,12 +1,13 @@ -export type RedisServiceCommand = 'getStatus' | 'restartEventBus' | 'stopWorker'; // TODO: add more commands +export type RedisServiceCommand = 'getStatus' | 'getId' | 'restartEventBus' | 'stopWorker'; // TODO: add more commands /** * An object to be sent via Redis pub/sub from the main process to the workers. * @field command: The command to be executed. * @field targets: The targets to execute the command on. Leave empty to execute on all workers or specify worker ids. - * @field args: Optional arguments to be passed to the command. + * @field payload: Optional arguments to be sent with the command. */ type RedisServiceBaseCommand = { + senderId: string; command: RedisServiceCommand; payload?: { [key: string]: string | number | boolean | string[] | number[] | boolean[]; @@ -15,7 +16,38 @@ type RedisServiceBaseCommand = { export type RedisServiceWorkerResponseObject = { workerId: string; -} & RedisServiceBaseCommand; +} & ( + | RedisServiceBaseCommand + | { + command: 'getStatus'; + payload: { + workerId: string; + runningJobs: string[]; + freeMem: number; + totalMem: number; + uptime: number; + loadAvg: number[]; + cpus: string[]; + arch: string; + platform: NodeJS.Platform; + hostname: string; + net: string[]; + }; + } + | { + command: 'getId'; + } + | { + command: 'restartEventBus'; + payload: { + result: 'success' | 'error'; + error?: string; + }; + } + | { + command: 'stopWorker'; + } +); export type RedisServiceCommandObject = { targets?: string[]; diff --git a/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts b/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts index cb7b05d41f..404544d6f9 100644 --- a/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts +++ b/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts @@ -23,11 +23,11 @@ export class RedisServicePubSubSubscriber extends RedisServiceBaseReceiver { if (!this.redisClient) { await this.init(); } - await this.redisClient?.subscribe(channel, (error, count: number) => { + await this.redisClient?.subscribe(channel, (error, _count: number) => { if (error) { Logger.error(`Error subscribing to channel ${channel}`); } else { - Logger.debug(`Subscribed ${count.toString()} to eventlog channel`); + Logger.debug(`Subscribed Redis PubSub client to channel: ${channel}`); } }); } diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts new file mode 100644 index 0000000000..d860579ee3 --- /dev/null +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -0,0 +1,81 @@ +import { mockInstance } from '../shared/utils/'; +import { Worker } from '@/commands/worker'; +import * as Config from '@oclif/config'; +import { LoggerProxy } from 'n8n-workflow'; +import { Telemetry } from '@/telemetry'; +import { getLogger } from '@/Logger'; +import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee'; +import { BinaryDataManager } from 'n8n-core'; +import { CacheService } from '@/services/cache.service'; +import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; +import { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber'; +import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; +import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials'; +import { CredentialTypes } from '@/CredentialTypes'; +import { NodeTypes } from '@/NodeTypes'; +import { InternalHooks } from '@/InternalHooks'; +import { PostHogClient } from '@/posthog'; +import { RedisService } from '@/services/redis.service'; + +const config: Config.IConfig = new Config.Config({ root: __dirname }); + +beforeAll(async () => { + LoggerProxy.init(getLogger()); + mockInstance(Telemetry); + mockInstance(PostHogClient); + mockInstance(InternalHooks); + mockInstance(CacheService); + mockInstance(ExternalSecretsManager); + mockInstance(BinaryDataManager); + mockInstance(MessageEventBus); + mockInstance(LoadNodesAndCredentials); + mockInstance(CredentialTypes); + mockInstance(NodeTypes); + mockInstance(RedisService); + mockInstance(RedisServicePubSubPublisher); + mockInstance(RedisServicePubSubSubscriber); +}); + +test('worker initializes all its components', async () => { + const worker = new Worker([], config); + + jest.spyOn(worker, 'init'); + jest.spyOn(worker, 'initLicense').mockImplementation(async () => {}); + jest.spyOn(worker, 'initBinaryManager').mockImplementation(async () => {}); + jest.spyOn(worker, 'initExternalHooks').mockImplementation(async () => {}); + jest.spyOn(worker, 'initExternalSecrets').mockImplementation(async () => {}); + jest.spyOn(worker, 'initEventBus').mockImplementation(async () => {}); + jest.spyOn(worker, 'initRedis'); + jest.spyOn(RedisServicePubSubPublisher.prototype, 'init').mockImplementation(async () => {}); + jest + .spyOn(RedisServicePubSubPublisher.prototype, 'publishToEventLog') + .mockImplementation(async () => {}); + jest + .spyOn(RedisServicePubSubSubscriber.prototype, 'subscribeToCommandChannel') + .mockImplementation(async () => {}); + jest + .spyOn(RedisServicePubSubSubscriber.prototype, 'addMessageHandler') + .mockImplementation(async () => {}); + jest.spyOn(worker, 'initQueue').mockImplementation(async () => {}); + + await worker.init(); + + expect(worker.uniqueInstanceId).toBeDefined(); + expect(worker.uniqueInstanceId).toContain('worker'); + expect(worker.uniqueInstanceId.length).toBeGreaterThan(15); + expect(worker.initLicense).toHaveBeenCalled(); + expect(worker.initBinaryManager).toHaveBeenCalled(); + expect(worker.initExternalHooks).toHaveBeenCalled(); + expect(worker.initExternalSecrets).toHaveBeenCalled(); + expect(worker.initEventBus).toHaveBeenCalled(); + expect(worker.initRedis).toHaveBeenCalled(); + expect(worker.redisPublisher).toBeDefined(); + expect(worker.redisPublisher.init).toHaveBeenCalled(); + expect(worker.redisPublisher.publishToEventLog).toHaveBeenCalled(); + expect(worker.redisSubscriber).toBeDefined(); + expect(worker.redisSubscriber.subscribeToCommandChannel).toHaveBeenCalled(); + expect(worker.redisSubscriber.addMessageHandler).toHaveBeenCalled(); + expect(worker.initQueue).toHaveBeenCalled(); + + jest.restoreAllMocks(); +}); diff --git a/packages/cli/test/unit/services/orchestration.service.test.ts b/packages/cli/test/unit/services/orchestration.service.test.ts new file mode 100644 index 0000000000..18204cea2b --- /dev/null +++ b/packages/cli/test/unit/services/orchestration.service.test.ts @@ -0,0 +1,140 @@ +import Container from 'typedi'; +import config from '@/config'; +import { LoggerProxy } from 'n8n-workflow'; +import { getLogger } from '@/Logger'; +import { OrchestrationService } from '@/services/orchestration.service'; +import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands'; +import { EventMessageWorkflow } from '@/eventbus/EventMessageClasses/EventMessageWorkflow'; +import { eventBus } from '@/eventbus'; +import * as EventHelpers from '@/eventbus/EventMessageClasses/Helpers'; +import { RedisService } from '@/services/redis.service'; +import { mockInstance } from '../../integration/shared/utils'; + +const os = Container.get(OrchestrationService); + +function setDefaultConfig() { + config.set('executions.mode', 'queue'); +} + +const workerRestartEventbusResponse: RedisServiceWorkerResponseObject = { + senderId: 'test', + workerId: 'test', + command: 'restartEventBus', + payload: { + result: 'success', + }, +}; + +const eventBusMessage = new EventMessageWorkflow({ + eventName: 'n8n.workflow.success', + id: 'test', + message: 'test', + payload: { + test: 'test', + }, +}); + +describe('Orchestration Service', () => { + beforeAll(async () => { + mockInstance(RedisService); + LoggerProxy.init(getLogger()); + jest.mock('ioredis', () => { + const Redis = require('ioredis-mock'); + if (typeof Redis === 'object') { + // the first mock is an ioredis shim because ioredis-mock depends on it + // https://github.com/stipsan/ioredis-mock/blob/master/src/index.js#L101-L111 + return { + Command: { _transformer: { argument: {}, reply: {} } }, + }; + } + // second mock for our code + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return function (...args: any) { + return new Redis(args); + }; + }); + jest.mock('../../../src/services/redis/RedisServicePubSubPublisher', () => { + return jest.fn().mockImplementation(() => { + return { + init: jest.fn(), + publishToEventLog: jest.fn(), + publishToWorkerChannel: jest.fn(), + destroy: jest.fn(), + }; + }); + }); + jest.mock('../../../src/services/redis/RedisServicePubSubSubscriber', () => { + return jest.fn().mockImplementation(() => { + return { + subscribeToCommandChannel: jest.fn(), + destroy: jest.fn(), + }; + }); + }); + setDefaultConfig(); + }); + + afterAll(async () => { + jest.mock('../../../src/services/redis/RedisServicePubSubPublisher').restoreAllMocks(); + jest.mock('../../../src/services/redis/RedisServicePubSubSubscriber').restoreAllMocks(); + }); + + test('should initialize', async () => { + await os.init('test-orchestration-service'); + expect(os.redisPublisher).toBeDefined(); + expect(os.redisSubscriber).toBeDefined(); + expect(os.uniqueInstanceId).toBeDefined(); + }); + + test('should handle worker responses', async () => { + const response = await os.handleWorkerResponseMessage( + JSON.stringify(workerRestartEventbusResponse), + ); + expect(response.command).toEqual('restartEventBus'); + }); + + test('should handle event messages', async () => { + const response = await os.handleEventBusMessage(JSON.stringify(eventBusMessage)); + jest.spyOn(eventBus, 'send'); + jest.spyOn(EventHelpers, 'getEventMessageObjectByType'); + expect(eventBus.send).toHaveBeenCalled(); + expect(response.eventName).toEqual('n8n.workflow.success'); + jest.spyOn(eventBus, 'send').mockRestore(); + jest.spyOn(EventHelpers, 'getEventMessageObjectByType').mockRestore(); + }); + + test('should handle command messages from others', async () => { + jest.spyOn(eventBus, 'restart'); + const responseFalseId = await os.handleCommandMessage( + JSON.stringify(workerRestartEventbusResponse), + ); + expect(responseFalseId).toBeDefined(); + expect(responseFalseId!.command).toEqual('restartEventBus'); + expect(responseFalseId!.senderId).toEqual('test'); + expect(eventBus.restart).toHaveBeenCalled(); + jest.spyOn(eventBus, 'restart').mockRestore(); + }); + + test('should reject command messages from iteslf', async () => { + jest.spyOn(eventBus, 'restart'); + const response = await os.handleCommandMessage( + JSON.stringify({ ...workerRestartEventbusResponse, senderId: os.uniqueInstanceId }), + ); + expect(response).toBeDefined(); + expect(response!.command).toEqual('restartEventBus'); + expect(response!.senderId).toEqual(os.uniqueInstanceId); + expect(eventBus.restart).not.toHaveBeenCalled(); + jest.spyOn(eventBus, 'restart').mockRestore(); + }); + + test('should send command messages', async () => { + jest.spyOn(os.redisPublisher, 'publishToCommandChannel'); + await os.getWorkerIds(); + expect(os.redisPublisher.publishToCommandChannel).toHaveBeenCalled(); + jest.spyOn(os.redisPublisher, 'publishToCommandChannel').mockRestore(); + }); + + afterAll(async () => { + await os.shutdown(); + }); +});