refactor(core): Separate listeners in scaling service (no-changelog) (#10487)

This commit is contained in:
Iván Ovejero
2024-08-26 12:35:39 +02:00
committed by GitHub
parent 96e69ad5f2
commit 352aa2a9a4
9 changed files with 149 additions and 210 deletions

View File

@@ -13,11 +13,11 @@ import type {
Job,
JobData,
JobOptions,
JobMessage,
JobStatus,
JobId,
QueueRecoveryContext,
} from './types';
PubSubMessage,
} from './scaling.types';
import type { IExecuteResponsePromiseData } from 'n8n-workflow';
import { GlobalConfig } from '@n8n/config';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
@@ -71,6 +71,7 @@ export class ScalingService {
setupWorker(concurrency: number) {
this.assertWorker();
this.assertQueue();
void this.queue.process(
JOB_TYPE_NAME,
@@ -161,22 +162,6 @@ export class ScalingService {
// #region Listeners
private registerListeners() {
this.queue.on('global:progress', (_jobId: JobId, msg: JobMessage) => {
if (msg.kind === 'respond-to-webhook') {
const { executionId, response } = msg;
this.activeExecutions.resolveResponsePromise(
executionId,
this.decodeWebhookResponse(response),
);
}
});
this.queue.on('global:progress', (jobId: JobId, msg: JobMessage) => {
if (msg.kind === 'abort-job') {
this.jobProcessor.stopJob(jobId);
}
});
let latestAttemptTs = 0;
let cumulativeTimeoutMs = 0;
@@ -210,10 +195,28 @@ export class ScalingService {
return;
}
if (
this.instanceType === 'worker' &&
error.message.includes('job stalled more than maxStalledCount')
) {
throw error;
});
if (this.instanceType === 'main') {
this.registerMainListeners();
} else if (this.instanceType === 'worker') {
this.registerWorkerListeners();
}
}
/**
* Register listeners on a `worker` process for Bull queue events.
*/
private registerWorkerListeners() {
this.queue.on('global:progress', (jobId: JobId, msg: unknown) => {
if (!this.isPubSubMessage(msg)) return;
if (msg.kind === 'abort-job') this.jobProcessor.stopJob(jobId);
});
this.queue.on('error', (error: Error) => {
if (error.message.includes('job stalled more than maxStalledCount')) {
throw new MaxStalledCountError(error);
}
@@ -221,10 +224,7 @@ export class ScalingService {
* Non-recoverable error on worker start with Redis unavailable.
* Even if Redis recovers, worker will remain unable to process jobs.
*/
if (
this.instanceType === 'worker' &&
error.message.includes('Error initializing Lua scripts')
) {
if (error.message.includes('Error initializing Lua scripts')) {
this.logger.error('[ScalingService] Fatal error initializing worker', { error });
this.logger.error('[ScalingService] Exiting process...');
process.exit(1);
@@ -234,6 +234,24 @@ export class ScalingService {
});
}
/**
* Register listeners on a `main` process for Bull queue events.
*/
private registerMainListeners() {
this.queue.on('global:progress', (_jobId: JobId, msg: unknown) => {
if (!this.isPubSubMessage(msg)) return;
if (msg.kind === 'respond-to-webhook') {
const decodedResponse = this.decodeWebhookResponse(msg.response);
this.activeExecutions.resolveResponsePromise(msg.executionId, decodedResponse);
}
});
}
private isPubSubMessage(candidate: unknown): candidate is PubSubMessage {
return typeof candidate === 'object' && candidate !== null && 'kind' in candidate;
}
// #endregion
private decodeWebhookResponse(
@@ -252,6 +270,12 @@ export class ScalingService {
return response;
}
private assertQueue() {
if (this.queue) return;
throw new ApplicationError('This method must be called after `setupQueue`');
}
private assertWorker() {
if (this.instanceType === 'worker') return;
@@ -265,7 +289,7 @@ export class ScalingService {
waitMs: config.getEnv('executions.queueRecovery.interval') * 60 * 1000,
};
scheduleQueueRecovery(waitMs = this.queueRecoveryContext.waitMs) {
private scheduleQueueRecovery(waitMs = this.queueRecoveryContext.waitMs) {
this.queueRecoveryContext.timeout = setTimeout(async () => {
try {
const nextWaitMs = await this.recoverFromQueue();
@@ -285,7 +309,7 @@ export class ScalingService {
this.logger.debug(`[ScalingService] Scheduled queue recovery check for next ${wait}`);
}
stopQueueRecovery() {
private stopQueueRecovery() {
clearTimeout(this.queueRecoveryContext.timeout);
}