fix(core): Separate error handlers for main and worker (#11091)

This commit is contained in:
Iván Ovejero
2024-10-04 16:38:48 +02:00
committed by GitHub
parent 722f4a8b77
commit bb59cc71ac

View File

@@ -203,14 +203,6 @@ export class ScalingService {
// #region Listeners
private registerListeners() {
this.queue.on('error', (error: Error) => {
if ('code' in error && error.code === 'ECONNREFUSED') return; // handled by RedisClientService.retryStrategy
this.logger.error('[ScalingService] Queue errored', { error });
throw error;
});
const { instanceType } = this.instanceSettings;
if (instanceType === 'main' || instanceType === 'webhook') {
this.registerMainOrWebhookListeners();
@@ -230,6 +222,8 @@ export class ScalingService {
});
this.queue.on('error', (error: Error) => {
if ('code' in error && error.code === 'ECONNREFUSED') return; // handled by RedisClientService.retryStrategy
if (error.message.includes('job stalled more than maxStalledCount')) {
throw new MaxStalledCountError(error);
}
@@ -244,6 +238,8 @@ export class ScalingService {
process.exit(1);
}
this.logger.error('[ScalingService] Queue errored', { error });
throw error;
});
}
@@ -252,6 +248,14 @@ export class ScalingService {
* Register listeners on a `main` or `webhook` process for Bull queue events.
*/
private registerMainOrWebhookListeners() {
this.queue.on('error', (error: Error) => {
if ('code' in error && error.code === 'ECONNREFUSED') return; // handled by RedisClientService.retryStrategy
this.logger.error('[ScalingService] Queue errored', { error });
throw error;
});
this.queue.on('global:progress', (_jobId: JobId, msg: unknown) => {
if (!this.isPubSubMessage(msg)) return;