feat(core): Add task runner events to log streaming (#16265)

This commit is contained in:
Iván Ovejero
2025-06-12 12:47:37 +02:00
committed by GitHub
parent 2b0cd5f7de
commit 46723d3518
11 changed files with 173 additions and 5 deletions

View File

@@ -0,0 +1,44 @@
import type { JsonObject } from 'n8n-workflow';
import { EventMessageTypeNames } from 'n8n-workflow';
import { AbstractEventMessage, isEventMessageOptionsWithType } from './abstract-event-message';
import type { AbstractEventMessageOptions } from './abstract-event-message-options';
import type { AbstractEventPayload } from './abstract-event-payload';
export interface EventPayloadRunner extends AbstractEventPayload {
taskId: string;
nodeId: string;
executionId: string;
workflowId: string;
}
export interface EventMessageRunnerOptions extends AbstractEventMessageOptions {
payload?: EventPayloadRunner;
}
export class EventMessageRunner extends AbstractEventMessage {
readonly __type = EventMessageTypeNames.runner;
payload: EventPayloadRunner;
constructor(options: EventMessageRunnerOptions) {
super(options);
if (options.payload) this.setPayload(options.payload);
if (options.anonymize) {
this.anonymize();
}
}
setPayload(payload: EventPayloadRunner): this {
this.payload = payload;
return this;
}
deserialize(data: JsonObject): this {
if (isEventMessageOptionsWithType(data, this.__type)) {
this.setOptionsOrDefault(data);
if (data.payload) this.setPayload(data.payload as EventPayloadRunner);
}
return this;
}
}

View File

@@ -3,6 +3,7 @@ import type { EventMessageAudit } from './event-message-audit';
import type { EventMessageExecution } from './event-message-execution';
import type { EventMessageGeneric } from './event-message-generic';
import type { EventMessageNode } from './event-message-node';
import type { EventMessageRunner } from './event-message-runner';
import type { EventMessageWorkflow } from './event-message-workflow';
export const eventNamesAiNodes = [
@@ -24,6 +25,13 @@ export const eventNamesAiNodes = [
export type EventNamesAiNodesType = (typeof eventNamesAiNodes)[number];
export const eventNamesRunner = [
'n8n.runner.task.requested',
'n8n.runner.response.received',
] as const;
export type EventNamesRunnerType = (typeof eventNamesRunner)[number];
export const eventNamesWorkflow = [
'n8n.workflow.started',
'n8n.workflow.success',
@@ -76,6 +84,7 @@ export type EventNamesTypes =
| EventNamesExecutionType
| EventNamesGenericType
| EventNamesAiNodesType
| EventNamesRunnerType
| 'n8n.destination.test';
export const eventNamesAll = [
@@ -84,6 +93,7 @@ export const eventNamesAll = [
...eventNamesNode,
...eventNamesGeneric,
...eventNamesAiNodes,
...eventNamesRunner,
];
export type EventMessageTypes =
@@ -92,4 +102,5 @@ export type EventMessageTypes =
| EventMessageAudit
| EventMessageNode
| EventMessageExecution
| EventMessageAiNode;
| EventMessageAiNode
| EventMessageRunner;

View File

@@ -32,6 +32,8 @@ import {
} from '../event-message-classes/event-message-generic';
import type { EventMessageNodeOptions } from '../event-message-classes/event-message-node';
import { EventMessageNode } from '../event-message-classes/event-message-node';
import type { EventMessageRunnerOptions } from '../event-message-classes/event-message-runner';
import { EventMessageRunner } from '../event-message-classes/event-message-runner';
import type { EventMessageWorkflowOptions } from '../event-message-classes/event-message-workflow';
import { EventMessageWorkflow } from '../event-message-classes/event-message-workflow';
import { messageEventBusDestinationFromDb } from '../message-event-bus-destination/message-event-bus-destination-from-db';
@@ -419,4 +421,8 @@ export class MessageEventBus extends EventEmitter {
async sendExecutionEvent(options: EventMessageExecutionOptions) {
await this.send(new EventMessageExecution(options));
}
async sendRunnerEvent(options: EventMessageRunnerOptions) {
await this.send(new EventMessageRunner(options));
}
}

View File

@@ -1258,4 +1258,48 @@ describe('LogStreamingEventRelay', () => {
});
});
});
describe('runner events', () => {
it('should log on `runner-task-requested` event', () => {
const event: RelayEventMap['runner-task-requested'] = {
taskId: 't-1',
nodeId: 'n-2',
executionId: 'e-3',
workflowId: 'w-4',
};
eventService.emit('runner-task-requested', event);
expect(eventBus.sendRunnerEvent).toHaveBeenCalledWith({
eventName: 'n8n.runner.task.requested',
payload: {
taskId: 't-1',
nodeId: 'n-2',
executionId: 'e-3',
workflowId: 'w-4',
},
});
});
it('should log on `runner-response-received` event', () => {
const event: RelayEventMap['runner-response-received'] = {
taskId: 't-1',
nodeId: 'n-2',
executionId: 'e-3',
workflowId: 'w-4',
};
eventService.emit('runner-response-received', event);
expect(eventBus.sendRunnerEvent).toHaveBeenCalledWith({
eventName: 'n8n.runner.response.received',
payload: {
taskId: 't-1',
nodeId: 'n-2',
executionId: 'e-3',
workflowId: 'w-4',
},
});
});
});
});

View File

@@ -492,4 +492,22 @@ export type RelayEventMap = {
};
// #endregion
// #region runner
'runner-task-requested': {
taskId: string;
nodeId: string;
workflowId: string;
executionId: string;
};
'runner-response-received': {
taskId: string;
nodeId: string;
workflowId: string;
executionId: string;
};
// #endregion
} & AiEventMap;

View File

@@ -63,6 +63,8 @@ export class LogStreamingEventRelay extends EventRelay {
'ai-llm-errored': (event) => this.aiLlmErrored(event),
'ai-vector-store-populated': (event) => this.aiVectorStorePopulated(event),
'ai-vector-store-updated': (event) => this.aiVectorStoreUpdated(event),
'runner-task-requested': (event) => this.runnerTaskRequested(event),
'runner-response-received': (event) => this.runnerResponseReceived(event),
});
}
@@ -524,4 +526,22 @@ export class LogStreamingEventRelay extends EventRelay {
}
// #endregion
// #region runner
private runnerTaskRequested(payload: RelayEventMap['runner-task-requested']) {
void this.eventBus.sendRunnerEvent({
eventName: 'n8n.runner.task.requested',
payload,
});
}
private runnerResponseReceived(payload: RelayEventMap['runner-response-received']) {
void this.eventBus.sendRunnerEvent({
eventName: 'n8n.runner.response.received',
payload,
});
}
// #endregion
}

View File

@@ -2,6 +2,7 @@ import { mock } from 'jest-mock-extended';
import get from 'lodash/get';
import set from 'lodash/set';
import type { EventService } from '@/events/event.service';
import type { NodeTypes } from '@/node-types';
import type { Task } from '@/task-runners/task-managers/task-requester';
import { TaskRequester } from '@/task-runners/task-managers/task-requester';
@@ -17,9 +18,10 @@ class TestTaskRequester extends TaskRequester {
describe('TaskRequester', () => {
let instance: TestTaskRequester;
const mockNodeTypes = mock<NodeTypes>();
const mockEventService = mock<EventService>();
beforeEach(() => {
instance = new TestTaskRequester(mockNodeTypes);
instance = new TestTaskRequester(mockNodeTypes, mockEventService);
});
describe('handleRpc', () => {

View File

@@ -1,6 +1,7 @@
import { Container, Service } from '@n8n/di';
import type { RequesterMessage } from '@n8n/task-runner';
import { EventService } from '@/events/event.service';
import { NodeTypes } from '@/node-types';
import type { RequesterMessageCallback } from '@/task-runners/task-broker/task-broker.service';
import { TaskBroker } from '@/task-runners/task-broker/task-broker.service';
@@ -13,8 +14,8 @@ export class LocalTaskRequester extends TaskRequester {
id = 'local-task-requester';
constructor(nodeTypes: NodeTypes) {
super(nodeTypes);
constructor(nodeTypes: NodeTypes, eventService: EventService) {
super(nodeTypes, eventService);
this.registerRequester();
}

View File

@@ -20,6 +20,7 @@ import type {
} from 'n8n-workflow';
import { nanoid } from 'nanoid';
import { EventService } from '@/events/event.service';
import { NodeTypes } from '@/node-types';
import { DataRequestResponseBuilder } from './data-request-response-builder';
@@ -58,7 +59,10 @@ export abstract class TaskRequester {
private readonly dataResponseBuilder = new DataRequestResponseBuilder();
constructor(private readonly nodeTypes: NodeTypes) {}
constructor(
private readonly nodeTypes: NodeTypes,
private readonly eventService: EventService,
) {}
async startTask<TData, TError>(
additionalData: IWorkflowExecuteAdditionalData,
@@ -130,6 +134,13 @@ export abstract class TaskRequester {
};
this.tasks.set(task.taskId, task);
this.eventService.emit('runner-task-requested', {
taskId: task.taskId,
nodeId: task.data.node.id,
workflowId: task.data.workflow.id,
executionId: task.data.additionalData.executionId ?? 'unknown',
});
try {
const dataPromise = new Promise<TaskResultData>((resolve, reject) => {
this.taskAcceptRejects.set(task.taskId, {
@@ -145,6 +156,14 @@ export abstract class TaskRequester {
});
const resultData = await dataPromise;
this.eventService.emit('runner-response-received', {
taskId: task.taskId,
nodeId: task.data.node.id,
workflowId: task.data.workflow.id,
executionId: task.data.additionalData.executionId ?? 'unknown',
});
// Set custom execution data (`$execution.customData`) if sent
if (resultData.customData) {
Object.entries(resultData.customData).forEach(([k, v]) => {

View File

@@ -2002,6 +2002,8 @@
"settings.log-streaming.eventGroup.n8n.user": "User",
"settings.log-streaming.eventGroup.n8n.node": "Node Executions",
"settings.log-streaming.eventGroup.n8n.node.info": "Will send step-wise execution events every time a node executes. Please note that this can lead to a high frequency of logged events and is probably not suitable for general use.",
"settings.log-streaming.eventGroup.n8n.runner": "Runner tasks",
"settings.log-streaming.eventGroup.n8n.runner.info": "Will send an event when a Code node execution is requested from a task runner, and when a response is received from the runner with the result.",
"settings.log-streaming.eventGroup.n8n.worker": "Worker",
"settings.log-streaming.$$AbstractMessageEventBusDestination": "Generic",
"settings.log-streaming.$$MessageEventBusDestinationWebhook": "Webhook",

View File

@@ -14,6 +14,7 @@ export const enum EventMessageTypeNames {
node = '$$EventMessageNode',
execution = '$$EventMessageExecution',
aiNode = '$$EventMessageAiNode',
runner = '$$EventMessageRunner',
}
export const enum MessageEventBusDestinationTypeNames {