From 46723d3518b4d37e51363e49fae2216a4cb96e08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Thu, 12 Jun 2025 12:47:37 +0200 Subject: [PATCH] feat(core): Add task runner events to log streaming (#16265) --- .../event-message-runner.ts | 44 +++++++++++++++++++ .../eventbus/event-message-classes/index.ts | 13 +++++- .../message-event-bus/message-event-bus.ts | 6 +++ .../log-streaming-event-relay.test.ts | 44 +++++++++++++++++++ .../cli/src/events/maps/relay.event-map.ts | 18 ++++++++ .../relays/log-streaming.event-relay.ts | 20 +++++++++ .../__tests__/task-manager.test.ts | 4 +- .../task-managers/local-task-requester.ts | 5 ++- .../task-managers/task-requester.ts | 21 ++++++++- .../frontend/@n8n/i18n/src/locales/en.json | 2 + packages/workflow/src/message-event-bus.ts | 1 + 11 files changed, 173 insertions(+), 5 deletions(-) create mode 100644 packages/cli/src/eventbus/event-message-classes/event-message-runner.ts diff --git a/packages/cli/src/eventbus/event-message-classes/event-message-runner.ts b/packages/cli/src/eventbus/event-message-classes/event-message-runner.ts new file mode 100644 index 0000000000..2d5f80ef60 --- /dev/null +++ b/packages/cli/src/eventbus/event-message-classes/event-message-runner.ts @@ -0,0 +1,44 @@ +import type { JsonObject } from 'n8n-workflow'; +import { EventMessageTypeNames } from 'n8n-workflow'; + +import { AbstractEventMessage, isEventMessageOptionsWithType } from './abstract-event-message'; +import type { AbstractEventMessageOptions } from './abstract-event-message-options'; +import type { AbstractEventPayload } from './abstract-event-payload'; + +export interface EventPayloadRunner extends AbstractEventPayload { + taskId: string; + nodeId: string; + executionId: string; + workflowId: string; +} + +export interface EventMessageRunnerOptions extends AbstractEventMessageOptions { + payload?: EventPayloadRunner; +} + +export class EventMessageRunner extends AbstractEventMessage { + readonly __type = EventMessageTypeNames.runner; + + payload: EventPayloadRunner; + + constructor(options: EventMessageRunnerOptions) { + super(options); + if (options.payload) this.setPayload(options.payload); + if (options.anonymize) { + this.anonymize(); + } + } + + setPayload(payload: EventPayloadRunner): this { + this.payload = payload; + return this; + } + + deserialize(data: JsonObject): this { + if (isEventMessageOptionsWithType(data, this.__type)) { + this.setOptionsOrDefault(data); + if (data.payload) this.setPayload(data.payload as EventPayloadRunner); + } + return this; + } +} diff --git a/packages/cli/src/eventbus/event-message-classes/index.ts b/packages/cli/src/eventbus/event-message-classes/index.ts index fe985af4e0..bf88933d53 100644 --- a/packages/cli/src/eventbus/event-message-classes/index.ts +++ b/packages/cli/src/eventbus/event-message-classes/index.ts @@ -3,6 +3,7 @@ import type { EventMessageAudit } from './event-message-audit'; import type { EventMessageExecution } from './event-message-execution'; import type { EventMessageGeneric } from './event-message-generic'; import type { EventMessageNode } from './event-message-node'; +import type { EventMessageRunner } from './event-message-runner'; import type { EventMessageWorkflow } from './event-message-workflow'; export const eventNamesAiNodes = [ @@ -24,6 +25,13 @@ export const eventNamesAiNodes = [ export type EventNamesAiNodesType = (typeof eventNamesAiNodes)[number]; +export const eventNamesRunner = [ + 'n8n.runner.task.requested', + 'n8n.runner.response.received', +] as const; + +export type EventNamesRunnerType = (typeof eventNamesRunner)[number]; + export const eventNamesWorkflow = [ 'n8n.workflow.started', 'n8n.workflow.success', @@ -76,6 +84,7 @@ export type EventNamesTypes = | EventNamesExecutionType | EventNamesGenericType | EventNamesAiNodesType + | EventNamesRunnerType | 'n8n.destination.test'; export const eventNamesAll = [ @@ -84,6 +93,7 @@ export const eventNamesAll = [ ...eventNamesNode, ...eventNamesGeneric, ...eventNamesAiNodes, + ...eventNamesRunner, ]; export type EventMessageTypes = @@ -92,4 +102,5 @@ export type EventMessageTypes = | EventMessageAudit | EventMessageNode | EventMessageExecution - | EventMessageAiNode; + | EventMessageAiNode + | EventMessageRunner; diff --git a/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts b/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts index e061fd45c5..3f268c3d13 100644 --- a/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts +++ b/packages/cli/src/eventbus/message-event-bus/message-event-bus.ts @@ -32,6 +32,8 @@ import { } from '../event-message-classes/event-message-generic'; import type { EventMessageNodeOptions } from '../event-message-classes/event-message-node'; import { EventMessageNode } from '../event-message-classes/event-message-node'; +import type { EventMessageRunnerOptions } from '../event-message-classes/event-message-runner'; +import { EventMessageRunner } from '../event-message-classes/event-message-runner'; import type { EventMessageWorkflowOptions } from '../event-message-classes/event-message-workflow'; import { EventMessageWorkflow } from '../event-message-classes/event-message-workflow'; import { messageEventBusDestinationFromDb } from '../message-event-bus-destination/message-event-bus-destination-from-db'; @@ -419,4 +421,8 @@ export class MessageEventBus extends EventEmitter { async sendExecutionEvent(options: EventMessageExecutionOptions) { await this.send(new EventMessageExecution(options)); } + + async sendRunnerEvent(options: EventMessageRunnerOptions) { + await this.send(new EventMessageRunner(options)); + } } diff --git a/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts b/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts index aa8a979309..0a672a96da 100644 --- a/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts +++ b/packages/cli/src/events/__tests__/log-streaming-event-relay.test.ts @@ -1258,4 +1258,48 @@ describe('LogStreamingEventRelay', () => { }); }); }); + + describe('runner events', () => { + it('should log on `runner-task-requested` event', () => { + const event: RelayEventMap['runner-task-requested'] = { + taskId: 't-1', + nodeId: 'n-2', + executionId: 'e-3', + workflowId: 'w-4', + }; + + eventService.emit('runner-task-requested', event); + + expect(eventBus.sendRunnerEvent).toHaveBeenCalledWith({ + eventName: 'n8n.runner.task.requested', + payload: { + taskId: 't-1', + nodeId: 'n-2', + executionId: 'e-3', + workflowId: 'w-4', + }, + }); + }); + + it('should log on `runner-response-received` event', () => { + const event: RelayEventMap['runner-response-received'] = { + taskId: 't-1', + nodeId: 'n-2', + executionId: 'e-3', + workflowId: 'w-4', + }; + + eventService.emit('runner-response-received', event); + + expect(eventBus.sendRunnerEvent).toHaveBeenCalledWith({ + eventName: 'n8n.runner.response.received', + payload: { + taskId: 't-1', + nodeId: 'n-2', + executionId: 'e-3', + workflowId: 'w-4', + }, + }); + }); + }); }); diff --git a/packages/cli/src/events/maps/relay.event-map.ts b/packages/cli/src/events/maps/relay.event-map.ts index accf34c2e1..3a11f37987 100644 --- a/packages/cli/src/events/maps/relay.event-map.ts +++ b/packages/cli/src/events/maps/relay.event-map.ts @@ -492,4 +492,22 @@ export type RelayEventMap = { }; // #endregion + + // #region runner + + 'runner-task-requested': { + taskId: string; + nodeId: string; + workflowId: string; + executionId: string; + }; + + 'runner-response-received': { + taskId: string; + nodeId: string; + workflowId: string; + executionId: string; + }; + + // #endregion } & AiEventMap; diff --git a/packages/cli/src/events/relays/log-streaming.event-relay.ts b/packages/cli/src/events/relays/log-streaming.event-relay.ts index 6286fe8f9b..1515fcd3bb 100644 --- a/packages/cli/src/events/relays/log-streaming.event-relay.ts +++ b/packages/cli/src/events/relays/log-streaming.event-relay.ts @@ -63,6 +63,8 @@ export class LogStreamingEventRelay extends EventRelay { 'ai-llm-errored': (event) => this.aiLlmErrored(event), 'ai-vector-store-populated': (event) => this.aiVectorStorePopulated(event), 'ai-vector-store-updated': (event) => this.aiVectorStoreUpdated(event), + 'runner-task-requested': (event) => this.runnerTaskRequested(event), + 'runner-response-received': (event) => this.runnerResponseReceived(event), }); } @@ -524,4 +526,22 @@ export class LogStreamingEventRelay extends EventRelay { } // #endregion + + // #region runner + + private runnerTaskRequested(payload: RelayEventMap['runner-task-requested']) { + void this.eventBus.sendRunnerEvent({ + eventName: 'n8n.runner.task.requested', + payload, + }); + } + + private runnerResponseReceived(payload: RelayEventMap['runner-response-received']) { + void this.eventBus.sendRunnerEvent({ + eventName: 'n8n.runner.response.received', + payload, + }); + } + + // #endregion } diff --git a/packages/cli/src/task-runners/task-managers/__tests__/task-manager.test.ts b/packages/cli/src/task-runners/task-managers/__tests__/task-manager.test.ts index 1b12ba7caf..c1600dd8eb 100644 --- a/packages/cli/src/task-runners/task-managers/__tests__/task-manager.test.ts +++ b/packages/cli/src/task-runners/task-managers/__tests__/task-manager.test.ts @@ -2,6 +2,7 @@ import { mock } from 'jest-mock-extended'; import get from 'lodash/get'; import set from 'lodash/set'; +import type { EventService } from '@/events/event.service'; import type { NodeTypes } from '@/node-types'; import type { Task } from '@/task-runners/task-managers/task-requester'; import { TaskRequester } from '@/task-runners/task-managers/task-requester'; @@ -17,9 +18,10 @@ class TestTaskRequester extends TaskRequester { describe('TaskRequester', () => { let instance: TestTaskRequester; const mockNodeTypes = mock(); + const mockEventService = mock(); beforeEach(() => { - instance = new TestTaskRequester(mockNodeTypes); + instance = new TestTaskRequester(mockNodeTypes, mockEventService); }); describe('handleRpc', () => { diff --git a/packages/cli/src/task-runners/task-managers/local-task-requester.ts b/packages/cli/src/task-runners/task-managers/local-task-requester.ts index 4fc45ccd6c..821906cb76 100644 --- a/packages/cli/src/task-runners/task-managers/local-task-requester.ts +++ b/packages/cli/src/task-runners/task-managers/local-task-requester.ts @@ -1,6 +1,7 @@ import { Container, Service } from '@n8n/di'; import type { RequesterMessage } from '@n8n/task-runner'; +import { EventService } from '@/events/event.service'; import { NodeTypes } from '@/node-types'; import type { RequesterMessageCallback } from '@/task-runners/task-broker/task-broker.service'; import { TaskBroker } from '@/task-runners/task-broker/task-broker.service'; @@ -13,8 +14,8 @@ export class LocalTaskRequester extends TaskRequester { id = 'local-task-requester'; - constructor(nodeTypes: NodeTypes) { - super(nodeTypes); + constructor(nodeTypes: NodeTypes, eventService: EventService) { + super(nodeTypes, eventService); this.registerRequester(); } diff --git a/packages/cli/src/task-runners/task-managers/task-requester.ts b/packages/cli/src/task-runners/task-managers/task-requester.ts index 61ed4c6062..2c409b8f89 100644 --- a/packages/cli/src/task-runners/task-managers/task-requester.ts +++ b/packages/cli/src/task-runners/task-managers/task-requester.ts @@ -20,6 +20,7 @@ import type { } from 'n8n-workflow'; import { nanoid } from 'nanoid'; +import { EventService } from '@/events/event.service'; import { NodeTypes } from '@/node-types'; import { DataRequestResponseBuilder } from './data-request-response-builder'; @@ -58,7 +59,10 @@ export abstract class TaskRequester { private readonly dataResponseBuilder = new DataRequestResponseBuilder(); - constructor(private readonly nodeTypes: NodeTypes) {} + constructor( + private readonly nodeTypes: NodeTypes, + private readonly eventService: EventService, + ) {} async startTask( additionalData: IWorkflowExecuteAdditionalData, @@ -130,6 +134,13 @@ export abstract class TaskRequester { }; this.tasks.set(task.taskId, task); + this.eventService.emit('runner-task-requested', { + taskId: task.taskId, + nodeId: task.data.node.id, + workflowId: task.data.workflow.id, + executionId: task.data.additionalData.executionId ?? 'unknown', + }); + try { const dataPromise = new Promise((resolve, reject) => { this.taskAcceptRejects.set(task.taskId, { @@ -145,6 +156,14 @@ export abstract class TaskRequester { }); const resultData = await dataPromise; + + this.eventService.emit('runner-response-received', { + taskId: task.taskId, + nodeId: task.data.node.id, + workflowId: task.data.workflow.id, + executionId: task.data.additionalData.executionId ?? 'unknown', + }); + // Set custom execution data (`$execution.customData`) if sent if (resultData.customData) { Object.entries(resultData.customData).forEach(([k, v]) => { diff --git a/packages/frontend/@n8n/i18n/src/locales/en.json b/packages/frontend/@n8n/i18n/src/locales/en.json index 5b44f2d0d3..a12860c3bd 100644 --- a/packages/frontend/@n8n/i18n/src/locales/en.json +++ b/packages/frontend/@n8n/i18n/src/locales/en.json @@ -2002,6 +2002,8 @@ "settings.log-streaming.eventGroup.n8n.user": "User", "settings.log-streaming.eventGroup.n8n.node": "Node Executions", "settings.log-streaming.eventGroup.n8n.node.info": "Will send step-wise execution events every time a node executes. Please note that this can lead to a high frequency of logged events and is probably not suitable for general use.", + "settings.log-streaming.eventGroup.n8n.runner": "Runner tasks", + "settings.log-streaming.eventGroup.n8n.runner.info": "Will send an event when a Code node execution is requested from a task runner, and when a response is received from the runner with the result.", "settings.log-streaming.eventGroup.n8n.worker": "Worker", "settings.log-streaming.$$AbstractMessageEventBusDestination": "Generic", "settings.log-streaming.$$MessageEventBusDestinationWebhook": "Webhook", diff --git a/packages/workflow/src/message-event-bus.ts b/packages/workflow/src/message-event-bus.ts index 4ca16093d2..940381ec96 100644 --- a/packages/workflow/src/message-event-bus.ts +++ b/packages/workflow/src/message-event-bus.ts @@ -14,6 +14,7 @@ export const enum EventMessageTypeNames { node = '$$EventMessageNode', execution = '$$EventMessageExecution', aiNode = '$$EventMessageAiNode', + runner = '$$EventMessageRunner', } export const enum MessageEventBusDestinationTypeNames {