mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-16 17:46:45 +00:00
refactor(core): Centralize scaling mode (no-changelog) (#9835)
This commit is contained in:
213
packages/cli/src/scaling/scaling.service.ts
Normal file
213
packages/cli/src/scaling/scaling.service.ts
Normal file
@@ -0,0 +1,213 @@
|
||||
import Container, { Service } from 'typedi';
|
||||
import { ApplicationError, BINARY_ENCODING } from 'n8n-workflow';
|
||||
import { ActiveExecutions } from '@/ActiveExecutions';
|
||||
import config from '@/config';
|
||||
import { Logger } from '@/Logger';
|
||||
import { MaxStalledCountError } from '@/errors/max-stalled-count.error';
|
||||
import { HIGHEST_SHUTDOWN_PRIORITY } from '@/constants';
|
||||
import { OnShutdown } from '@/decorators/OnShutdown';
|
||||
import { JOB_TYPE_NAME, QUEUE_NAME } from './constants';
|
||||
import { JobProcessor } from './job-processor';
|
||||
import type { JobQueue, Job, JobData, JobOptions, JobMessage, JobStatus, JobId } from './types';
|
||||
import type { IExecuteResponsePromiseData } from 'n8n-workflow';
|
||||
|
||||
@Service()
|
||||
export class ScalingService {
|
||||
private queue: JobQueue;
|
||||
|
||||
private readonly instanceType = config.getEnv('generic.instanceType');
|
||||
|
||||
constructor(
|
||||
private readonly logger: Logger,
|
||||
private readonly activeExecutions: ActiveExecutions,
|
||||
private readonly jobProcessor: JobProcessor,
|
||||
) {}
|
||||
|
||||
// #region Lifecycle
|
||||
|
||||
async setupQueue() {
|
||||
const { default: BullQueue } = await import('bull');
|
||||
const { RedisClientService } = await import('@/services/redis/redis-client.service');
|
||||
const service = Container.get(RedisClientService);
|
||||
|
||||
const bullPrefix = config.getEnv('queue.bull.prefix');
|
||||
const prefix = service.toValidPrefix(bullPrefix);
|
||||
|
||||
this.queue = new BullQueue(QUEUE_NAME, {
|
||||
prefix,
|
||||
settings: config.get('queue.bull.settings'),
|
||||
createClient: (type) => service.createClient({ type: `${type}(bull)` }),
|
||||
});
|
||||
|
||||
this.registerListeners();
|
||||
|
||||
this.logger.debug('[ScalingService] Queue setup completed');
|
||||
}
|
||||
|
||||
setupWorker(concurrency: number) {
|
||||
this.assertWorker();
|
||||
|
||||
void this.queue.process(
|
||||
JOB_TYPE_NAME,
|
||||
concurrency,
|
||||
async (job: Job) => await this.jobProcessor.processJob(job),
|
||||
);
|
||||
|
||||
this.logger.debug('[ScalingService] Worker setup completed');
|
||||
}
|
||||
|
||||
@OnShutdown(HIGHEST_SHUTDOWN_PRIORITY)
|
||||
async pauseQueue() {
|
||||
await this.queue.pause(true, true);
|
||||
|
||||
this.logger.debug('[ScalingService] Queue paused');
|
||||
}
|
||||
|
||||
async pingQueue() {
|
||||
await this.queue.client.ping();
|
||||
}
|
||||
|
||||
// #endregion
|
||||
|
||||
// #region Jobs
|
||||
|
||||
async addJob(jobData: JobData, jobOptions: JobOptions) {
|
||||
const { executionId } = jobData;
|
||||
|
||||
const job = await this.queue.add(JOB_TYPE_NAME, jobData, jobOptions);
|
||||
|
||||
this.logger.info(`[ScalingService] Added job ${job.id} (execution ${executionId})`);
|
||||
|
||||
return job;
|
||||
}
|
||||
|
||||
async getJob(jobId: JobId) {
|
||||
return await this.queue.getJob(jobId);
|
||||
}
|
||||
|
||||
async findJobsByStatus(statuses: JobStatus[]) {
|
||||
return await this.queue.getJobs(statuses);
|
||||
}
|
||||
|
||||
async stopJob(job: Job) {
|
||||
const props = { jobId: job.id, executionId: job.data.executionId };
|
||||
|
||||
try {
|
||||
if (await job.isActive()) {
|
||||
await job.progress({ kind: 'abort-job' });
|
||||
this.logger.debug('[ScalingService] Stopped active job', props);
|
||||
return true;
|
||||
}
|
||||
|
||||
await job.remove();
|
||||
this.logger.debug('[ScalingService] Stopped inactive job', props);
|
||||
return true;
|
||||
} catch (error: unknown) {
|
||||
await job.progress({ kind: 'abort-job' });
|
||||
this.logger.error('[ScalingService] Failed to stop job', { ...props, error });
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// #endregion
|
||||
|
||||
// #region Listeners
|
||||
|
||||
private registerListeners() {
|
||||
this.queue.on('global:progress', (_jobId: JobId, msg: JobMessage) => {
|
||||
if (msg.kind === 'respond-to-webhook') {
|
||||
const { executionId, response } = msg;
|
||||
this.activeExecutions.resolveResponsePromise(
|
||||
executionId,
|
||||
this.decodeWebhookResponse(response),
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
this.queue.on('global:progress', (jobId: JobId, msg: JobMessage) => {
|
||||
if (msg.kind === 'abort-job') {
|
||||
this.jobProcessor.stopJob(jobId);
|
||||
}
|
||||
});
|
||||
|
||||
let latestAttemptTs = 0;
|
||||
let cumulativeTimeoutMs = 0;
|
||||
|
||||
const MAX_TIMEOUT_MS = config.getEnv('queue.bull.redis.timeoutThreshold');
|
||||
const RESET_LENGTH_MS = 30_000;
|
||||
|
||||
this.queue.on('error', (error: Error) => {
|
||||
this.logger.error('[ScalingService] Queue errored', { error });
|
||||
|
||||
/**
|
||||
* On Redis connection failure, try to reconnect. On every failed attempt,
|
||||
* increment a cumulative timeout - if this exceeds a limit, exit the
|
||||
* process. Reset the cumulative timeout if >30s between retries.
|
||||
*/
|
||||
if (error.message.includes('ECONNREFUSED')) {
|
||||
const nowTs = Date.now();
|
||||
if (nowTs - latestAttemptTs > RESET_LENGTH_MS) {
|
||||
latestAttemptTs = nowTs;
|
||||
cumulativeTimeoutMs = 0;
|
||||
} else {
|
||||
cumulativeTimeoutMs += nowTs - latestAttemptTs;
|
||||
latestAttemptTs = nowTs;
|
||||
if (cumulativeTimeoutMs > MAX_TIMEOUT_MS) {
|
||||
this.logger.error('[ScalingService] Redis unavailable after max timeout');
|
||||
this.logger.error('[ScalingService] Exiting process...');
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.warn('[ScalingService] Redis unavailable - retrying to connect...');
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
this.instanceType === 'worker' &&
|
||||
error.message.includes('job stalled more than maxStalledCount')
|
||||
) {
|
||||
throw new MaxStalledCountError(error);
|
||||
}
|
||||
|
||||
/**
|
||||
* Non-recoverable error on worker start with Redis unavailable.
|
||||
* Even if Redis recovers, worker will remain unable to process jobs.
|
||||
*/
|
||||
if (
|
||||
this.instanceType === 'worker' &&
|
||||
error.message.includes('Error initializing Lua scripts')
|
||||
) {
|
||||
this.logger.error('[ScalingService] Fatal error initializing worker', { error });
|
||||
this.logger.error('[ScalingService] Exiting process...');
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
throw error;
|
||||
});
|
||||
}
|
||||
|
||||
// #endregion
|
||||
|
||||
private decodeWebhookResponse(
|
||||
response: IExecuteResponsePromiseData,
|
||||
): IExecuteResponsePromiseData {
|
||||
if (
|
||||
typeof response === 'object' &&
|
||||
typeof response.body === 'object' &&
|
||||
response.body !== null &&
|
||||
'__@N8nEncodedBuffer@__' in response.body &&
|
||||
typeof response.body['__@N8nEncodedBuffer@__'] === 'string'
|
||||
) {
|
||||
response.body = Buffer.from(response.body['__@N8nEncodedBuffer@__'], BINARY_ENCODING);
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
private assertWorker() {
|
||||
if (this.instanceType === 'worker') return;
|
||||
|
||||
throw new ApplicationError('This method must be called on a `worker` instance');
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user