From f7352b6a8f5000844b3e4e406f9746d1999b9271 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Mon, 17 Jun 2024 12:49:40 +0200 Subject: [PATCH] refactor(core): Make `PruningService.init` and `WaitTracker.init` consistent (no-changelog) (#9761) --- packages/cli/src/WaitTracker.ts | 18 ++++++------- packages/cli/src/commands/start.ts | 25 ++++--------------- packages/cli/src/services/pruning.service.ts | 16 ++++++++++++ .../test/integration/pruning.service.test.ts | 3 +++ packages/cli/test/unit/WaitTracker.test.ts | 11 +++++++- 5 files changed, 43 insertions(+), 30 deletions(-) diff --git a/packages/cli/src/WaitTracker.ts b/packages/cli/src/WaitTracker.ts index ac24246c50..201c22b1f3 100644 --- a/packages/cli/src/WaitTracker.ts +++ b/packages/cli/src/WaitTracker.ts @@ -30,19 +30,19 @@ export class WaitTracker { private readonly orchestrationService: OrchestrationService, ) {} + /** + * @important Requires `OrchestrationService` to be initialized. + */ init() { - const { isSingleMainSetup, isLeader, multiMainSetup } = this.orchestrationService; - - if (isSingleMainSetup) { - this.startTracking(); - return; - } + const { isLeader, isMultiMainSetupEnabled } = this.orchestrationService; if (isLeader) this.startTracking(); - multiMainSetup - .on('leader-takeover', () => this.startTracking()) - .on('leader-stepdown', () => this.stopTracking()); + if (isMultiMainSetupEnabled) { + this.orchestrationService.multiMainSetup + .on('leader-takeover', () => this.startTracking()) + .on('leader-stepdown', () => this.stopTracking()); + } } private startTracking() { diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index a78ea57cf9..1fc3690d47 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -199,7 +199,10 @@ export class Start extends BaseCommand { } async initOrchestration() { - if (config.getEnv('executions.mode') !== 'queue') return; + if (config.getEnv('executions.mode') === 'regular') { + config.set('multiMainSetup.instanceType', 'leader'); + return; + } if ( config.getEnv('multiMainSetup.enabled') && @@ -290,7 +293,7 @@ export class Start extends BaseCommand { await this.server.start(); - await this.initPruning(); + Container.get(PruningService).init(); if (config.getEnv('executions.mode') === 'regular') { await this.runEnqueuedExecutions(); @@ -333,24 +336,6 @@ export class Start extends BaseCommand { } } - async initPruning() { - this.pruningService = Container.get(PruningService); - - this.pruningService.startPruning(); - - if (config.getEnv('executions.mode') !== 'queue') return; - - const orchestrationService = Container.get(OrchestrationService); - - await orchestrationService.init(); - - if (!orchestrationService.isMultiMainSetupEnabled) return; - - orchestrationService.multiMainSetup - .on('leader-stepdown', () => this.pruningService.stopPruning()) - .on('leader-takeover', () => this.pruningService.startPruning()); - } - async catch(error: Error) { if (error.stack) this.logger.error(error.stack); await this.exitWithCrash('Exiting due to an error.', error); diff --git a/packages/cli/src/services/pruning.service.ts b/packages/cli/src/services/pruning.service.ts index 3f78c220cb..daa0883e1f 100644 --- a/packages/cli/src/services/pruning.service.ts +++ b/packages/cli/src/services/pruning.service.ts @@ -6,6 +6,7 @@ import { ExecutionRepository } from '@db/repositories/execution.repository'; import { Logger } from '@/Logger'; import { jsonStringify } from 'n8n-workflow'; import { OnShutdown } from '@/decorators/OnShutdown'; +import { OrchestrationService } from './orchestration.service'; @Service() export class PruningService { @@ -26,8 +27,23 @@ export class PruningService { private readonly logger: Logger, private readonly executionRepository: ExecutionRepository, private readonly binaryDataService: BinaryDataService, + private readonly orchestrationService: OrchestrationService, ) {} + /** + * @important Requires `OrchestrationService` to be initialized. + */ + init() { + const { isLeader, isMultiMainSetupEnabled } = this.orchestrationService; + + if (isLeader) this.startPruning(); + + if (isMultiMainSetupEnabled) { + this.orchestrationService.multiMainSetup.on('leader-takeover', () => this.startPruning()); + this.orchestrationService.multiMainSetup.on('leader-stepdown', () => this.stopPruning()); + } + } + private isPruningEnabled() { if ( !config.getEnv('executions.pruneData') || diff --git a/packages/cli/test/integration/pruning.service.test.ts b/packages/cli/test/integration/pruning.service.test.ts index b8eb07dba1..a600b4aabd 100644 --- a/packages/cli/test/integration/pruning.service.test.ts +++ b/packages/cli/test/integration/pruning.service.test.ts @@ -14,6 +14,8 @@ import { Logger } from '@/Logger'; import { mockInstance } from '../shared/mocking'; import { createWorkflow } from './shared/db/workflows'; import { createExecution, createSuccessfulExecution } from './shared/db/executions'; +import { mock } from 'jest-mock-extended'; +import type { OrchestrationService } from '@/services/orchestration.service'; describe('softDeleteOnPruningCycle()', () => { let pruningService: PruningService; @@ -29,6 +31,7 @@ describe('softDeleteOnPruningCycle()', () => { mockInstance(Logger), Container.get(ExecutionRepository), mockInstance(BinaryDataService), + mock(), ); workflow = await createWorkflow(); diff --git a/packages/cli/test/unit/WaitTracker.test.ts b/packages/cli/test/unit/WaitTracker.test.ts index 8b56de3cee..0f8464e18d 100644 --- a/packages/cli/test/unit/WaitTracker.test.ts +++ b/packages/cli/test/unit/WaitTracker.test.ts @@ -34,7 +34,8 @@ describe('WaitTracker', () => { }); describe('init()', () => { - it('should query DB for waiting executions', async () => { + it('should query DB for waiting executions if leader', async () => { + jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(true); executionRepository.getWaitingExecutions.mockResolvedValue([execution]); waitTracker.init(); @@ -42,6 +43,14 @@ describe('WaitTracker', () => { expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1); }); + it('if follower, should do nothing', () => { + executionRepository.getWaitingExecutions.mockResolvedValue([]); + + waitTracker.init(); + + expect(executionRepository.findSingleExecution).not.toHaveBeenCalled(); + }); + it('if no executions to start, should do nothing', () => { executionRepository.getWaitingExecutions.mockResolvedValue([]);