refactor(core): Port concurrency config (#18324)

This commit is contained in:
Iván Ovejero
2025-08-15 10:47:11 +02:00
committed by GitHub
parent 2cbea86fec
commit 9cc1b11f7f
7 changed files with 71 additions and 51 deletions

View File

@@ -11,6 +11,21 @@ class PruningIntervalsConfig {
softDelete: number = 60; softDelete: number = 60;
} }
@Config
class ConcurrencyConfig {
/**
* Max production executions allowed to run concurrently. `-1` means unlimited.
*
* Default for scaling mode is taken from the worker's `--concurrency` flag.
*/
@Env('N8N_CONCURRENCY_PRODUCTION_LIMIT')
productionLimit: number = -1;
/** Max evaluation executions allowed to run concurrently. `-1` means unlimited. */
@Env('N8N_CONCURRENCY_EVALUATION_LIMIT')
evaluationLimit: number = -1;
}
@Config @Config
export class ExecutionsConfig { export class ExecutionsConfig {
/** Whether to delete past executions on a rolling basis. */ /** Whether to delete past executions on a rolling basis. */
@@ -38,4 +53,7 @@ export class ExecutionsConfig {
@Nested @Nested
pruneDataIntervals: PruningIntervalsConfig; pruneDataIntervals: PruningIntervalsConfig;
@Nested
concurrency: ConcurrencyConfig;
} }

View File

@@ -310,6 +310,10 @@ describe('GlobalConfig', () => {
hardDelete: 15, hardDelete: 15,
softDelete: 60, softDelete: 60,
}, },
concurrency: {
productionLimit: -1,
evaluationLimit: -1,
},
}, },
diagnostics: { diagnostics: {
enabled: true, enabled: true,

View File

@@ -136,7 +136,7 @@ export class Worker extends BaseCommand<z.infer<typeof flagsSchema>> {
async setConcurrency() { async setConcurrency() {
const { flags } = this; const { flags } = this;
const envConcurrency = config.getEnv('executions.concurrency.productionLimit'); const envConcurrency = this.globalConfig.executions.concurrency.productionLimit;
this.concurrency = envConcurrency !== -1 ? envConcurrency : flags.concurrency; this.concurrency = envConcurrency !== -1 ? envConcurrency : flags.concurrency;

View File

@@ -22,11 +22,18 @@ describe('ConcurrencyControlService', () => {
const executionRepository = mock<ExecutionRepository>(); const executionRepository = mock<ExecutionRepository>();
const telemetry = mock<Telemetry>(); const telemetry = mock<Telemetry>();
const eventService = mock<EventService>(); const eventService = mock<EventService>();
const globalConfig = mock<GlobalConfig>(); const globalConfig = mock<GlobalConfig>({
executions: {
concurrency: {
productionLimit: -1,
evaluationLimit: -1,
},
},
});
afterEach(() => { afterEach(() => {
config.set('executions.concurrency.productionLimit', -1); globalConfig.executions.concurrency.productionLimit = -1;
config.set('executions.concurrency.evaluationLimit', -1); globalConfig.executions.concurrency.evaluationLimit = -1;
config.set('executions.mode', 'integrated'); config.set('executions.mode', 'integrated');
jest.clearAllMocks(); jest.clearAllMocks();
@@ -39,7 +46,8 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set(`executions.concurrency.${type}Limit`, 1); // @ts-expect-error Testing
globalConfig.executions.concurrency[type + 'Limit'] = 1;
/** /**
* Act * Act
@@ -70,7 +78,8 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set(`executions.concurrency.${type}Limit`, 0); // @ts-expect-error Testing
globalConfig.executions.concurrency[type + 'Limit'] = 0;
try { try {
/** /**
@@ -96,8 +105,8 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.productionLimit', -1); globalConfig.executions.concurrency.productionLimit = -1;
config.set('executions.concurrency.evaluationLimit', -1); globalConfig.executions.concurrency.evaluationLimit = -1;
/** /**
* Act * Act
@@ -123,7 +132,8 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set(`executions.concurrency.${type}Limit`, -2); // @ts-expect-error Testing
globalConfig.executions.concurrency[type + 'Limit'] = -2;
/** /**
* Act * Act
@@ -149,7 +159,7 @@ describe('ConcurrencyControlService', () => {
* Arrange * Arrange
*/ */
config.set('executions.mode', 'queue'); config.set('executions.mode', 'queue');
config.set('executions.concurrency.productionLimit', 2); globalConfig.executions.concurrency.productionLimit = 2;
/** /**
* Act * Act
@@ -182,7 +192,7 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.productionLimit', 1); globalConfig.executions.concurrency.productionLimit = 1;
const service = new ConcurrencyControlService( const service = new ConcurrencyControlService(
logger, logger,
@@ -209,7 +219,7 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.productionLimit', 1); globalConfig.executions.concurrency.productionLimit = 1;
const service = new ConcurrencyControlService( const service = new ConcurrencyControlService(
logger, logger,
@@ -235,7 +245,7 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.evaluationLimit', 1); globalConfig.executions.concurrency.evaluationLimit = 1;
const service = new ConcurrencyControlService( const service = new ConcurrencyControlService(
logger, logger,
@@ -265,7 +275,7 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.productionLimit', 1); globalConfig.executions.concurrency.evaluationLimit = 1;
const service = new ConcurrencyControlService( const service = new ConcurrencyControlService(
logger, logger,
@@ -292,7 +302,7 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.productionLimit', 1); globalConfig.executions.concurrency.productionLimit = 1;
const service = new ConcurrencyControlService( const service = new ConcurrencyControlService(
logger, logger,
@@ -318,7 +328,7 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.evaluationLimit', 1); globalConfig.executions.concurrency.evaluationLimit = 1;
const service = new ConcurrencyControlService( const service = new ConcurrencyControlService(
logger, logger,
@@ -348,7 +358,7 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.productionLimit', 1); globalConfig.executions.concurrency.productionLimit = 1;
const service = new ConcurrencyControlService( const service = new ConcurrencyControlService(
logger, logger,
@@ -377,7 +387,7 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.productionLimit', 1); globalConfig.executions.concurrency.productionLimit = 1;
const service = new ConcurrencyControlService( const service = new ConcurrencyControlService(
logger, logger,
@@ -404,7 +414,7 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.evaluationLimit', 1); globalConfig.executions.concurrency.evaluationLimit = 1;
const service = new ConcurrencyControlService( const service = new ConcurrencyControlService(
logger, logger,
@@ -434,7 +444,8 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set(`executions.concurrency.${type}Limit`, 2); // @ts-expect-error Testing
globalConfig.executions.concurrency[type + 'Limit'] = 2;
const service = new ConcurrencyControlService( const service = new ConcurrencyControlService(
logger, logger,
@@ -470,8 +481,8 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.productionLimit', 2); globalConfig.executions.concurrency.productionLimit = 2;
config.set('executions.concurrency.evaluationLimit', 2); globalConfig.executions.concurrency.evaluationLimit = 2;
/** /**
* Act * Act
@@ -497,8 +508,8 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.productionLimit', 2); globalConfig.executions.concurrency.productionLimit = 2;
config.set('executions.concurrency.evaluationLimit', 2); globalConfig.executions.concurrency.evaluationLimit = 2;
/** /**
* Act * Act
@@ -532,7 +543,7 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.productionLimit', -1); globalConfig.executions.concurrency.productionLimit = -1;
const service = new ConcurrencyControlService( const service = new ConcurrencyControlService(
logger, logger,
@@ -559,7 +570,7 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.evaluationLimit', -1); globalConfig.executions.concurrency.evaluationLimit = -1;
const service = new ConcurrencyControlService( const service = new ConcurrencyControlService(
logger, logger,
@@ -588,7 +599,7 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.productionLimit', -1); globalConfig.executions.concurrency.evaluationLimit = -1;
const service = new ConcurrencyControlService( const service = new ConcurrencyControlService(
logger, logger,
@@ -614,7 +625,7 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.evaluationLimit', -1); globalConfig.executions.concurrency.evaluationLimit = -1;
const service = new ConcurrencyControlService( const service = new ConcurrencyControlService(
logger, logger,
@@ -642,7 +653,7 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.productionLimit', -1); globalConfig.executions.concurrency.productionLimit = -1;
const service = new ConcurrencyControlService( const service = new ConcurrencyControlService(
logger, logger,
@@ -668,7 +679,7 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.evaluationLimit', -1); globalConfig.executions.concurrency.evaluationLimit = -1;
const service = new ConcurrencyControlService( const service = new ConcurrencyControlService(
logger, logger,
@@ -704,7 +715,7 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT); globalConfig.executions.concurrency.productionLimit = CLOUD_TEMP_PRODUCTION_LIMIT;
globalConfig.deployment.type = 'cloud'; globalConfig.deployment.type = 'cloud';
const service = new ConcurrencyControlService( const service = new ConcurrencyControlService(
logger, logger,
@@ -738,7 +749,7 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT); globalConfig.executions.concurrency.productionLimit = CLOUD_TEMP_PRODUCTION_LIMIT;
globalConfig.deployment.type = 'cloud'; globalConfig.deployment.type = 'cloud';
const service = new ConcurrencyControlService( const service = new ConcurrencyControlService(
logger, logger,
@@ -771,7 +782,7 @@ describe('ConcurrencyControlService', () => {
/** /**
* Arrange * Arrange
*/ */
config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT); globalConfig.executions.concurrency.productionLimit = CLOUD_TEMP_PRODUCTION_LIMIT;
globalConfig.deployment.type = 'cloud'; globalConfig.deployment.type = 'cloud';
const service = new ConcurrencyControlService( const service = new ConcurrencyControlService(
logger, logger,

View File

@@ -39,9 +39,11 @@ export class ConcurrencyControlService {
) { ) {
this.logger = this.logger.scoped('concurrency'); this.logger = this.logger.scoped('concurrency');
const { productionLimit, evaluationLimit } = this.globalConfig.executions.concurrency;
this.limits = new Map([ this.limits = new Map([
['production', config.getEnv('executions.concurrency.productionLimit')], ['production', productionLimit],
['evaluation', config.getEnv('executions.concurrency.evaluationLimit')], ['evaluation', evaluationLimit],
]); ]);
this.limits.forEach((limit, type) => { this.limits.forEach((limit, type) => {

View File

@@ -10,21 +10,6 @@ export const schema = {
env: 'EXECUTIONS_MODE', env: 'EXECUTIONS_MODE',
}, },
concurrency: {
productionLimit: {
doc: "Max production executions allowed to run concurrently, in main process for regular mode and in worker for queue mode. Default for main mode is `-1` (disabled). Default for queue mode is taken from the worker's `--concurrency` flag.",
format: Number,
default: -1,
env: 'N8N_CONCURRENCY_PRODUCTION_LIMIT',
},
evaluationLimit: {
doc: 'Max evaluation executions allowed to run concurrently.',
format: Number,
default: -1,
env: 'N8N_CONCURRENCY_EVALUATION_LIMIT',
},
},
// A Workflow times out and gets canceled after this time (seconds). // A Workflow times out and gets canceled after this time (seconds).
// If the workflow is executed in the main process a soft timeout // If the workflow is executed in the main process a soft timeout
// is executed (takes effect after the current node finishes). // is executed (takes effect after the current node finishes).

View File

@@ -129,7 +129,7 @@ export class FrontendService {
binaryDataMode: this.binaryDataConfig.mode, binaryDataMode: this.binaryDataConfig.mode,
nodeJsVersion: process.version.replace(/^v/, ''), nodeJsVersion: process.version.replace(/^v/, ''),
versionCli: N8N_VERSION, versionCli: N8N_VERSION,
concurrency: config.getEnv('executions.concurrency.productionLimit'), concurrency: this.globalConfig.executions.concurrency.productionLimit,
authCookie: { authCookie: {
secure: this.globalConfig.auth.cookie.secure, secure: this.globalConfig.auth.cookie.secure,
}, },