fix(core): Ensure queue is ready when enqueueing (#16098)

This commit is contained in:
Iván Ovejero
2025-06-10 10:09:26 +02:00
committed by GitHub
parent 07ed01d9fc
commit 25567f6f0e
3 changed files with 41 additions and 1 deletions

View File

@@ -1,6 +1,6 @@
import type { User } from '@n8n/db'; import type { User } from '@n8n/db';
import type { ExecutionEntity } from '@n8n/db'; import type { ExecutionEntity } from '@n8n/db';
import { Container } from '@n8n/di'; import { Container, Service } from '@n8n/di';
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import { DirectedGraph, WorkflowExecute } from 'n8n-core'; import { DirectedGraph, WorkflowExecute } from 'n8n-core';
import * as core from 'n8n-core'; import * as core from 'n8n-core';
@@ -258,3 +258,39 @@ describe('run', () => {
); );
}); });
}); });
describe('enqueueExecution', () => {
const setupQueue = jest.fn();
@Service()
class MockScalingService {
setupQueue = setupQueue;
addJob = jest.fn();
}
beforeAll(() => {
jest.mock('@/scaling/scaling.service', () => ({
ScalingService: MockScalingService,
}));
});
afterAll(() => {
jest.unmock('@/scaling/scaling.service');
});
it('should setup queue when scalingService is not initialized', async () => {
const activeExecutions = Container.get(ActiveExecutions);
jest.spyOn(activeExecutions, 'attachWorkflowExecution').mockReturnValue();
jest.spyOn(runner, 'processError').mockResolvedValue();
const data = mock<IWorkflowExecutionDataProcess>({
workflowData: { nodes: [] },
executionData: undefined,
});
// @ts-expect-error Private method
await runner.enqueueExecution('1', data);
expect(setupQueue).toHaveBeenCalledTimes(1);
});
});

View File

@@ -57,6 +57,9 @@ export class ScalingService {
async setupQueue() { async setupQueue() {
const { default: BullQueue } = await import('bull'); const { default: BullQueue } = await import('bull');
const { RedisClientService } = await import('@/services/redis-client.service'); const { RedisClientService } = await import('@/services/redis-client.service');
if (this.queue) return;
const service = Container.get(RedisClientService); const service = Container.get(RedisClientService);
const bullPrefix = this.globalConfig.queue.bull.prefix; const bullPrefix = this.globalConfig.queue.bull.prefix;

View File

@@ -348,6 +348,7 @@ export class WorkflowRunner {
if (!this.scalingService) { if (!this.scalingService) {
const { ScalingService } = await import('@/scaling/scaling.service'); const { ScalingService } = await import('@/scaling/scaling.service');
this.scalingService = Container.get(ScalingService); this.scalingService = Container.get(ScalingService);
await this.scalingService.setupQueue();
} }
// TODO: For realtime jobs should probably also not do retry or not retry if they are older than x seconds. // TODO: For realtime jobs should probably also not do retry or not retry if they are older than x seconds.