refactor(core): Port queue recovery config (#18396)

This commit is contained in:
Iván Ovejero
2025-08-18 09:08:03 +02:00
committed by GitHub
parent 18e32fe774
commit 58aad35592
5 changed files with 26 additions and 18 deletions

View File

@@ -26,6 +26,17 @@ class ConcurrencyConfig {
evaluationLimit: number = -1;
}
@Config
class QueueRecoveryConfig {
/** How often (minutes) to check for queue recovery. */
@Env('N8N_EXECUTIONS_QUEUE_RECOVERY_INTERVAL')
interval: number = 180;
/** Size of batch of executions to check for queue recovery. */
@Env('N8N_EXECUTIONS_QUEUE_RECOVERY_BATCH')
batchSize: number = 100;
}
@Config
export class ExecutionsConfig {
/** Whether to delete past executions on a rolling basis. */
@@ -56,4 +67,7 @@ export class ExecutionsConfig {
@Nested
concurrency: ConcurrencyConfig;
@Nested
queueRecovery: QueueRecoveryConfig;
}

View File

@@ -314,6 +314,10 @@ describe('GlobalConfig', () => {
productionLimit: -1,
evaluationLimit: -1,
},
queueRecovery: {
interval: 180,
batchSize: 100,
},
},
diagnostics: {
enabled: true,

View File

@@ -70,21 +70,6 @@ export const schema = {
default: true,
env: 'EXECUTIONS_DATA_SAVE_MANUAL_EXECUTIONS',
},
queueRecovery: {
interval: {
doc: 'How often (minutes) to check for queue recovery',
format: Number,
default: 180,
env: 'N8N_EXECUTIONS_QUEUE_RECOVERY_INTERVAL',
},
batchSize: {
doc: 'Size of batch of executions to check for queue recovery',
format: Number,
default: 100,
env: 'N8N_EXECUTIONS_QUEUE_RECOVERY_BATCH',
},
},
},
userManagement: {

View File

@@ -44,6 +44,12 @@ describe('ScalingService', () => {
queueMetricsInterval: 20,
},
},
executions: {
queueRecovery: {
interval: 180,
batchSize: 100,
},
},
});
const instanceSettings = Container.get(InstanceSettings);

View File

@@ -17,7 +17,6 @@ import type { IExecuteResponsePromiseData } from 'n8n-workflow';
import assert, { strict } from 'node:assert';
import { ActiveExecutions } from '@/active-executions';
import config from '@/config';
import { HIGHEST_SHUTDOWN_PRIORITY } from '@/constants';
import { EventService } from '@/events/event.service';
import { assertNever } from '@/utils';
@@ -441,8 +440,8 @@ export class ScalingService {
// #region Queue recovery
private readonly queueRecoveryContext: QueueRecoveryContext = {
batchSize: config.getEnv('executions.queueRecovery.batchSize'),
waitMs: config.getEnv('executions.queueRecovery.interval') * 60 * 1000,
batchSize: this.globalConfig.executions.queueRecovery.batchSize,
waitMs: this.globalConfig.executions.queueRecovery.interval * 60 * 1000,
};
@OnLeaderTakeover()