feat(core): Add queue events to log streaming (#16427)

This commit is contained in:
Iván Ovejero
2025-06-17 16:26:33 +02:00
committed by GitHub
parent 84fa924ce6
commit 93ac46c581
13 changed files with 316 additions and 4 deletions

View File

@@ -295,7 +295,7 @@ describe('enqueueExecution', () => {
addJob.mockRejectedValueOnce(error);
// @ts-expect-error Private method
await expect(runner.enqueueExecution('1', data)).rejects.toThrowError(error);
await expect(runner.enqueueExecution('1', 'workflow-xyz', data)).rejects.toThrowError(error);
expect(setupQueue).toHaveBeenCalledTimes(1);
});

View File

@@ -0,0 +1,44 @@
import type { JsonObject } from 'n8n-workflow';
import { EventMessageTypeNames } from 'n8n-workflow';
import { AbstractEventMessage, isEventMessageOptionsWithType } from './abstract-event-message';
import type { AbstractEventMessageOptions } from './abstract-event-message-options';
import type { AbstractEventPayload } from './abstract-event-payload';
export interface EventPayloadQueue extends AbstractEventPayload {
workflowId: string;
jobId: string;
executionId: string;
hostId: string;
}
export interface EventMessageQueueOptions extends AbstractEventMessageOptions {
payload?: EventPayloadQueue;
}
export class EventMessageQueue extends AbstractEventMessage {
readonly __type = EventMessageTypeNames.runner;
payload: EventPayloadQueue;
constructor(options: EventMessageQueueOptions) {
super(options);
if (options.payload) this.setPayload(options.payload);
if (options.anonymize) {
this.anonymize();
}
}
setPayload(payload: EventPayloadQueue): this {
this.payload = payload;
return this;
}
deserialize(data: JsonObject): this {
if (isEventMessageOptionsWithType(data, this.__type)) {
this.setOptionsOrDefault(data);
if (data.payload) this.setPayload(data.payload as EventPayloadQueue);
}
return this;
}
}

View File

@@ -3,6 +3,7 @@ import type { EventMessageAudit } from './event-message-audit';
import type { EventMessageExecution } from './event-message-execution';
import type { EventMessageGeneric } from './event-message-generic';
import type { EventMessageNode } from './event-message-node';
import type { EventMessageQueue } from './event-message-queue';
import type { EventMessageRunner } from './event-message-runner';
import type { EventMessageWorkflow } from './event-message-workflow';
@@ -32,6 +33,16 @@ export const eventNamesRunner = [
export type EventNamesRunnerType = (typeof eventNamesRunner)[number];
export const eventNamesQueue = [
'n8n.queue.job.enqueued',
'n8n.queue.job.dequeued',
'n8n.queue.job.completed',
'n8n.queue.job.failed',
'n8n.queue.job.stalled',
] as const;
export type EventNamesQueueType = (typeof eventNamesQueue)[number];
export const eventNamesWorkflow = [
'n8n.workflow.started',
'n8n.workflow.success',
@@ -85,6 +96,7 @@ export type EventNamesTypes =
| EventNamesGenericType
| EventNamesAiNodesType
| EventNamesRunnerType
| EventNamesQueueType
| 'n8n.destination.test';
export const eventNamesAll = [
@@ -94,6 +106,7 @@ export const eventNamesAll = [
...eventNamesGeneric,
...eventNamesAiNodes,
...eventNamesRunner,
...eventNamesQueue,
];
export type EventMessageTypes =
@@ -103,4 +116,5 @@ export type EventMessageTypes =
| EventMessageNode
| EventMessageExecution
| EventMessageAiNode
| EventMessageQueue
| EventMessageRunner;

View File

@@ -32,6 +32,8 @@ import {
} from '../event-message-classes/event-message-generic';
import type { EventMessageNodeOptions } from '../event-message-classes/event-message-node';
import { EventMessageNode } from '../event-message-classes/event-message-node';
import type { EventMessageQueueOptions } from '../event-message-classes/event-message-queue';
import { EventMessageQueue } from '../event-message-classes/event-message-queue';
import type { EventMessageRunnerOptions } from '../event-message-classes/event-message-runner';
import { EventMessageRunner } from '../event-message-classes/event-message-runner';
import type { EventMessageWorkflowOptions } from '../event-message-classes/event-message-workflow';
@@ -425,4 +427,8 @@ export class MessageEventBus extends EventEmitter {
async sendRunnerEvent(options: EventMessageRunnerOptions) {
await this.send(new EventMessageRunner(options));
}
async sendQueueEvent(options: EventMessageQueueOptions) {
await this.send(new EventMessageQueue(options));
}
}

View File

@@ -1,5 +1,6 @@
import type { IWorkflowDb } from '@n8n/db';
import { mock } from 'jest-mock-extended';
import type { InstanceSettings } from 'n8n-core';
import type { INode, IRun, IWorkflowBase } from 'n8n-workflow';
import type { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
@@ -10,7 +11,9 @@ import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-rela
describe('LogStreamingEventRelay', () => {
const eventBus = mock<MessageEventBus>();
const eventService = new EventService();
new LogStreamingEventRelay(eventService, eventBus).init();
const hostId = 'host-xyz';
const instanceSettings = mock<InstanceSettings>({ hostId });
new LogStreamingEventRelay(eventService, eventBus, instanceSettings).init();
afterEach(() => {
jest.clearAllMocks();
@@ -223,6 +226,35 @@ describe('LogStreamingEventRelay', () => {
});
});
it('should log job completion on `workflow-post-execute` for successful job', () => {
const runData = mock<IRun>({
finished: true,
status: 'success',
mode: 'manual',
jobId: '12345',
data: { resultData: {} },
});
const event = {
executionId: 'exec-123',
userId: 'user-456',
workflow: mock<IWorkflowBase>({ id: 'wf-789', name: 'Test Workflow' }),
runData,
};
eventService.emit('workflow-post-execute', event);
expect(eventBus.sendQueueEvent).toHaveBeenCalledWith({
eventName: 'n8n.queue.job.completed',
payload: {
executionId: 'exec-123',
workflowId: 'wf-789',
hostId: 'host-xyz',
jobId: '12345',
},
});
});
it('should log on `workflow-post-execute` event for failed execution', () => {
const runData = mock<IRun>({
status: 'error',
@@ -266,6 +298,45 @@ describe('LogStreamingEventRelay', () => {
},
});
});
it('should log job failure on `workflow-post-execute` for failed job', () => {
const runData = mock<IRun>({
finished: false,
status: 'error',
mode: 'manual',
jobId: '67890',
data: {
resultData: {
lastNodeExecuted: 'some-node',
// @ts-expect-error Partial mock
error: {
node: mock<INode>({ type: 'some-type' }),
message: 'some-message',
},
errorMessage: 'some-message',
},
},
}) as unknown as IRun;
const event = {
executionId: 'exec-456',
userId: 'user-789',
workflow: mock<IWorkflowBase>({ id: 'wf-101', name: 'Failed Workflow' }),
runData,
};
eventService.emit('workflow-post-execute', event);
expect(eventBus.sendQueueEvent).toHaveBeenCalledWith({
eventName: 'n8n.queue.job.failed',
payload: {
executionId: 'exec-456',
workflowId: 'wf-101',
hostId: 'host-xyz',
jobId: '67890',
},
});
});
});
describe('user events', () => {
@@ -1308,4 +1379,69 @@ describe('LogStreamingEventRelay', () => {
});
});
});
describe('job events', () => {
it('should log on `job-enqueued` event', () => {
const event: RelayEventMap['job-enqueued'] = {
executionId: 'exec-1',
workflowId: 'wf-2',
hostId,
jobId: 'job-4',
};
eventService.emit('job-enqueued', event);
expect(eventBus.sendQueueEvent).toHaveBeenCalledWith({
eventName: 'n8n.queue.job.enqueued',
payload: {
executionId: 'exec-1',
workflowId: 'wf-2',
hostId,
jobId: 'job-4',
},
});
});
it('should log on `job-dequeued` event', () => {
const event: RelayEventMap['job-dequeued'] = {
executionId: 'exec-1',
workflowId: 'wf-2',
hostId,
jobId: 'job-4',
};
eventService.emit('job-dequeued', event);
expect(eventBus.sendQueueEvent).toHaveBeenCalledWith({
eventName: 'n8n.queue.job.dequeued',
payload: {
executionId: 'exec-1',
workflowId: 'wf-2',
hostId,
jobId: 'job-4',
},
});
});
it('should log on `job-stalled` event', () => {
const event: RelayEventMap['job-stalled'] = {
executionId: 'exec-1',
workflowId: 'wf-2',
hostId,
jobId: 'job-4',
};
eventService.emit('job-stalled', event);
expect(eventBus.sendQueueEvent).toHaveBeenCalledWith({
eventName: 'n8n.queue.job.stalled',
payload: {
executionId: 'exec-1',
workflowId: 'wf-2',
hostId,
jobId: 'job-4',
},
});
});
});
});

View File

@@ -514,4 +514,29 @@ export type RelayEventMap = {
};
// #endregion
// #region queue
'job-enqueued': {
executionId: string;
workflowId: string;
hostId: string;
jobId: string;
};
'job-dequeued': {
executionId: string;
workflowId: string;
hostId: string;
jobId: string;
};
'job-stalled': {
executionId: string;
workflowId: string;
hostId: string;
jobId: string;
};
// #endregion
} & AiEventMap;

View File

@@ -1,5 +1,6 @@
import { Redactable } from '@n8n/decorators';
import { Service } from '@n8n/di';
import { InstanceSettings } from 'n8n-core';
import type { IWorkflowBase } from 'n8n-workflow';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
@@ -12,6 +13,7 @@ export class LogStreamingEventRelay extends EventRelay {
constructor(
readonly eventService: EventService,
private readonly eventBus: MessageEventBus,
private readonly instanceSettings: InstanceSettings,
) {
super(eventService);
}
@@ -65,6 +67,9 @@ export class LogStreamingEventRelay extends EventRelay {
'ai-vector-store-updated': (event) => this.aiVectorStoreUpdated(event),
'runner-task-requested': (event) => this.runnerTaskRequested(event),
'runner-response-received': (event) => this.runnerResponseReceived(event),
'job-enqueued': (event) => this.jobEnqueued(event),
'job-dequeued': (event) => this.jobDequeued(event),
'job-stalled': (event) => this.jobStalled(event),
});
}
@@ -143,10 +148,11 @@ export class LogStreamingEventRelay extends EventRelay {
}
private workflowPostExecute(event: RelayEventMap['workflow-post-execute']) {
const { runData, workflow, ...rest } = event;
const { runData, workflow, executionId, ...rest } = event;
const payload = {
...rest,
executionId,
success: !!runData?.finished, // despite the `success` name, this reports `finished` state
isManual: runData?.mode === 'manual',
workflowId: workflow.id,
@@ -159,6 +165,18 @@ export class LogStreamingEventRelay extends EventRelay {
payload,
});
if (runData?.jobId) {
void this.eventBus.sendQueueEvent({
eventName: 'n8n.queue.job.completed',
payload: {
executionId,
workflowId: workflow.id,
hostId: this.instanceSettings.hostId,
jobId: runData.jobId.toString(),
},
});
}
return;
}
@@ -174,6 +192,18 @@ export class LogStreamingEventRelay extends EventRelay {
errorMessage: runData?.data.resultData.error?.message.toString(),
},
});
if (runData?.jobId) {
void this.eventBus.sendQueueEvent({
eventName: 'n8n.queue.job.failed',
payload: {
executionId,
workflowId: workflow.id,
hostId: this.instanceSettings.hostId,
jobId: runData.jobId.toString(),
},
});
}
}
// #endregion
@@ -558,4 +588,29 @@ export class LogStreamingEventRelay extends EventRelay {
}
// #endregion
// #region queue
private jobEnqueued(payload: RelayEventMap['job-enqueued']) {
void this.eventBus.sendQueueEvent({
eventName: 'n8n.queue.job.enqueued',
payload,
});
}
private jobDequeued(payload: RelayEventMap['job-dequeued']) {
void this.eventBus.sendQueueEvent({
eventName: 'n8n.queue.job.dequeued',
payload,
});
}
private jobStalled(payload: RelayEventMap['job-stalled']) {
void this.eventBus.sendQueueEvent({
eventName: 'n8n.queue.job.stalled',
payload,
});
}
// #endregion
}

View File

@@ -86,6 +86,13 @@ export class ScalingService {
void this.queue.process(JOB_TYPE_NAME, concurrency, async (job: Job) => {
try {
this.eventService.emit('job-dequeued', {
executionId: job.data.executionId,
workflowId: job.data.workflowId,
hostId: this.instanceSettings.hostId,
jobId: job.id.toString(),
});
if (!this.hasValidJobData(job)) {
throw new UnexpectedError('Worker received invalid job', {
extra: { jobData: jsonStringify(job, { replaceCircularRefs: true }) },
@@ -196,6 +203,12 @@ export class ScalingService {
const jobId = job.id;
this.logger.info(`Enqueued execution ${executionId} (job ${jobId})`, { executionId, jobId });
this.eventService.emit('job-enqueued', {
executionId,
workflowId: jobData.workflowId,
hostId: this.instanceSettings.hostId,
jobId: jobId.toString(),
});
return job;
}

View File

@@ -10,6 +10,7 @@ export type Job = Bull.Job<JobData>;
export type JobId = Job['id'];
export type JobData = {
workflowId: string;
executionId: string;
loadStaticData: boolean;
pushRef?: string;

View File

@@ -38,6 +38,8 @@ import type { Job, JobData } from '@/scaling/scaling.types';
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service';
import { EventService } from './events/event.service';
@Service()
export class WorkflowRunner {
private scalingService: ScalingService;
@@ -55,6 +57,7 @@ export class WorkflowRunner {
private readonly instanceSettings: InstanceSettings,
private readonly manualExecutionService: ManualExecutionService,
private readonly executionDataService: ExecutionDataService,
private readonly eventService: EventService,
) {}
setExecutionMode(mode: 'regular' | 'queue') {
@@ -167,7 +170,7 @@ export class WorkflowRunner {
: this.executionsMode === 'queue' && data.executionMode !== 'manual';
if (shouldEnqueue) {
await this.enqueueExecution(executionId, data, loadStaticData, realtime);
await this.enqueueExecution(executionId, workflowId, data, loadStaticData, realtime);
} else {
await this.runMainProcess(executionId, data, loadStaticData, restartExecutionId);
}
@@ -335,11 +338,13 @@ export class WorkflowRunner {
private async enqueueExecution(
executionId: string,
workflowId: string,
data: IWorkflowExecutionDataProcess,
loadStaticData?: boolean,
realtime?: boolean,
): Promise<void> {
const jobData: JobData = {
workflowId,
executionId,
loadStaticData: !!loadStaticData,
pushRef: data.pushRef,
@@ -400,6 +405,12 @@ export class WorkflowRunner {
error.message.includes('job stalled more than maxStalledCount')
) {
error = new MaxStalledCountError(error);
this.eventService.emit('job-stalled', {
executionId: job.data.executionId,
workflowId: job.data.workflowId,
hostId: this.instanceSettings.hostId,
jobId: job.id.toString(),
});
}
// We use "getLifecycleHooksForScalingWorker" as "getLifecycleHooksForScalingMain" does not contain the
@@ -432,6 +443,7 @@ export class WorkflowRunner {
stoppedAt: fullExecutionData.stoppedAt,
status: fullExecutionData.status,
data: fullExecutionData.data,
jobId: job.id.toString(),
};
this.activeExecutions.finalizeExecution(executionId, runData);

View File

@@ -2016,6 +2016,8 @@
"settings.log-streaming.eventGroup.n8n.node.info": "Will send step-wise execution events every time a node executes. Please note that this can lead to a high frequency of logged events and is probably not suitable for general use.",
"settings.log-streaming.eventGroup.n8n.runner": "Runner tasks",
"settings.log-streaming.eventGroup.n8n.runner.info": "Will send an event when a Code node execution is requested from a task runner, and when a response is received from the runner with the result.",
"settings.log-streaming.eventGroup.n8n.queue": "Queue events",
"settings.log-streaming.eventGroup.n8n.queue.info": "Will send an event when a queue-related event occurs, e.g. enqueuing, dequeueing, completion, failure, or stalling.",
"settings.log-streaming.eventGroup.n8n.worker": "Worker",
"settings.log-streaming.$$AbstractMessageEventBusDestination": "Generic",
"settings.log-streaming.$$MessageEventBusDestinationWebhook": "Webhook",

View File

@@ -2143,6 +2143,9 @@ export interface IRun {
startedAt: Date;
stoppedAt?: Date;
status: ExecutionStatus;
/** ID of the job this execution belongs to. Only in scaling mode. */
jobId?: string;
}
// Contains all the data which is needed to execute a workflow and so also to

View File

@@ -15,6 +15,7 @@ export const enum EventMessageTypeNames {
execution = '$$EventMessageExecution',
aiNode = '$$EventMessageAiNode',
runner = '$$EventMessageRunner',
queue = '$$EventMessageQueue',
}
export const enum MessageEventBusDestinationTypeNames {