diff --git a/packages/@n8n/config/src/configs/executions.config.ts b/packages/@n8n/config/src/configs/executions.config.ts index 977d539920..fea9d89501 100644 --- a/packages/@n8n/config/src/configs/executions.config.ts +++ b/packages/@n8n/config/src/configs/executions.config.ts @@ -11,6 +11,21 @@ class PruningIntervalsConfig { softDelete: number = 60; } +@Config +class ConcurrencyConfig { + /** + * Max production executions allowed to run concurrently. `-1` means unlimited. + * + * Default for scaling mode is taken from the worker's `--concurrency` flag. + */ + @Env('N8N_CONCURRENCY_PRODUCTION_LIMIT') + productionLimit: number = -1; + + /** Max evaluation executions allowed to run concurrently. `-1` means unlimited. */ + @Env('N8N_CONCURRENCY_EVALUATION_LIMIT') + evaluationLimit: number = -1; +} + @Config export class ExecutionsConfig { /** Whether to delete past executions on a rolling basis. */ @@ -38,4 +53,7 @@ export class ExecutionsConfig { @Nested pruneDataIntervals: PruningIntervalsConfig; + + @Nested + concurrency: ConcurrencyConfig; } diff --git a/packages/@n8n/config/test/config.test.ts b/packages/@n8n/config/test/config.test.ts index 740d0b1553..3e9f5b5737 100644 --- a/packages/@n8n/config/test/config.test.ts +++ b/packages/@n8n/config/test/config.test.ts @@ -310,6 +310,10 @@ describe('GlobalConfig', () => { hardDelete: 15, softDelete: 60, }, + concurrency: { + productionLimit: -1, + evaluationLimit: -1, + }, }, diagnostics: { enabled: true, diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 572a9c1036..a705b95597 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -136,7 +136,7 @@ export class Worker extends BaseCommand> { async setConcurrency() { const { flags } = this; - const envConcurrency = config.getEnv('executions.concurrency.productionLimit'); + const envConcurrency = this.globalConfig.executions.concurrency.productionLimit; this.concurrency = envConcurrency !== -1 ? envConcurrency : flags.concurrency; diff --git a/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts b/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts index adcd2aef00..82c184032d 100644 --- a/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts +++ b/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts @@ -22,11 +22,18 @@ describe('ConcurrencyControlService', () => { const executionRepository = mock(); const telemetry = mock(); const eventService = mock(); - const globalConfig = mock(); + const globalConfig = mock({ + executions: { + concurrency: { + productionLimit: -1, + evaluationLimit: -1, + }, + }, + }); afterEach(() => { - config.set('executions.concurrency.productionLimit', -1); - config.set('executions.concurrency.evaluationLimit', -1); + globalConfig.executions.concurrency.productionLimit = -1; + globalConfig.executions.concurrency.evaluationLimit = -1; config.set('executions.mode', 'integrated'); jest.clearAllMocks(); @@ -39,7 +46,8 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set(`executions.concurrency.${type}Limit`, 1); + // @ts-expect-error Testing + globalConfig.executions.concurrency[type + 'Limit'] = 1; /** * Act @@ -70,7 +78,8 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set(`executions.concurrency.${type}Limit`, 0); + // @ts-expect-error Testing + globalConfig.executions.concurrency[type + 'Limit'] = 0; try { /** @@ -96,8 +105,8 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set('executions.concurrency.productionLimit', -1); - config.set('executions.concurrency.evaluationLimit', -1); + globalConfig.executions.concurrency.productionLimit = -1; + globalConfig.executions.concurrency.evaluationLimit = -1; /** * Act @@ -123,7 +132,8 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set(`executions.concurrency.${type}Limit`, -2); + // @ts-expect-error Testing + globalConfig.executions.concurrency[type + 'Limit'] = -2; /** * Act @@ -149,7 +159,7 @@ describe('ConcurrencyControlService', () => { * Arrange */ config.set('executions.mode', 'queue'); - config.set('executions.concurrency.productionLimit', 2); + globalConfig.executions.concurrency.productionLimit = 2; /** * Act @@ -182,7 +192,7 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set('executions.concurrency.productionLimit', 1); + globalConfig.executions.concurrency.productionLimit = 1; const service = new ConcurrencyControlService( logger, @@ -209,7 +219,7 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set('executions.concurrency.productionLimit', 1); + globalConfig.executions.concurrency.productionLimit = 1; const service = new ConcurrencyControlService( logger, @@ -235,7 +245,7 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set('executions.concurrency.evaluationLimit', 1); + globalConfig.executions.concurrency.evaluationLimit = 1; const service = new ConcurrencyControlService( logger, @@ -265,7 +275,7 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set('executions.concurrency.productionLimit', 1); + globalConfig.executions.concurrency.evaluationLimit = 1; const service = new ConcurrencyControlService( logger, @@ -292,7 +302,7 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set('executions.concurrency.productionLimit', 1); + globalConfig.executions.concurrency.productionLimit = 1; const service = new ConcurrencyControlService( logger, @@ -318,7 +328,7 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set('executions.concurrency.evaluationLimit', 1); + globalConfig.executions.concurrency.evaluationLimit = 1; const service = new ConcurrencyControlService( logger, @@ -348,7 +358,7 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set('executions.concurrency.productionLimit', 1); + globalConfig.executions.concurrency.productionLimit = 1; const service = new ConcurrencyControlService( logger, @@ -377,7 +387,7 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set('executions.concurrency.productionLimit', 1); + globalConfig.executions.concurrency.productionLimit = 1; const service = new ConcurrencyControlService( logger, @@ -404,7 +414,7 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set('executions.concurrency.evaluationLimit', 1); + globalConfig.executions.concurrency.evaluationLimit = 1; const service = new ConcurrencyControlService( logger, @@ -434,7 +444,8 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set(`executions.concurrency.${type}Limit`, 2); + // @ts-expect-error Testing + globalConfig.executions.concurrency[type + 'Limit'] = 2; const service = new ConcurrencyControlService( logger, @@ -470,8 +481,8 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set('executions.concurrency.productionLimit', 2); - config.set('executions.concurrency.evaluationLimit', 2); + globalConfig.executions.concurrency.productionLimit = 2; + globalConfig.executions.concurrency.evaluationLimit = 2; /** * Act @@ -497,8 +508,8 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set('executions.concurrency.productionLimit', 2); - config.set('executions.concurrency.evaluationLimit', 2); + globalConfig.executions.concurrency.productionLimit = 2; + globalConfig.executions.concurrency.evaluationLimit = 2; /** * Act @@ -532,7 +543,7 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set('executions.concurrency.productionLimit', -1); + globalConfig.executions.concurrency.productionLimit = -1; const service = new ConcurrencyControlService( logger, @@ -559,7 +570,7 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set('executions.concurrency.evaluationLimit', -1); + globalConfig.executions.concurrency.evaluationLimit = -1; const service = new ConcurrencyControlService( logger, @@ -588,7 +599,7 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set('executions.concurrency.productionLimit', -1); + globalConfig.executions.concurrency.evaluationLimit = -1; const service = new ConcurrencyControlService( logger, @@ -614,7 +625,7 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set('executions.concurrency.evaluationLimit', -1); + globalConfig.executions.concurrency.evaluationLimit = -1; const service = new ConcurrencyControlService( logger, @@ -642,7 +653,7 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set('executions.concurrency.productionLimit', -1); + globalConfig.executions.concurrency.productionLimit = -1; const service = new ConcurrencyControlService( logger, @@ -668,7 +679,7 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set('executions.concurrency.evaluationLimit', -1); + globalConfig.executions.concurrency.evaluationLimit = -1; const service = new ConcurrencyControlService( logger, @@ -704,7 +715,7 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT); + globalConfig.executions.concurrency.productionLimit = CLOUD_TEMP_PRODUCTION_LIMIT; globalConfig.deployment.type = 'cloud'; const service = new ConcurrencyControlService( logger, @@ -738,7 +749,7 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT); + globalConfig.executions.concurrency.productionLimit = CLOUD_TEMP_PRODUCTION_LIMIT; globalConfig.deployment.type = 'cloud'; const service = new ConcurrencyControlService( logger, @@ -771,7 +782,7 @@ describe('ConcurrencyControlService', () => { /** * Arrange */ - config.set('executions.concurrency.productionLimit', CLOUD_TEMP_PRODUCTION_LIMIT); + globalConfig.executions.concurrency.productionLimit = CLOUD_TEMP_PRODUCTION_LIMIT; globalConfig.deployment.type = 'cloud'; const service = new ConcurrencyControlService( logger, diff --git a/packages/cli/src/concurrency/concurrency-control.service.ts b/packages/cli/src/concurrency/concurrency-control.service.ts index db26032cfc..ec38f92a14 100644 --- a/packages/cli/src/concurrency/concurrency-control.service.ts +++ b/packages/cli/src/concurrency/concurrency-control.service.ts @@ -39,9 +39,11 @@ export class ConcurrencyControlService { ) { this.logger = this.logger.scoped('concurrency'); + const { productionLimit, evaluationLimit } = this.globalConfig.executions.concurrency; + this.limits = new Map([ - ['production', config.getEnv('executions.concurrency.productionLimit')], - ['evaluation', config.getEnv('executions.concurrency.evaluationLimit')], + ['production', productionLimit], + ['evaluation', evaluationLimit], ]); this.limits.forEach((limit, type) => { diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index 9ba5cb9387..8846d6d822 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -10,21 +10,6 @@ export const schema = { env: 'EXECUTIONS_MODE', }, - concurrency: { - productionLimit: { - doc: "Max production executions allowed to run concurrently, in main process for regular mode and in worker for queue mode. Default for main mode is `-1` (disabled). Default for queue mode is taken from the worker's `--concurrency` flag.", - format: Number, - default: -1, - env: 'N8N_CONCURRENCY_PRODUCTION_LIMIT', - }, - evaluationLimit: { - doc: 'Max evaluation executions allowed to run concurrently.', - format: Number, - default: -1, - env: 'N8N_CONCURRENCY_EVALUATION_LIMIT', - }, - }, - // A Workflow times out and gets canceled after this time (seconds). // If the workflow is executed in the main process a soft timeout // is executed (takes effect after the current node finishes). diff --git a/packages/cli/src/services/frontend.service.ts b/packages/cli/src/services/frontend.service.ts index cb0fff329f..49da7d2036 100644 --- a/packages/cli/src/services/frontend.service.ts +++ b/packages/cli/src/services/frontend.service.ts @@ -129,7 +129,7 @@ export class FrontendService { binaryDataMode: this.binaryDataConfig.mode, nodeJsVersion: process.version.replace(/^v/, ''), versionCli: N8N_VERSION, - concurrency: config.getEnv('executions.concurrency.productionLimit'), + concurrency: this.globalConfig.executions.concurrency.productionLimit, authCookie: { secure: this.globalConfig.auth.cookie.secure, },