mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-17 01:56:46 +00:00
fix(core): Fix crashed execution recovery in scaling mode (#19412)
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
import { mockLogger, mockInstance } from '@n8n/backend-test-utils';
|
import { mockLogger, mockInstance } from '@n8n/backend-test-utils';
|
||||||
import { GlobalConfig } from '@n8n/config';
|
import { GlobalConfig } from '@n8n/config';
|
||||||
|
import type { ExecutionRepository } from '@n8n/db';
|
||||||
import { Container } from '@n8n/di';
|
import { Container } from '@n8n/di';
|
||||||
import * as BullModule from 'bull';
|
import * as BullModule from 'bull';
|
||||||
import { mock } from 'jest-mock-extended';
|
import { mock } from 'jest-mock-extended';
|
||||||
@@ -54,6 +55,7 @@ describe('ScalingService', () => {
|
|||||||
|
|
||||||
const instanceSettings = Container.get(InstanceSettings);
|
const instanceSettings = Container.get(InstanceSettings);
|
||||||
const jobProcessor = mock<JobProcessor>();
|
const jobProcessor = mock<JobProcessor>();
|
||||||
|
const executionRepository = mock<ExecutionRepository>();
|
||||||
|
|
||||||
let scalingService: ScalingService;
|
let scalingService: ScalingService;
|
||||||
|
|
||||||
@@ -85,7 +87,7 @@ describe('ScalingService', () => {
|
|||||||
mock(),
|
mock(),
|
||||||
jobProcessor,
|
jobProcessor,
|
||||||
globalConfig,
|
globalConfig,
|
||||||
mock(),
|
executionRepository,
|
||||||
instanceSettings,
|
instanceSettings,
|
||||||
mock(),
|
mock(),
|
||||||
);
|
);
|
||||||
@@ -359,4 +361,36 @@ describe('ScalingService', () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('recoverFromQueue', () => {
|
||||||
|
it('should mark running executions as crashed if they are missing from the queue and queue is empty', async () => {
|
||||||
|
await scalingService.setupQueue();
|
||||||
|
executionRepository.getInProgressExecutionIds.mockResolvedValue(['123']);
|
||||||
|
queue.getJobs.mockResolvedValue([]);
|
||||||
|
|
||||||
|
await scalingService.recoverFromQueue();
|
||||||
|
|
||||||
|
expect(executionRepository.markAsCrashed).toHaveBeenCalledWith(['123']);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should mark running executions as crashed if they are missing from the queue and queue is not empty', async () => {
|
||||||
|
await scalingService.setupQueue();
|
||||||
|
executionRepository.getInProgressExecutionIds.mockResolvedValue(['123']);
|
||||||
|
queue.getJobs.mockResolvedValue([mock<Job>({ data: { executionId: '321' } })]);
|
||||||
|
|
||||||
|
await scalingService.recoverFromQueue();
|
||||||
|
|
||||||
|
expect(executionRepository.markAsCrashed).toHaveBeenCalledWith(['123']);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should not mark running executions as crashed if they are present in the queue', async () => {
|
||||||
|
await scalingService.setupQueue();
|
||||||
|
executionRepository.getInProgressExecutionIds.mockResolvedValue(['123']);
|
||||||
|
queue.getJobs.mockResolvedValue([mock<Job>({ data: { executionId: '123' } })]);
|
||||||
|
|
||||||
|
await scalingService.recoverFromQueue();
|
||||||
|
|
||||||
|
expect(executionRepository.markAsCrashed).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -480,7 +480,7 @@ export class ScalingService {
|
|||||||
* Mark in-progress executions as `crashed` if stored in DB as `new` or `running`
|
* Mark in-progress executions as `crashed` if stored in DB as `new` or `running`
|
||||||
* but absent from the queue. Return time until next recovery cycle.
|
* but absent from the queue. Return time until next recovery cycle.
|
||||||
*/
|
*/
|
||||||
private async recoverFromQueue() {
|
async recoverFromQueue() {
|
||||||
const { waitMs, batchSize } = this.queueRecoveryContext;
|
const { waitMs, batchSize } = this.queueRecoveryContext;
|
||||||
|
|
||||||
const storedIds = await this.executionRepository.getInProgressExecutionIds(batchSize);
|
const storedIds = await this.executionRepository.getInProgressExecutionIds(batchSize);
|
||||||
@@ -491,16 +491,9 @@ export class ScalingService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const runningJobs = await this.findJobsByStatus(['active', 'waiting']);
|
const runningJobs = await this.findJobsByStatus(['active', 'waiting']);
|
||||||
|
|
||||||
const queuedIds = new Set(runningJobs.map((job) => job.data.executionId));
|
const queuedIds = new Set(runningJobs.map((job) => job.data.executionId));
|
||||||
|
|
||||||
if (queuedIds.size === 0) {
|
|
||||||
this.logger.debug('Completed queue recovery check, no dangling executions');
|
|
||||||
return waitMs;
|
|
||||||
}
|
|
||||||
|
|
||||||
const danglingIds = storedIds.filter((id) => !queuedIds.has(id));
|
const danglingIds = storedIds.filter((id) => !queuedIds.has(id));
|
||||||
|
|
||||||
if (danglingIds.length === 0) {
|
if (danglingIds.length === 0) {
|
||||||
this.logger.debug('Completed queue recovery check, no dangling executions');
|
this.logger.debug('Completed queue recovery check, no dangling executions');
|
||||||
return waitMs;
|
return waitMs;
|
||||||
|
|||||||
Reference in New Issue
Block a user