diff --git a/packages/cli/src/__tests__/workflow-runner.test.ts b/packages/cli/src/__tests__/workflow-runner.test.ts index 3d237e9f49..d05542cb6e 100644 --- a/packages/cli/src/__tests__/workflow-runner.test.ts +++ b/packages/cli/src/__tests__/workflow-runner.test.ts @@ -295,7 +295,7 @@ describe('enqueueExecution', () => { addJob.mockRejectedValueOnce(error); // @ts-expect-error Private method - await expect(runner.enqueueExecution('1', data)).rejects.toThrowError(error); + await expect(runner.enqueueExecution('1', 'workflow-xyz', data)).rejects.toThrowError(error); expect(setupQueue).toHaveBeenCalledTimes(1); }); diff --git a/packages/cli/src/eventbus/event-message-classes/event-message-queue.ts b/packages/cli/src/eventbus/event-message-classes/event-message-queue.ts new file mode 100644 index 0000000000..a9af43a76d --- /dev/null +++ b/packages/cli/src/eventbus/event-message-classes/event-message-queue.ts @@ -0,0 +1,44 @@ +import type { JsonObject } from 'n8n-workflow'; +import { EventMessageTypeNames } from 'n8n-workflow'; + +import { AbstractEventMessage, isEventMessageOptionsWithType } from './abstract-event-message'; +import type { AbstractEventMessageOptions } from './abstract-event-message-options'; +import type { AbstractEventPayload } from './abstract-event-payload'; + +export interface EventPayloadQueue extends AbstractEventPayload { + workflowId: string; + jobId: string; + executionId: string; + hostId: string; +} + +export interface EventMessageQueueOptions extends AbstractEventMessageOptions { + payload?: EventPayloadQueue; +} + +export class EventMessageQueue extends AbstractEventMessage { + readonly __type = EventMessageTypeNames.runner; + + payload: EventPayloadQueue; + + constructor(options: EventMessageQueueOptions) { + super(options); + if (options.payload) this.setPayload(options.payload); + if (options.anonymize) { + this.anonymize(); + } + } + + setPayload(payload: EventPayloadQueue): this { + this.payload = payload; + return this; + } + + deserialize(data: JsonObject): this { + if (isEventMessageOptionsWithType(data, this.__type)) { + this.setOptionsOrDefault(data); + if (data.payload) this.setPayload(data.payload as EventPayloadQueue); + } + return this; + } +} diff --git a/packages/cli/src/eventbus/event-message-classes/index.ts b/packages/cli/src/eventbus/event-message-classes/index.ts index bf88933d53..059b90686d 100644 --- a/packages/cli/src/eventbus/event-message-classes/index.ts +++ b/packages/cli/src/eventbus/event-message-classes/index.ts @@ -3,6 +3,7 @@ import type { EventMessageAudit } from './event-message-audit'; import type { EventMessageExecution } from './event-message-execution'; import type { EventMessageGeneric } from './event-message-generic'; import type { EventMessageNode } from './event-message-node'; +import type { EventMessageQueue } from './event-message-queue'; import type { EventMessageRunner } from './event-message-runner'; import type { EventMessageWorkflow } from './event-message-workflow'; @@ -32,6 +33,16 @@ export const eventNamesRunner = [ export type EventNamesRunnerType = (typeof eventNamesRunner)[number]; +export const eventNamesQueue = [ + 'n8n.queue.job.enqueued', + 'n8n.queue.job.dequeued', + 'n8n.queue.job.completed', + 'n8n.queue.job.failed', + 'n8n.queue.job.stalled', +] as const; + +export type EventNamesQueueType = (typeof eventNamesQueue)[number]; + export const eventNamesWorkflow = [ 'n8n.workflow.started', 'n8n.workflow.success', @@ -85,6 +96,7 @@ export type EventNamesTypes = | EventNamesGenericType | EventNamesAiNodesType | EventNamesRunnerType + | EventNamesQueueType | 'n8n.destination.test'; export const eventNamesAll = [ @@ -94,6 +106,7 @@ export const eventNamesAll = [ ...eventNamesGeneric, ...eventNamesAiNodes, ...eventNamesRunner, + ...eventNamesQueue, ]; export type EventMessageTypes = @@ -103,4 +116,5 @@ export type EventMessageTypes = | EventMessageNode | EventMessageExecution | EventMessageAiNode + | EventMessageQueue | EventMessageRunner; diff --git a/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts b/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts index 3f268c3d13..e154583ff7 100644 --- a/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts +++ b/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts @@ -32,6 +32,8 @@ import { } from '../event-message-classes/event-message-generic'; import type { EventMessageNodeOptions } from '../event-message-classes/event-message-node'; import { EventMessageNode } from '../event-message-classes/event-message-node'; +import type { EventMessageQueueOptions } from '../event-message-classes/event-message-queue'; +import { EventMessageQueue } from '../event-message-classes/event-message-queue'; import type { EventMessageRunnerOptions } from '../event-message-classes/event-message-runner'; import { EventMessageRunner } from '../event-message-classes/event-message-runner'; import type { EventMessageWorkflowOptions } from '../event-message-classes/event-message-workflow'; @@ -425,4 +427,8 @@ export class MessageEventBus extends EventEmitter { async sendRunnerEvent(options: EventMessageRunnerOptions) { await this.send(new EventMessageRunner(options)); } + + async sendQueueEvent(options: EventMessageQueueOptions) { + await this.send(new EventMessageQueue(options)); + } } diff --git a/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts b/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts index 93b7ab93d5..2cd2019018 100644 --- a/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts +++ b/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts @@ -1,5 +1,6 @@ import type { IWorkflowDb } from '@n8n/db'; import { mock } from 'jest-mock-extended'; +import type { InstanceSettings } from 'n8n-core'; import type { INode, IRun, IWorkflowBase } from 'n8n-workflow'; import type { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; @@ -10,7 +11,9 @@ import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-rela describe('LogStreamingEventRelay', () => { const eventBus = mock(); const eventService = new EventService(); - new LogStreamingEventRelay(eventService, eventBus).init(); + const hostId = 'host-xyz'; + const instanceSettings = mock({ hostId }); + new LogStreamingEventRelay(eventService, eventBus, instanceSettings).init(); afterEach(() => { jest.clearAllMocks(); @@ -223,6 +226,35 @@ describe('LogStreamingEventRelay', () => { }); }); + it('should log job completion on `workflow-post-execute` for successful job', () => { + const runData = mock({ + finished: true, + status: 'success', + mode: 'manual', + jobId: '12345', + data: { resultData: {} }, + }); + + const event = { + executionId: 'exec-123', + userId: 'user-456', + workflow: mock({ id: 'wf-789', name: 'Test Workflow' }), + runData, + }; + + eventService.emit('workflow-post-execute', event); + + expect(eventBus.sendQueueEvent).toHaveBeenCalledWith({ + eventName: 'n8n.queue.job.completed', + payload: { + executionId: 'exec-123', + workflowId: 'wf-789', + hostId: 'host-xyz', + jobId: '12345', + }, + }); + }); + it('should log on `workflow-post-execute` event for failed execution', () => { const runData = mock({ status: 'error', @@ -266,6 +298,45 @@ describe('LogStreamingEventRelay', () => { }, }); }); + + it('should log job failure on `workflow-post-execute` for failed job', () => { + const runData = mock({ + finished: false, + status: 'error', + mode: 'manual', + jobId: '67890', + data: { + resultData: { + lastNodeExecuted: 'some-node', + // @ts-expect-error Partial mock + error: { + node: mock({ type: 'some-type' }), + message: 'some-message', + }, + errorMessage: 'some-message', + }, + }, + }) as unknown as IRun; + + const event = { + executionId: 'exec-456', + userId: 'user-789', + workflow: mock({ id: 'wf-101', name: 'Failed Workflow' }), + runData, + }; + + eventService.emit('workflow-post-execute', event); + + expect(eventBus.sendQueueEvent).toHaveBeenCalledWith({ + eventName: 'n8n.queue.job.failed', + payload: { + executionId: 'exec-456', + workflowId: 'wf-101', + hostId: 'host-xyz', + jobId: '67890', + }, + }); + }); }); describe('user events', () => { @@ -1308,4 +1379,69 @@ describe('LogStreamingEventRelay', () => { }); }); }); + + describe('job events', () => { + it('should log on `job-enqueued` event', () => { + const event: RelayEventMap['job-enqueued'] = { + executionId: 'exec-1', + workflowId: 'wf-2', + hostId, + jobId: 'job-4', + }; + + eventService.emit('job-enqueued', event); + + expect(eventBus.sendQueueEvent).toHaveBeenCalledWith({ + eventName: 'n8n.queue.job.enqueued', + payload: { + executionId: 'exec-1', + workflowId: 'wf-2', + hostId, + jobId: 'job-4', + }, + }); + }); + + it('should log on `job-dequeued` event', () => { + const event: RelayEventMap['job-dequeued'] = { + executionId: 'exec-1', + workflowId: 'wf-2', + hostId, + jobId: 'job-4', + }; + + eventService.emit('job-dequeued', event); + + expect(eventBus.sendQueueEvent).toHaveBeenCalledWith({ + eventName: 'n8n.queue.job.dequeued', + payload: { + executionId: 'exec-1', + workflowId: 'wf-2', + hostId, + jobId: 'job-4', + }, + }); + }); + + it('should log on `job-stalled` event', () => { + const event: RelayEventMap['job-stalled'] = { + executionId: 'exec-1', + workflowId: 'wf-2', + hostId, + jobId: 'job-4', + }; + + eventService.emit('job-stalled', event); + + expect(eventBus.sendQueueEvent).toHaveBeenCalledWith({ + eventName: 'n8n.queue.job.stalled', + payload: { + executionId: 'exec-1', + workflowId: 'wf-2', + hostId, + jobId: 'job-4', + }, + }); + }); + }); }); diff --git a/packages/cli/src/events/maps/relay.event-map.ts b/packages/cli/src/events/maps/relay.event-map.ts index 30efd876e9..d29254deb1 100644 --- a/packages/cli/src/events/maps/relay.event-map.ts +++ b/packages/cli/src/events/maps/relay.event-map.ts @@ -514,4 +514,29 @@ export type RelayEventMap = { }; // #endregion + + // #region queue + + 'job-enqueued': { + executionId: string; + workflowId: string; + hostId: string; + jobId: string; + }; + + 'job-dequeued': { + executionId: string; + workflowId: string; + hostId: string; + jobId: string; + }; + + 'job-stalled': { + executionId: string; + workflowId: string; + hostId: string; + jobId: string; + }; + + // #endregion } & AiEventMap; diff --git a/packages/cli/src/events/relays/log-streaming.event-relay.ts b/packages/cli/src/events/relays/log-streaming.event-relay.ts index 246309688c..afd108f2a1 100644 --- a/packages/cli/src/events/relays/log-streaming.event-relay.ts +++ b/packages/cli/src/events/relays/log-streaming.event-relay.ts @@ -1,5 +1,6 @@ import { Redactable } from '@n8n/decorators'; import { Service } from '@n8n/di'; +import { InstanceSettings } from 'n8n-core'; import type { IWorkflowBase } from 'n8n-workflow'; import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'; @@ -12,6 +13,7 @@ export class LogStreamingEventRelay extends EventRelay { constructor( readonly eventService: EventService, private readonly eventBus: MessageEventBus, + private readonly instanceSettings: InstanceSettings, ) { super(eventService); } @@ -65,6 +67,9 @@ export class LogStreamingEventRelay extends EventRelay { 'ai-vector-store-updated': (event) => this.aiVectorStoreUpdated(event), 'runner-task-requested': (event) => this.runnerTaskRequested(event), 'runner-response-received': (event) => this.runnerResponseReceived(event), + 'job-enqueued': (event) => this.jobEnqueued(event), + 'job-dequeued': (event) => this.jobDequeued(event), + 'job-stalled': (event) => this.jobStalled(event), }); } @@ -143,10 +148,11 @@ export class LogStreamingEventRelay extends EventRelay { } private workflowPostExecute(event: RelayEventMap['workflow-post-execute']) { - const { runData, workflow, ...rest } = event; + const { runData, workflow, executionId, ...rest } = event; const payload = { ...rest, + executionId, success: !!runData?.finished, // despite the `success` name, this reports `finished` state isManual: runData?.mode === 'manual', workflowId: workflow.id, @@ -159,6 +165,18 @@ export class LogStreamingEventRelay extends EventRelay { payload, }); + if (runData?.jobId) { + void this.eventBus.sendQueueEvent({ + eventName: 'n8n.queue.job.completed', + payload: { + executionId, + workflowId: workflow.id, + hostId: this.instanceSettings.hostId, + jobId: runData.jobId.toString(), + }, + }); + } + return; } @@ -174,6 +192,18 @@ export class LogStreamingEventRelay extends EventRelay { errorMessage: runData?.data.resultData.error?.message.toString(), }, }); + + if (runData?.jobId) { + void this.eventBus.sendQueueEvent({ + eventName: 'n8n.queue.job.failed', + payload: { + executionId, + workflowId: workflow.id, + hostId: this.instanceSettings.hostId, + jobId: runData.jobId.toString(), + }, + }); + } } // #endregion @@ -558,4 +588,29 @@ export class LogStreamingEventRelay extends EventRelay { } // #endregion + + // #region queue + + private jobEnqueued(payload: RelayEventMap['job-enqueued']) { + void this.eventBus.sendQueueEvent({ + eventName: 'n8n.queue.job.enqueued', + payload, + }); + } + + private jobDequeued(payload: RelayEventMap['job-dequeued']) { + void this.eventBus.sendQueueEvent({ + eventName: 'n8n.queue.job.dequeued', + payload, + }); + } + + private jobStalled(payload: RelayEventMap['job-stalled']) { + void this.eventBus.sendQueueEvent({ + eventName: 'n8n.queue.job.stalled', + payload, + }); + } + + // #endregion } diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts index 8b7efe2b55..915fbea143 100644 --- a/packages/cli/src/scaling/scaling.service.ts +++ b/packages/cli/src/scaling/scaling.service.ts @@ -86,6 +86,13 @@ export class ScalingService { void this.queue.process(JOB_TYPE_NAME, concurrency, async (job: Job) => { try { + this.eventService.emit('job-dequeued', { + executionId: job.data.executionId, + workflowId: job.data.workflowId, + hostId: this.instanceSettings.hostId, + jobId: job.id.toString(), + }); + if (!this.hasValidJobData(job)) { throw new UnexpectedError('Worker received invalid job', { extra: { jobData: jsonStringify(job, { replaceCircularRefs: true }) }, @@ -196,6 +203,12 @@ export class ScalingService { const jobId = job.id; this.logger.info(`Enqueued execution ${executionId} (job ${jobId})`, { executionId, jobId }); + this.eventService.emit('job-enqueued', { + executionId, + workflowId: jobData.workflowId, + hostId: this.instanceSettings.hostId, + jobId: jobId.toString(), + }); return job; } diff --git a/packages/cli/src/scaling/scaling.types.ts b/packages/cli/src/scaling/scaling.types.ts index 3c69294172..91f89c5e03 100644 --- a/packages/cli/src/scaling/scaling.types.ts +++ b/packages/cli/src/scaling/scaling.types.ts @@ -10,6 +10,7 @@ export type Job = Bull.Job; export type JobId = Job['id']; export type JobData = { + workflowId: string; executionId: string; loadStaticData: boolean; pushRef?: string; diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index f82e7867f1..d60136fcdf 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -38,6 +38,8 @@ import type { Job, JobData } from '@/scaling/scaling.types'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service'; +import { EventService } from './events/event.service'; + @Service() export class WorkflowRunner { private scalingService: ScalingService; @@ -55,6 +57,7 @@ export class WorkflowRunner { private readonly instanceSettings: InstanceSettings, private readonly manualExecutionService: ManualExecutionService, private readonly executionDataService: ExecutionDataService, + private readonly eventService: EventService, ) {} setExecutionMode(mode: 'regular' | 'queue') { @@ -167,7 +170,7 @@ export class WorkflowRunner { : this.executionsMode === 'queue' && data.executionMode !== 'manual'; if (shouldEnqueue) { - await this.enqueueExecution(executionId, data, loadStaticData, realtime); + await this.enqueueExecution(executionId, workflowId, data, loadStaticData, realtime); } else { await this.runMainProcess(executionId, data, loadStaticData, restartExecutionId); } @@ -335,11 +338,13 @@ export class WorkflowRunner { private async enqueueExecution( executionId: string, + workflowId: string, data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, realtime?: boolean, ): Promise { const jobData: JobData = { + workflowId, executionId, loadStaticData: !!loadStaticData, pushRef: data.pushRef, @@ -400,6 +405,12 @@ export class WorkflowRunner { error.message.includes('job stalled more than maxStalledCount') ) { error = new MaxStalledCountError(error); + this.eventService.emit('job-stalled', { + executionId: job.data.executionId, + workflowId: job.data.workflowId, + hostId: this.instanceSettings.hostId, + jobId: job.id.toString(), + }); } // We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the @@ -432,6 +443,7 @@ export class WorkflowRunner { stoppedAt: fullExecutionData.stoppedAt, status: fullExecutionData.status, data: fullExecutionData.data, + jobId: job.id.toString(), }; this.activeExecutions.finalizeExecution(executionId, runData); diff --git a/packages/frontend/@n8n/i18n/src/locales/en.json b/packages/frontend/@n8n/i18n/src/locales/en.json index b5e5f2bc0e..c88426bbc2 100644 --- a/packages/frontend/@n8n/i18n/src/locales/en.json +++ b/packages/frontend/@n8n/i18n/src/locales/en.json @@ -2016,6 +2016,8 @@ "settings.log-streaming.eventGroup.n8n.node.info": "Will send step-wise execution events every time a node executes. Please note that this can lead to a high frequency of logged events and is probably not suitable for general use.", "settings.log-streaming.eventGroup.n8n.runner": "Runner tasks", "settings.log-streaming.eventGroup.n8n.runner.info": "Will send an event when a Code node execution is requested from a task runner, and when a response is received from the runner with the result.", + "settings.log-streaming.eventGroup.n8n.queue": "Queue events", + "settings.log-streaming.eventGroup.n8n.queue.info": "Will send an event when a queue-related event occurs, e.g. enqueuing, dequeueing, completion, failure, or stalling.", "settings.log-streaming.eventGroup.n8n.worker": "Worker", "settings.log-streaming.$$AbstractMessageEventBusDestination": "Generic", "settings.log-streaming.$$MessageEventBusDestinationWebhook": "Webhook", diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index ce77b145de..00fcb57a81 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -2143,6 +2143,9 @@ export interface IRun { startedAt: Date; stoppedAt?: Date; status: ExecutionStatus; + + /** ID of the job this execution belongs to. Only in scaling mode. */ + jobId?: string; } // Contains all the data which is needed to execute a workflow and so also to diff --git a/packages/workflow/src/message-event-bus.ts b/packages/workflow/src/message-event-bus.ts index 940381ec96..0f1e4dc216 100644 --- a/packages/workflow/src/message-event-bus.ts +++ b/packages/workflow/src/message-event-bus.ts @@ -15,6 +15,7 @@ export const enum EventMessageTypeNames { execution = '$$EventMessageExecution', aiNode = '$$EventMessageAiNode', runner = '$$EventMessageRunner', + queue = '$$EventMessageQueue', } export const enum MessageEventBusDestinationTypeNames {