mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-18 02:21:13 +00:00
refactor(core): Enrich relay-execution-lifecycle-event logs (#12165)
This commit is contained in:
@@ -4,6 +4,7 @@ import { Service } from 'typedi';
|
||||
|
||||
import config from '@/config';
|
||||
import { Logger } from '@/logging/logger.service';
|
||||
import type { LogMetadata } from '@/logging/types';
|
||||
import { RedisClientService } from '@/services/redis-client.service';
|
||||
|
||||
import type { PubSub } from './pubsub.types';
|
||||
@@ -45,7 +46,7 @@ export class Publisher {
|
||||
// #region Publishing
|
||||
|
||||
/** Publish a command into the `n8n.commands` channel. */
|
||||
async publishCommand(msg: Omit<PubSub.Command, 'senderId'>) {
|
||||
async publishCommand(msg: PubSub.Command) {
|
||||
// @TODO: Once this class is only ever used in scaling mode, remove next line.
|
||||
if (config.getEnv('executions.mode') !== 'queue') return;
|
||||
|
||||
@@ -59,7 +60,18 @@ export class Publisher {
|
||||
}),
|
||||
);
|
||||
|
||||
this.logger.debug(`Published ${msg.command} to command channel`);
|
||||
let msgName = msg.command;
|
||||
|
||||
const metadata: LogMetadata = { msg: msg.command, channel: 'n8n.commands' };
|
||||
|
||||
if (msg.command === 'relay-execution-lifecycle-event') {
|
||||
const { args, type } = msg.payload;
|
||||
msgName += ` (${type})`;
|
||||
metadata.type = type;
|
||||
metadata.executionId = args.executionId;
|
||||
}
|
||||
|
||||
this.logger.debug(`Published pubsub msg: ${msgName}`, metadata);
|
||||
}
|
||||
|
||||
/** Publish a response to a command into the `n8n.worker-response` channel. */
|
||||
|
||||
@@ -23,10 +23,14 @@ export namespace PubSub {
|
||||
// ----------------------------------
|
||||
|
||||
type _ToCommand<CommandKey extends keyof PubSubCommandMap> = {
|
||||
senderId: string;
|
||||
targets?: string[];
|
||||
command: CommandKey;
|
||||
|
||||
/** Host ID of the sender, added during publishing. */
|
||||
senderId?: string;
|
||||
|
||||
/** Host IDs of the receivers. */
|
||||
targets?: string[];
|
||||
|
||||
/** Whether the command should be sent to the sender as well. */
|
||||
selfSend?: boolean;
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import { Service } from 'typedi';
|
||||
import config from '@/config';
|
||||
import { EventService } from '@/events/event.service';
|
||||
import { Logger } from '@/logging/logger.service';
|
||||
import type { LogMetadata } from '@/logging/types';
|
||||
import { RedisClientService } from '@/services/redis-client.service';
|
||||
|
||||
import type { PubSub } from './pubsub.types';
|
||||
@@ -72,7 +73,7 @@ export class Subscriber {
|
||||
});
|
||||
|
||||
if (!msg) {
|
||||
this.logger.error(`Received malformed message via channel ${channel}`, {
|
||||
this.logger.error('Received malformed pubsub message', {
|
||||
msg: str,
|
||||
channel,
|
||||
});
|
||||
@@ -89,12 +90,18 @@ export class Subscriber {
|
||||
return null;
|
||||
}
|
||||
|
||||
const msgName = 'command' in msg ? msg.command : msg.response;
|
||||
let msgName = 'command' in msg ? msg.command : msg.response;
|
||||
|
||||
this.logger.debug(`Received message ${msgName} via channel ${channel}`, {
|
||||
msg,
|
||||
channel,
|
||||
});
|
||||
const metadata: LogMetadata = { msg: msgName, channel };
|
||||
|
||||
if ('command' in msg && msg.command === 'relay-execution-lifecycle-event') {
|
||||
const { args, type } = msg.payload;
|
||||
msgName += ` (${type})`;
|
||||
metadata.type = type;
|
||||
metadata.executionId = args.executionId;
|
||||
}
|
||||
|
||||
this.logger.debug(`Received pubsub msg: ${msgName}`, metadata);
|
||||
|
||||
return msg;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user