diff --git a/packages/cli/src/scaling/__tests__/scaling.service.test.ts b/packages/cli/src/scaling/__tests__/scaling.service.test.ts index 372d56da0c..a08b5f7569 100644 --- a/packages/cli/src/scaling/__tests__/scaling.service.test.ts +++ b/packages/cli/src/scaling/__tests__/scaling.service.test.ts @@ -1,5 +1,6 @@ import { mockLogger, mockInstance } from '@n8n/backend-test-utils'; import { GlobalConfig } from '@n8n/config'; +import type { ExecutionRepository } from '@n8n/db'; import { Container } from '@n8n/di'; import * as BullModule from 'bull'; import { mock } from 'jest-mock-extended'; @@ -54,6 +55,7 @@ describe('ScalingService', () => { const instanceSettings = Container.get(InstanceSettings); const jobProcessor = mock(); + const executionRepository = mock(); let scalingService: ScalingService; @@ -85,7 +87,7 @@ describe('ScalingService', () => { mock(), jobProcessor, globalConfig, - mock(), + executionRepository, instanceSettings, mock(), ); @@ -359,4 +361,36 @@ describe('ScalingService', () => { }); }); }); + + describe('recoverFromQueue', () => { + it('should mark running executions as crashed if they are missing from the queue and queue is empty', async () => { + await scalingService.setupQueue(); + executionRepository.getInProgressExecutionIds.mockResolvedValue(['123']); + queue.getJobs.mockResolvedValue([]); + + await scalingService.recoverFromQueue(); + + expect(executionRepository.markAsCrashed).toHaveBeenCalledWith(['123']); + }); + + it('should mark running executions as crashed if they are missing from the queue and queue is not empty', async () => { + await scalingService.setupQueue(); + executionRepository.getInProgressExecutionIds.mockResolvedValue(['123']); + queue.getJobs.mockResolvedValue([mock({ data: { executionId: '321' } })]); + + await scalingService.recoverFromQueue(); + + expect(executionRepository.markAsCrashed).toHaveBeenCalledWith(['123']); + }); + + it('should not mark running executions as crashed if they are present in the queue', async () => { + await scalingService.setupQueue(); + executionRepository.getInProgressExecutionIds.mockResolvedValue(['123']); + queue.getJobs.mockResolvedValue([mock({ data: { executionId: '123' } })]); + + await scalingService.recoverFromQueue(); + + expect(executionRepository.markAsCrashed).not.toHaveBeenCalled(); + }); + }); }); diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts index d705b0d100..5cc237d6ef 100644 --- a/packages/cli/src/scaling/scaling.service.ts +++ b/packages/cli/src/scaling/scaling.service.ts @@ -480,7 +480,7 @@ export class ScalingService { * Mark in-progress executions as `crashed` if stored in DB as `new` or `running` * but absent from the queue. Return time until next recovery cycle. */ - private async recoverFromQueue() { + async recoverFromQueue() { const { waitMs, batchSize } = this.queueRecoveryContext; const storedIds = await this.executionRepository.getInProgressExecutionIds(batchSize); @@ -491,16 +491,9 @@ export class ScalingService { } const runningJobs = await this.findJobsByStatus(['active', 'waiting']); - const queuedIds = new Set(runningJobs.map((job) => job.data.executionId)); - if (queuedIds.size === 0) { - this.logger.debug('Completed queue recovery check, no dangling executions'); - return waitMs; - } - const danglingIds = storedIds.filter((id) => !queuedIds.has(id)); - if (danglingIds.length === 0) { this.logger.debug('Completed queue recovery check, no dangling executions'); return waitMs;