mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-16 17:46:45 +00:00
refactor(core): Move queue recovery to scaling service (no-changelog) (#10368)
This commit is contained in:
@@ -1,16 +1,28 @@
|
||||
import Container, { Service } from 'typedi';
|
||||
import { ApplicationError, BINARY_ENCODING, sleep } from 'n8n-workflow';
|
||||
import { ApplicationError, BINARY_ENCODING, sleep, jsonStringify } from 'n8n-workflow';
|
||||
import { ActiveExecutions } from '@/ActiveExecutions';
|
||||
import config from '@/config';
|
||||
import { Logger } from '@/Logger';
|
||||
import { MaxStalledCountError } from '@/errors/max-stalled-count.error';
|
||||
import { HIGHEST_SHUTDOWN_PRIORITY } from '@/constants';
|
||||
import { HIGHEST_SHUTDOWN_PRIORITY, Time } from '@/constants';
|
||||
import { OnShutdown } from '@/decorators/OnShutdown';
|
||||
import { JOB_TYPE_NAME, QUEUE_NAME } from './constants';
|
||||
import { JobProcessor } from './job-processor';
|
||||
import type { JobQueue, Job, JobData, JobOptions, JobMessage, JobStatus, JobId } from './types';
|
||||
import type {
|
||||
JobQueue,
|
||||
Job,
|
||||
JobData,
|
||||
JobOptions,
|
||||
JobMessage,
|
||||
JobStatus,
|
||||
JobId,
|
||||
QueueRecoveryContext,
|
||||
} from './types';
|
||||
import type { IExecuteResponsePromiseData } from 'n8n-workflow';
|
||||
import { GlobalConfig } from '@n8n/config';
|
||||
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
||||
import { InstanceSettings } from 'n8n-core';
|
||||
import { OrchestrationService } from '@/services/orchestration.service';
|
||||
|
||||
@Service()
|
||||
export class ScalingService {
|
||||
@@ -23,6 +35,9 @@ export class ScalingService {
|
||||
private readonly activeExecutions: ActiveExecutions,
|
||||
private readonly jobProcessor: JobProcessor,
|
||||
private readonly globalConfig: GlobalConfig,
|
||||
private readonly executionRepository: ExecutionRepository,
|
||||
private readonly instanceSettings: InstanceSettings,
|
||||
private readonly orchestrationService: OrchestrationService,
|
||||
) {}
|
||||
|
||||
// #region Lifecycle
|
||||
@@ -43,6 +58,14 @@ export class ScalingService {
|
||||
|
||||
this.registerListeners();
|
||||
|
||||
if (this.instanceSettings.isLeader) this.scheduleQueueRecovery();
|
||||
|
||||
if (this.orchestrationService.isMultiMainSetupEnabled) {
|
||||
this.orchestrationService.multiMainSetup
|
||||
.on('leader-takeover', () => this.scheduleQueueRecovery())
|
||||
.on('leader-stepdown', () => this.stopQueueRecovery());
|
||||
}
|
||||
|
||||
this.logger.debug('[ScalingService] Queue setup completed');
|
||||
}
|
||||
|
||||
@@ -64,6 +87,10 @@ export class ScalingService {
|
||||
|
||||
this.logger.debug('[ScalingService] Queue paused');
|
||||
|
||||
this.stopQueueRecovery();
|
||||
|
||||
this.logger.debug('[ScalingService] Queue recovery stopped');
|
||||
|
||||
let count = 0;
|
||||
|
||||
while (this.getRunningJobsCount() !== 0) {
|
||||
@@ -230,4 +257,86 @@ export class ScalingService {
|
||||
|
||||
throw new ApplicationError('This method must be called on a `worker` instance');
|
||||
}
|
||||
|
||||
// #region Queue recovery
|
||||
|
||||
private readonly queueRecoveryContext: QueueRecoveryContext = {
|
||||
batchSize: config.getEnv('executions.queueRecovery.batchSize'),
|
||||
waitMs: config.getEnv('executions.queueRecovery.interval') * 60 * 1000,
|
||||
};
|
||||
|
||||
scheduleQueueRecovery(waitMs = this.queueRecoveryContext.waitMs) {
|
||||
this.queueRecoveryContext.timeout = setTimeout(async () => {
|
||||
try {
|
||||
const nextWaitMs = await this.recoverFromQueue();
|
||||
this.scheduleQueueRecovery(nextWaitMs);
|
||||
} catch (error) {
|
||||
this.logger.error('[ScalingService] Failed to recover dangling executions from queue', {
|
||||
msg: this.toErrorMsg(error),
|
||||
});
|
||||
this.logger.error('[ScalingService] Retrying...');
|
||||
|
||||
this.scheduleQueueRecovery();
|
||||
}
|
||||
}, waitMs);
|
||||
|
||||
const wait = [this.queueRecoveryContext.waitMs / Time.minutes.toMilliseconds, 'min'].join(' ');
|
||||
|
||||
this.logger.debug(`[ScalingService] Scheduled queue recovery check for next ${wait}`);
|
||||
}
|
||||
|
||||
stopQueueRecovery() {
|
||||
clearTimeout(this.queueRecoveryContext.timeout);
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark in-progress executions as `crashed` if stored in DB as `new` or `running`
|
||||
* but absent from the queue. Return time until next recovery cycle.
|
||||
*/
|
||||
private async recoverFromQueue() {
|
||||
const { waitMs, batchSize } = this.queueRecoveryContext;
|
||||
|
||||
const storedIds = await this.executionRepository.getInProgressExecutionIds(batchSize);
|
||||
|
||||
if (storedIds.length === 0) {
|
||||
this.logger.debug('[ScalingService] Completed queue recovery check, no dangling executions');
|
||||
return waitMs;
|
||||
}
|
||||
|
||||
const runningJobs = await this.findJobsByStatus(['active', 'waiting']);
|
||||
|
||||
const queuedIds = new Set(runningJobs.map((job) => job.data.executionId));
|
||||
|
||||
if (queuedIds.size === 0) {
|
||||
this.logger.debug('[ScalingService] Completed queue recovery check, no dangling executions');
|
||||
return waitMs;
|
||||
}
|
||||
|
||||
const danglingIds = storedIds.filter((id) => !queuedIds.has(id));
|
||||
|
||||
if (danglingIds.length === 0) {
|
||||
this.logger.debug('[ScalingService] Completed queue recovery check, no dangling executions');
|
||||
return waitMs;
|
||||
}
|
||||
|
||||
await this.executionRepository.markAsCrashed(danglingIds);
|
||||
|
||||
this.logger.info(
|
||||
'[ScalingService] Completed queue recovery check, recovered dangling executions',
|
||||
{ danglingIds },
|
||||
);
|
||||
|
||||
// if this cycle used up the whole batch size, it is possible for there to be
|
||||
// dangling executions outside this check, so speed up next cycle
|
||||
|
||||
return storedIds.length >= this.queueRecoveryContext.batchSize ? waitMs / 2 : waitMs;
|
||||
}
|
||||
|
||||
private toErrorMsg(error: unknown) {
|
||||
return error instanceof Error
|
||||
? error.message
|
||||
: jsonStringify(error, { replaceCircularRefs: true });
|
||||
}
|
||||
|
||||
// #endregion
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user