mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-19 19:11:13 +00:00
feat(core): Unlock queue metrics for multi-main (#17977)
This commit is contained in:
@@ -225,6 +225,12 @@ export class Start extends BaseCommand<z.infer<typeof flagsSchema>> {
|
|||||||
await this.moduleRegistry.initModules();
|
await this.moduleRegistry.initModules();
|
||||||
|
|
||||||
if (this.instanceSettings.isMultiMain) {
|
if (this.instanceSettings.isMultiMain) {
|
||||||
|
// we instantiate `PrometheusMetricsService` early to register its multi-main event handlers
|
||||||
|
if (this.globalConfig.endpoints.metrics.enable) {
|
||||||
|
const { PrometheusMetricsService } = await import('@/metrics/prometheus-metrics.service');
|
||||||
|
Container.get(PrometheusMetricsService);
|
||||||
|
}
|
||||||
|
|
||||||
Container.get(MultiMainSetup).registerEventHandlers();
|
Container.get(MultiMainSetup).registerEventHandlers();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -174,7 +174,7 @@ describe('PrometheusMetricsService', () => {
|
|||||||
includeStatusCode: false,
|
includeStatusCode: false,
|
||||||
});
|
});
|
||||||
|
|
||||||
expect(promClient.Gauge).toHaveBeenNthCalledWith(2, {
|
expect(promClient.Gauge).toHaveBeenNthCalledWith(3, {
|
||||||
name: 'n8n_last_activity',
|
name: 'n8n_last_activity',
|
||||||
help: 'last instance activity (backend request) in Unix time (seconds).',
|
help: 'last instance activity (backend request) in Unix time (seconds).',
|
||||||
});
|
});
|
||||||
@@ -209,12 +209,12 @@ describe('PrometheusMetricsService', () => {
|
|||||||
|
|
||||||
// call 1 is for `n8n_version_info` (always enabled)
|
// call 1 is for `n8n_version_info` (always enabled)
|
||||||
|
|
||||||
expect(promClient.Gauge).toHaveBeenNthCalledWith(2, {
|
expect(promClient.Gauge).toHaveBeenNthCalledWith(3, {
|
||||||
name: 'n8n_scaling_mode_queue_jobs_waiting',
|
name: 'n8n_scaling_mode_queue_jobs_waiting',
|
||||||
help: 'Current number of enqueued jobs waiting for pickup in scaling mode.',
|
help: 'Current number of enqueued jobs waiting for pickup in scaling mode.',
|
||||||
});
|
});
|
||||||
|
|
||||||
expect(promClient.Gauge).toHaveBeenNthCalledWith(3, {
|
expect(promClient.Gauge).toHaveBeenNthCalledWith(4, {
|
||||||
name: 'n8n_scaling_mode_queue_jobs_active',
|
name: 'n8n_scaling_mode_queue_jobs_active',
|
||||||
help: 'Current number of jobs being processed across all workers in scaling mode.',
|
help: 'Current number of jobs being processed across all workers in scaling mode.',
|
||||||
});
|
});
|
||||||
@@ -238,7 +238,7 @@ describe('PrometheusMetricsService', () => {
|
|||||||
|
|
||||||
await prometheusMetricsService.init(app);
|
await prometheusMetricsService.init(app);
|
||||||
|
|
||||||
expect(promClient.Gauge).toHaveBeenCalledTimes(2); // version metric + active workflow count metric
|
expect(promClient.Gauge).toHaveBeenCalledTimes(3); // version metric + active workflow count metric + instance role metric
|
||||||
expect(promClient.Counter).toHaveBeenCalledTimes(0); // cache metrics
|
expect(promClient.Counter).toHaveBeenCalledTimes(0); // cache metrics
|
||||||
expect(eventService.on).not.toHaveBeenCalled();
|
expect(eventService.on).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
@@ -260,9 +260,9 @@ describe('PrometheusMetricsService', () => {
|
|||||||
await prometheusMetricsService.init(app);
|
await prometheusMetricsService.init(app);
|
||||||
|
|
||||||
// First call is n8n version metric
|
// First call is n8n version metric
|
||||||
expect(promClient.Gauge).toHaveBeenCalledTimes(2);
|
expect(promClient.Gauge).toHaveBeenCalledTimes(3);
|
||||||
|
|
||||||
expect(promClient.Gauge).toHaveBeenNthCalledWith(2, {
|
expect(promClient.Gauge).toHaveBeenNthCalledWith(3, {
|
||||||
name: 'n8n_active_workflow_count',
|
name: 'n8n_active_workflow_count',
|
||||||
help: 'Total number of active workflows.',
|
help: 'Total number of active workflows.',
|
||||||
collect: expect.any(Function),
|
collect: expect.any(Function),
|
||||||
@@ -530,4 +530,35 @@ describe('PrometheusMetricsService', () => {
|
|||||||
expect(promClient.Counter.prototype.inc).toHaveBeenCalledWith({}, 1);
|
expect(promClient.Counter.prototype.inc).toHaveBeenCalledWith({}, 1);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('instance role metric', () => {
|
||||||
|
it('should set up instance role metric for main instance', async () => {
|
||||||
|
// @ts-expect-error Private field
|
||||||
|
instanceSettings.instanceType = 'main';
|
||||||
|
|
||||||
|
await prometheusMetricsService.init(app);
|
||||||
|
|
||||||
|
expect(promClient.Gauge).toHaveBeenCalledWith({
|
||||||
|
name: 'n8n_instance_role_leader',
|
||||||
|
help: 'Whether this main instance is the leader (1) or not (0).',
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should not set up instance role metric for worker instance', async () => {
|
||||||
|
// @ts-expect-error Private field
|
||||||
|
instanceSettings.instanceType = 'worker';
|
||||||
|
|
||||||
|
await prometheusMetricsService.init(app);
|
||||||
|
|
||||||
|
// Only version and active workflow count metrics should be created
|
||||||
|
expect(promClient.Gauge).toHaveBeenCalledTimes(2);
|
||||||
|
|
||||||
|
// Verify instance role metric was not created
|
||||||
|
const calls = (promClient.Gauge as jest.Mock).mock.calls;
|
||||||
|
const hasInstanceRoleMetric = calls.some(
|
||||||
|
(call) => call[0]?.name === 'n8n_instance_role_leader',
|
||||||
|
);
|
||||||
|
expect(hasInstanceRoleMetric).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import { GlobalConfig } from '@n8n/config';
|
import { GlobalConfig } from '@n8n/config';
|
||||||
import { Time } from '@n8n/constants';
|
import { Time } from '@n8n/constants';
|
||||||
import { WorkflowRepository } from '@n8n/db';
|
import { WorkflowRepository } from '@n8n/db';
|
||||||
|
import { OnLeaderStepdown, OnLeaderTakeover } from '@n8n/decorators';
|
||||||
import { Service } from '@n8n/di';
|
import { Service } from '@n8n/di';
|
||||||
import type express from 'express';
|
import type express from 'express';
|
||||||
import promBundle from 'express-prom-bundle';
|
import promBundle from 'express-prom-bundle';
|
||||||
@@ -59,6 +60,7 @@ export class PrometheusMetricsService {
|
|||||||
promClient.register.clear(); // clear all metrics in case we call this a second time
|
promClient.register.clear(); // clear all metrics in case we call this a second time
|
||||||
this.initDefaultMetrics();
|
this.initDefaultMetrics();
|
||||||
this.initN8nVersionMetric();
|
this.initN8nVersionMetric();
|
||||||
|
if (this.instanceSettings.instanceType === 'main') this.initInstanceRoleMetric();
|
||||||
this.initCacheMetrics();
|
this.initCacheMetrics();
|
||||||
this.initEventBusMetrics();
|
this.initEventBusMetrics();
|
||||||
this.initRouteMetrics(app);
|
this.initRouteMetrics(app);
|
||||||
@@ -112,6 +114,25 @@ export class PrometheusMetricsService {
|
|||||||
versionGauge.set({ version: 'v' + version, major, minor, patch }, 1);
|
versionGauge.set({ version: 'v' + version, major, minor, patch }, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private initInstanceRoleMetric() {
|
||||||
|
this.gauges.instanceRoleLeader = new promClient.Gauge({
|
||||||
|
name: this.prefix + 'instance_role_leader',
|
||||||
|
help: 'Whether this main instance is the leader (1) or not (0).',
|
||||||
|
});
|
||||||
|
|
||||||
|
this.gauges.instanceRoleLeader.set(this.instanceSettings.isLeader ? 1 : 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnLeaderTakeover()
|
||||||
|
updateOnLeaderTakeover() {
|
||||||
|
this.gauges.instanceRoleLeader?.set(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnLeaderStepdown()
|
||||||
|
updateOnLeaderStepdown() {
|
||||||
|
this.gauges.instanceRoleLeader?.set(0);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set up default metrics collection with `prom-client`, e.g.
|
* Set up default metrics collection with `prom-client`, e.g.
|
||||||
* `process_cpu_seconds_total`, `process_resident_memory_bytes`, etc.
|
* `process_cpu_seconds_total`, `process_resident_memory_bytes`, etc.
|
||||||
|
|||||||
@@ -405,8 +405,7 @@ export class ScalingService {
|
|||||||
get isQueueMetricsEnabled() {
|
get isQueueMetricsEnabled() {
|
||||||
return (
|
return (
|
||||||
this.globalConfig.endpoints.metrics.includeQueueMetrics &&
|
this.globalConfig.endpoints.metrics.includeQueueMetrics &&
|
||||||
this.instanceSettings.instanceType === 'main' &&
|
this.instanceSettings.instanceType === 'main'
|
||||||
this.instanceSettings.isSingleMain
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user