mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-19 11:01:15 +00:00
fix(core): Fix telemetry for concurrency control (#9845)
Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
@@ -1,6 +1,10 @@
|
|||||||
import { mock } from 'jest-mock-extended';
|
import { mock } from 'jest-mock-extended';
|
||||||
import config from '@/config';
|
import config from '@/config';
|
||||||
import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service';
|
import {
|
||||||
|
CLOUD_TEMP_PRODUCTION_LIMIT,
|
||||||
|
CLOUD_TEMP_REPORTABLE_THRESHOLDS,
|
||||||
|
ConcurrencyControlService,
|
||||||
|
} from '@/concurrency/concurrency-control.service';
|
||||||
import type { Logger } from '@/Logger';
|
import type { Logger } from '@/Logger';
|
||||||
import { InvalidConcurrencyLimitError } from '@/errors/invalid-concurrency-limit.error';
|
import { InvalidConcurrencyLimitError } from '@/errors/invalid-concurrency-limit.error';
|
||||||
import { ConcurrencyQueue } from '../concurrency-queue';
|
import { ConcurrencyQueue } from '../concurrency-queue';
|
||||||
@@ -366,4 +370,91 @@ describe('ConcurrencyControlService', () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// ----------------------------------
|
||||||
|
// telemetry
|
||||||
|
// ----------------------------------
|
||||||
|
|
||||||
|
describe('telemetry', () => {
|
||||||
|
describe('on cloud', () => {
|
||||||
|
test.each(CLOUD_TEMP_REPORTABLE_THRESHOLDS)(
|
||||||
|
'for capacity %d, should report temp cloud threshold if reached',
|
||||||
|
(threshold) => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT);
|
||||||
|
config.set('deployment.type', 'cloud');
|
||||||
|
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
// @ts-expect-error Private property
|
||||||
|
service.productionQueue.emit('concurrency-check', {
|
||||||
|
capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold,
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
expect(telemetry.track).toHaveBeenCalledWith('User hit concurrency limit', { threshold });
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
test.each(CLOUD_TEMP_REPORTABLE_THRESHOLDS.map((t) => t - 1))(
|
||||||
|
'for capacity %d, should not report temp cloud threshold if not reached',
|
||||||
|
(threshold) => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT);
|
||||||
|
config.set('deployment.type', 'cloud');
|
||||||
|
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
// @ts-expect-error Private property
|
||||||
|
service.productionQueue.emit('concurrency-check', {
|
||||||
|
capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold,
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
expect(telemetry.track).not.toHaveBeenCalledWith('User hit concurrency limit', {
|
||||||
|
threshold,
|
||||||
|
});
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
test.each(CLOUD_TEMP_REPORTABLE_THRESHOLDS.map((t) => t + 1))(
|
||||||
|
'for capacity %d, should not report temp cloud threshold if exceeded',
|
||||||
|
(threshold) => {
|
||||||
|
/**
|
||||||
|
* Arrange
|
||||||
|
*/
|
||||||
|
config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT);
|
||||||
|
config.set('deployment.type', 'cloud');
|
||||||
|
const service = new ConcurrencyControlService(logger, executionRepository, telemetry);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Act
|
||||||
|
*/
|
||||||
|
// @ts-expect-error Private property
|
||||||
|
service.productionQueue.emit('concurrency-check', {
|
||||||
|
capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold,
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert
|
||||||
|
*/
|
||||||
|
expect(telemetry.track).not.toHaveBeenCalledWith('User hit concurrency limit', {
|
||||||
|
threshold,
|
||||||
|
});
|
||||||
|
},
|
||||||
|
);
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import { sleep } from 'n8n-workflow';
|
||||||
import { ConcurrencyQueue } from '../concurrency-queue';
|
import { ConcurrencyQueue } from '../concurrency-queue';
|
||||||
|
|
||||||
describe('ConcurrencyQueue', () => {
|
describe('ConcurrencyQueue', () => {
|
||||||
@@ -10,12 +11,12 @@ describe('ConcurrencyQueue', () => {
|
|||||||
const state: Record<string, 'started' | 'finished'> = {};
|
const state: Record<string, 'started' | 'finished'> = {};
|
||||||
|
|
||||||
// eslint-disable-next-line @typescript-eslint/promise-function-async
|
// eslint-disable-next-line @typescript-eslint/promise-function-async
|
||||||
const sleep = jest.fn(() => new Promise((resolve) => setTimeout(resolve, 500)));
|
const sleepSpy = jest.fn(() => sleep(500));
|
||||||
|
|
||||||
const testFn = async (item: { executionId: string }) => {
|
const testFn = async (item: { executionId: string }) => {
|
||||||
await queue.enqueue(item.executionId);
|
await queue.enqueue(item.executionId);
|
||||||
state[item.executionId] = 'started';
|
state[item.executionId] = 'started';
|
||||||
await sleep();
|
await sleepSpy();
|
||||||
queue.dequeue();
|
queue.dequeue();
|
||||||
state[item.executionId] = 'finished';
|
state[item.executionId] = 'finished';
|
||||||
};
|
};
|
||||||
@@ -29,33 +30,46 @@ describe('ConcurrencyQueue', () => {
|
|||||||
]);
|
]);
|
||||||
|
|
||||||
// At T+0 seconds this method hasn't yielded to the event-loop, so no `testFn` calls are made
|
// At T+0 seconds this method hasn't yielded to the event-loop, so no `testFn` calls are made
|
||||||
expect(sleep).toHaveBeenCalledTimes(0);
|
expect(sleepSpy).toHaveBeenCalledTimes(0);
|
||||||
expect(state).toEqual({});
|
expect(state).toEqual({});
|
||||||
|
|
||||||
// At T+0.4 seconds the first `testFn` has been called, but hasn't resolved
|
// At T+0.4 seconds the first `testFn` has been called, but hasn't resolved
|
||||||
await jest.advanceTimersByTimeAsync(400);
|
await jest.advanceTimersByTimeAsync(400);
|
||||||
expect(sleep).toHaveBeenCalledTimes(1);
|
expect(sleepSpy).toHaveBeenCalledTimes(1);
|
||||||
expect(state).toEqual({ 1: 'started' });
|
expect(state).toEqual({ 1: 'started' });
|
||||||
|
|
||||||
// At T+0.5 seconds the first promise has resolved, and the second one has stared
|
// At T+0.5 seconds the first promise has resolved, and the second one has stared
|
||||||
await jest.advanceTimersByTimeAsync(100);
|
await jest.advanceTimersByTimeAsync(100);
|
||||||
expect(sleep).toHaveBeenCalledTimes(2);
|
expect(sleepSpy).toHaveBeenCalledTimes(2);
|
||||||
expect(state).toEqual({ 1: 'finished', 2: 'started' });
|
expect(state).toEqual({ 1: 'finished', 2: 'started' });
|
||||||
|
|
||||||
// At T+1 seconds the first two promises have resolved, and the third one has stared
|
// At T+1 seconds the first two promises have resolved, and the third one has stared
|
||||||
await jest.advanceTimersByTimeAsync(500);
|
await jest.advanceTimersByTimeAsync(500);
|
||||||
expect(sleep).toHaveBeenCalledTimes(3);
|
expect(sleepSpy).toHaveBeenCalledTimes(3);
|
||||||
expect(state).toEqual({ 1: 'finished', 2: 'finished', 3: 'started' });
|
expect(state).toEqual({ 1: 'finished', 2: 'finished', 3: 'started' });
|
||||||
|
|
||||||
// If the fourth promise is removed, the fifth one is started in the next tick
|
// If the fourth promise is removed, the fifth one is started in the next tick
|
||||||
queue.remove('4');
|
queue.remove('4');
|
||||||
await jest.advanceTimersByTimeAsync(1);
|
await jest.advanceTimersByTimeAsync(1);
|
||||||
expect(sleep).toHaveBeenCalledTimes(4);
|
expect(sleepSpy).toHaveBeenCalledTimes(4);
|
||||||
expect(state).toEqual({ 1: 'finished', 2: 'finished', 3: 'started', 5: 'started' });
|
expect(state).toEqual({ 1: 'finished', 2: 'finished', 3: 'started', 5: 'started' });
|
||||||
|
|
||||||
// at T+5 seconds, all but the fourth promise should be resolved
|
// at T+5 seconds, all but the fourth promise should be resolved
|
||||||
await jest.advanceTimersByTimeAsync(4000);
|
await jest.advanceTimersByTimeAsync(4000);
|
||||||
expect(sleep).toHaveBeenCalledTimes(4);
|
expect(sleepSpy).toHaveBeenCalledTimes(4);
|
||||||
expect(state).toEqual({ 1: 'finished', 2: 'finished', 3: 'finished', 5: 'finished' });
|
expect(state).toEqual({ 1: 'finished', 2: 'finished', 3: 'finished', 5: 'finished' });
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should debounce emitting of the `concurrency-check` event', async () => {
|
||||||
|
const queue = new ConcurrencyQueue(10);
|
||||||
|
const emitSpy = jest.fn();
|
||||||
|
queue.on('concurrency-check', emitSpy);
|
||||||
|
|
||||||
|
// eslint-disable-next-line @typescript-eslint/promise-function-async
|
||||||
|
Array.from({ length: 10 }, (_, i) => i).forEach(() => queue.enqueue('1'));
|
||||||
|
|
||||||
|
expect(queue.currentCapacity).toBe(0);
|
||||||
|
await jest.advanceTimersByTimeAsync(1000);
|
||||||
|
expect(emitSpy).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -9,6 +9,9 @@ import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow';
|
|||||||
import type { IExecutingWorkflowData } from '@/Interfaces';
|
import type { IExecutingWorkflowData } from '@/Interfaces';
|
||||||
import { Telemetry } from '@/telemetry';
|
import { Telemetry } from '@/telemetry';
|
||||||
|
|
||||||
|
export const CLOUD_TEMP_PRODUCTION_LIMIT = 999;
|
||||||
|
export const CLOUD_TEMP_REPORTABLE_THRESHOLDS = [5, 10, 20, 50, 100, 200];
|
||||||
|
|
||||||
@Service()
|
@Service()
|
||||||
export class ConcurrencyControlService {
|
export class ConcurrencyControlService {
|
||||||
private readonly isEnabled: boolean;
|
private readonly isEnabled: boolean;
|
||||||
@@ -17,7 +20,9 @@ export class ConcurrencyControlService {
|
|||||||
|
|
||||||
private readonly productionQueue: ConcurrencyQueue;
|
private readonly productionQueue: ConcurrencyQueue;
|
||||||
|
|
||||||
private readonly limitsToReport = [5, 10, 20, 50, 100, 200];
|
private readonly limitsToReport = CLOUD_TEMP_REPORTABLE_THRESHOLDS.map(
|
||||||
|
(t) => CLOUD_TEMP_PRODUCTION_LIMIT - t,
|
||||||
|
);
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly logger: Logger,
|
private readonly logger: Logger,
|
||||||
@@ -46,19 +51,17 @@ export class ConcurrencyControlService {
|
|||||||
|
|
||||||
this.isEnabled = true;
|
this.isEnabled = true;
|
||||||
|
|
||||||
this.productionQueue.on(
|
this.productionQueue.on('concurrency-check', ({ capacity }: { capacity: number }) => {
|
||||||
'execution-throttled',
|
if (this.shouldReport(capacity)) {
|
||||||
async ({ executionId, capacity }: { executionId: string; capacity: number }) => {
|
void this.telemetry.track('User hit concurrency limit', {
|
||||||
this.log('Execution throttled', { executionId });
|
threshold: CLOUD_TEMP_PRODUCTION_LIMIT - capacity,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
/**
|
this.productionQueue.on('execution-throttled', ({ executionId }: { executionId: string }) => {
|
||||||
* Temporary until base data for cloud plans is collected.
|
this.log('Execution throttled', { executionId });
|
||||||
*/
|
});
|
||||||
if (this.shouldReport(capacity)) {
|
|
||||||
await this.telemetry.track('User hit concurrency limit', { threshold: capacity });
|
|
||||||
}
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
this.productionQueue.on('execution-released', async (executionId: string) => {
|
this.productionQueue.on('execution-released', async (executionId: string) => {
|
||||||
this.log('Execution released', { executionId });
|
this.log('Execution released', { executionId });
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import { Service } from 'typedi';
|
import { Service } from 'typedi';
|
||||||
import { EventEmitter } from 'node:events';
|
import { EventEmitter } from 'node:events';
|
||||||
|
import debounce from 'lodash/debounce';
|
||||||
|
|
||||||
@Service()
|
@Service()
|
||||||
export class ConcurrencyQueue extends EventEmitter {
|
export class ConcurrencyQueue extends EventEmitter {
|
||||||
@@ -15,14 +16,20 @@ export class ConcurrencyQueue extends EventEmitter {
|
|||||||
async enqueue(executionId: string) {
|
async enqueue(executionId: string) {
|
||||||
this.capacity--;
|
this.capacity--;
|
||||||
|
|
||||||
|
this.debouncedEmit('concurrency-check', { capacity: this.capacity });
|
||||||
|
|
||||||
if (this.capacity < 0) {
|
if (this.capacity < 0) {
|
||||||
this.emit('execution-throttled', { executionId, capacity: this.capacity });
|
this.emit('execution-throttled', { executionId });
|
||||||
|
|
||||||
// eslint-disable-next-line @typescript-eslint/return-await
|
// eslint-disable-next-line @typescript-eslint/return-await
|
||||||
return new Promise<void>((resolve) => this.queue.push({ executionId, resolve }));
|
return new Promise<void>((resolve) => this.queue.push({ executionId, resolve }));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
get currentCapacity() {
|
||||||
|
return this.capacity;
|
||||||
|
}
|
||||||
|
|
||||||
dequeue() {
|
dequeue() {
|
||||||
this.capacity++;
|
this.capacity++;
|
||||||
|
|
||||||
@@ -56,4 +63,9 @@ export class ConcurrencyQueue extends EventEmitter {
|
|||||||
|
|
||||||
resolve();
|
resolve();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private debouncedEmit = debounce(
|
||||||
|
(event: string, payload: object) => this.emit(event, payload),
|
||||||
|
300,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user