From cc4e46eae4ef3d690aa25d97e37dd4de09140781 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 11 Jun 2024 09:11:39 +0200 Subject: [PATCH] refactor(core): Remove event bus helpers (no-changelog) (#9690) --- .../MessageEventBus/MessageEventBus.ts | 8 +-- .../MessageEventBus/MessageEventBusHelper.ts | 7 --- .../MessageEventBusDestination/Helpers.ee.ts | 52 ------------------- .../MessageEventBusDestination.ee.ts | 6 +++ .../MessageEventBusDestinationSentry.ee.ts | 3 +- .../MessageEventBusDestinationSyslog.ee.ts | 3 +- .../MessageEventBusDestinationWebhook.ee.ts | 3 +- packages/cli/src/eventbus/index.ts | 1 - packages/cli/src/services/metrics.service.ts | 49 +++++++++++++++-- 9 files changed, 59 insertions(+), 73 deletions(-) delete mode 100644 packages/cli/src/eventbus/MessageEventBus/MessageEventBusHelper.ts delete mode 100644 packages/cli/src/eventbus/MessageEventBusDestination/Helpers.ee.ts diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 141719c68d..3a3e2f282b 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -22,14 +22,12 @@ import type { EventMessageAuditOptions } from '../EventMessageClasses/EventMessa import { EventMessageAudit } from '../EventMessageClasses/EventMessageAudit'; import type { EventMessageWorkflowOptions } from '../EventMessageClasses/EventMessageWorkflow'; import { EventMessageWorkflow } from '../EventMessageClasses/EventMessageWorkflow'; -import { isLogStreamingEnabled } from './MessageEventBusHelper'; import type { EventMessageNodeOptions } from '../EventMessageClasses/EventMessageNode'; import { EventMessageNode } from '../EventMessageClasses/EventMessageNode'; import { EventMessageGeneric, eventMessageGenericDestinationTestEvent, } from '../EventMessageClasses/EventMessageGeneric'; -import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee'; import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions'; import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers'; import { ExecutionRecoveryService } from '../../executions/execution-recovery.service'; @@ -37,6 +35,7 @@ import { EventMessageAiNode, type EventMessageAiNodeOptions, } from '../EventMessageClasses/EventMessageAiNode'; +import { License } from '@/License'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; @@ -69,6 +68,7 @@ export class MessageEventBus extends EventEmitter { private readonly workflowRepository: WorkflowRepository, private readonly orchestrationService: OrchestrationService, private readonly recoveryService: ExecutionRecoveryService, + private readonly license: License, ) { super(); } @@ -329,7 +329,7 @@ export class MessageEventBus extends EventEmitter { } private async emitMessage(msg: EventMessageTypes) { - this.emit(METRICS_EVENT_NAME, msg); + this.emit('metrics.messageEventBus.Event', msg); // generic emit for external modules to capture events // this is for internal use ONLY and not for use with custom destinations! @@ -350,7 +350,7 @@ export class MessageEventBus extends EventEmitter { shouldSendMsg(msg: EventMessageTypes): boolean { return ( - isLogStreamingEnabled() && + this.license.isLogStreamingEnabled() && Object.keys(this.destinations).length > 0 && this.hasAnyDestinationSubscribedToEvent(msg) ); diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBusHelper.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBusHelper.ts deleted file mode 100644 index 29eab2872a..0000000000 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBusHelper.ts +++ /dev/null @@ -1,7 +0,0 @@ -import { License } from '@/License'; -import { Container } from 'typedi'; - -export function isLogStreamingEnabled(): boolean { - const license = Container.get(License); - return license.isLogStreamingEnabled(); -} diff --git a/packages/cli/src/eventbus/MessageEventBusDestination/Helpers.ee.ts b/packages/cli/src/eventbus/MessageEventBusDestination/Helpers.ee.ts deleted file mode 100644 index 33a6e54bcc..0000000000 --- a/packages/cli/src/eventbus/MessageEventBusDestination/Helpers.ee.ts +++ /dev/null @@ -1,52 +0,0 @@ -import { EventMessageTypeNames } from 'n8n-workflow'; -import config from '@/config'; -import type { EventMessageTypes } from '../EventMessageClasses'; - -export const METRICS_EVENT_NAME = 'metrics.messageEventBus.Event'; - -export function getMetricNameForEvent(event: EventMessageTypes): string { - const prefix = config.getEnv('endpoints.metrics.prefix'); - return prefix + event.eventName.replace('n8n.', '').replace(/\./g, '_') + '_total'; -} - -export function getLabelValueForNode(nodeType: string): string { - return nodeType.replace('n8n-nodes-', '').replace(/\./g, '_'); -} - -export function getLabelValueForCredential(credentialType: string): string { - return credentialType.replace(/\./g, '_'); -} - -export function getLabelsForEvent(event: EventMessageTypes): Record { - switch (event.__type) { - case EventMessageTypeNames.audit: - if (event.eventName.startsWith('n8n.audit.user.credentials')) { - return config.getEnv('endpoints.metrics.includeCredentialTypeLabel') - ? { - credential_type: getLabelValueForCredential( - event.payload.credentialType ?? 'unknown', - ), - } - : {}; - } - - if (event.eventName.startsWith('n8n.audit.workflow')) { - return config.getEnv('endpoints.metrics.includeWorkflowIdLabel') - ? { workflow_id: event.payload.workflowId?.toString() ?? 'unknown' } - : {}; - } - break; - - case EventMessageTypeNames.node: - return config.getEnv('endpoints.metrics.includeNodeTypeLabel') - ? { node_type: getLabelValueForNode(event.payload.nodeType ?? 'unknown') } - : {}; - - case EventMessageTypeNames.workflow: - return config.getEnv('endpoints.metrics.includeWorkflowIdLabel') - ? { workflow_id: event.payload.workflowId?.toString() ?? 'unknown' } - : {}; - } - - return {}; -} diff --git a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestination.ee.ts b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestination.ee.ts index bf68bb859c..6a7fef6d6e 100644 --- a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestination.ee.ts +++ b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestination.ee.ts @@ -8,6 +8,7 @@ import type { EventMessageTypes } from '../EventMessageClasses'; import type { EventMessageConfirmSource } from '../EventMessageClasses/EventMessageConfirm'; import type { MessageEventBus, MessageWithCallback } from '../MessageEventBus/MessageEventBus'; import { EventDestinationsRepository } from '@db/repositories/eventDestinations.repository'; +import { License } from '@/License'; export abstract class MessageEventBusDestination implements MessageEventBusDestinationOptions { // Since you can't have static abstract functions - this just serves as a reminder that you need to implement these. Please. @@ -18,6 +19,8 @@ export abstract class MessageEventBusDestination implements MessageEventBusDesti protected readonly logger: Logger; + protected readonly license: License; + __type: MessageEventBusDestinationTypeNames; label: string; @@ -31,7 +34,10 @@ export abstract class MessageEventBusDestination implements MessageEventBusDesti anonymizeAuditMessages: boolean; constructor(eventBusInstance: MessageEventBus, options: MessageEventBusDestinationOptions) { + // @TODO: Use DI this.logger = Container.get(Logger); + this.license = Container.get(License); + this.eventBusInstance = eventBusInstance; this.id = !options.id || options.id.length !== 36 ? uuid() : options.id; this.__type = options.__type ?? MessageEventBusDestinationTypeNames.abstract; diff --git a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSentry.ee.ts b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSentry.ee.ts index 7c03927aac..25633c3ac6 100644 --- a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSentry.ee.ts +++ b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSentry.ee.ts @@ -7,7 +7,6 @@ import type { MessageEventBusDestinationOptions, MessageEventBusDestinationSentryOptions, } from 'n8n-workflow'; -import { isLogStreamingEnabled } from '../MessageEventBus/MessageEventBusHelper'; import { eventMessageGenericDestinationTestEvent } from '../EventMessageClasses/EventMessageGeneric'; import { N8N_VERSION } from '@/constants'; import type { MessageEventBus, MessageWithCallback } from '../MessageEventBus/MessageEventBus'; @@ -57,7 +56,7 @@ export class MessageEventBusDestinationSentry let sendResult = false; if (!this.sentryClient) return sendResult; if (msg.eventName !== eventMessageGenericDestinationTestEvent) { - if (!isLogStreamingEnabled()) return sendResult; + if (!this.license.isLogStreamingEnabled()) return sendResult; if (!this.hasSubscribedToEvent(msg)) return sendResult; } try { diff --git a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSyslog.ee.ts b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSyslog.ee.ts index ac68d3856a..f57705319c 100644 --- a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSyslog.ee.ts +++ b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSyslog.ee.ts @@ -7,7 +7,6 @@ import type { } from 'n8n-workflow'; import { MessageEventBusDestinationTypeNames } from 'n8n-workflow'; import { MessageEventBusDestination } from './MessageEventBusDestination.ee'; -import { isLogStreamingEnabled } from '../MessageEventBus/MessageEventBusHelper'; import { eventMessageGenericDestinationTestEvent } from '../EventMessageClasses/EventMessageGeneric'; import type { MessageEventBus, MessageWithCallback } from '../MessageEventBus/MessageEventBus'; import Container from 'typedi'; @@ -73,7 +72,7 @@ export class MessageEventBusDestinationSyslog const { msg, confirmCallback } = emitterPayload; let sendResult = false; if (msg.eventName !== eventMessageGenericDestinationTestEvent) { - if (!isLogStreamingEnabled()) return sendResult; + if (!this.license.isLogStreamingEnabled()) return sendResult; if (!this.hasSubscribedToEvent(msg)) return sendResult; } try { diff --git a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee.ts b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee.ts index 6b23c2de78..95e76a854a 100644 --- a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee.ts +++ b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee.ts @@ -14,7 +14,6 @@ import type { } from 'n8n-workflow'; import { CredentialsHelper } from '@/CredentialsHelper'; import { Agent as HTTPSAgent } from 'https'; -import { isLogStreamingEnabled } from '../MessageEventBus/MessageEventBusHelper'; import { eventMessageGenericDestinationTestEvent } from '../EventMessageClasses/EventMessageGeneric'; import type { MessageEventBus, MessageWithCallback } from '../MessageEventBus/MessageEventBus'; import * as SecretsHelpers from '@/ExternalSecrets/externalSecretsHelper.ee'; @@ -255,7 +254,7 @@ export class MessageEventBusDestinationWebhook const { msg, confirmCallback } = emitterPayload; let sendResult = false; if (msg.eventName !== eventMessageGenericDestinationTestEvent) { - if (!isLogStreamingEnabled()) return sendResult; + if (!this.license.isLogStreamingEnabled()) return sendResult; if (!this.hasSubscribedToEvent(msg)) return sendResult; } // at first run, build this.requestOptions with the destination settings diff --git a/packages/cli/src/eventbus/index.ts b/packages/cli/src/eventbus/index.ts index b9a271bb81..fd3658c0a4 100644 --- a/packages/cli/src/eventbus/index.ts +++ b/packages/cli/src/eventbus/index.ts @@ -1,3 +1,2 @@ export { EventMessageTypes } from './EventMessageClasses'; export { EventPayloadWorkflow } from './EventMessageClasses/EventMessageWorkflow'; -export { METRICS_EVENT_NAME, getLabelsForEvent } from './MessageEventBusDestination/Helpers.ee'; diff --git a/packages/cli/src/services/metrics.service.ts b/packages/cli/src/services/metrics.service.ts index aae4652e56..edee289de1 100644 --- a/packages/cli/src/services/metrics.service.ts +++ b/packages/cli/src/services/metrics.service.ts @@ -8,9 +8,10 @@ import { Service } from 'typedi'; import EventEmitter from 'events'; import { CacheService } from '@/services/cache/cache.service'; -import { METRICS_EVENT_NAME, getLabelsForEvent, type EventMessageTypes } from '@/eventbus'; +import { type EventMessageTypes } from '@/eventbus'; import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; import { Logger } from '@/Logger'; +import { EventMessageTypeNames } from 'n8n-workflow'; @Service() export class MetricsService extends EventEmitter { @@ -135,7 +136,7 @@ export class MetricsService extends EventEmitter { const counter = new promClient.Counter({ name: metricName, help: `Total number of ${event.eventName} events.`, - labelNames: Object.keys(getLabelsForEvent(event)), + labelNames: Object.keys(this.getLabelsForEvent(event)), }); counter.inc(0); this.counters[event.eventName] = counter; @@ -148,10 +149,52 @@ export class MetricsService extends EventEmitter { if (!config.getEnv('endpoints.metrics.includeMessageEventBusMetrics')) { return; } - this.eventBus.on(METRICS_EVENT_NAME, (event: EventMessageTypes) => { + this.eventBus.on('metrics.messageEventBus.Event', (event: EventMessageTypes) => { const counter = this.getCounterForEvent(event); if (!counter) return; counter.inc(1); }); } + + getLabelsForEvent(event: EventMessageTypes): Record { + switch (event.__type) { + case EventMessageTypeNames.audit: + if (event.eventName.startsWith('n8n.audit.user.credentials')) { + return config.getEnv('endpoints.metrics.includeCredentialTypeLabel') + ? { + credential_type: this.getLabelValueForCredential( + event.payload.credentialType ?? 'unknown', + ), + } + : {}; + } + + if (event.eventName.startsWith('n8n.audit.workflow')) { + return config.getEnv('endpoints.metrics.includeWorkflowIdLabel') + ? { workflow_id: event.payload.workflowId?.toString() ?? 'unknown' } + : {}; + } + break; + + case EventMessageTypeNames.node: + return config.getEnv('endpoints.metrics.includeNodeTypeLabel') + ? { node_type: this.getLabelValueForNode(event.payload.nodeType ?? 'unknown') } + : {}; + + case EventMessageTypeNames.workflow: + return config.getEnv('endpoints.metrics.includeWorkflowIdLabel') + ? { workflow_id: event.payload.workflowId?.toString() ?? 'unknown' } + : {}; + } + + return {}; + } + + getLabelValueForNode(nodeType: string) { + return nodeType.replace('n8n-nodes-', '').replace(/\./g, '_'); + } + + getLabelValueForCredential(credentialType: string) { + return credentialType.replace(/\./g, '_'); + } }