diff --git a/packages/cli/src/constants.ts b/packages/cli/src/constants.ts index 04512e8be9..be26616fb6 100644 --- a/packages/cli/src/constants.ts +++ b/packages/cli/src/constants.ts @@ -127,6 +127,9 @@ export const TIME = { * Eventually this will superseed `TIME` above */ export const Time = { + milliseconds: { + toMinutes: 1 / (60 * 1000), + }, seconds: { toMilliseconds: 1000, }, diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts index 225badbf18..19da88e412 100644 --- a/packages/cli/src/services/orchestration.service.ts +++ b/packages/cli/src/services/orchestration.service.ts @@ -20,7 +20,7 @@ export class OrchestrationService { private subscriber: Subscriber; - protected isInitialized = false; + isInitialized = false; private isMultiMainSetupLicensed = false; diff --git a/packages/cli/src/services/pruning/__tests__/pruning.service.test.ts b/packages/cli/src/services/pruning/__tests__/pruning.service.test.ts index 5ea26ad540..64cecd1c06 100644 --- a/packages/cli/src/services/pruning/__tests__/pruning.service.test.ts +++ b/packages/cli/src/services/pruning/__tests__/pruning.service.test.ts @@ -1,4 +1,4 @@ -import type { GlobalConfig } from '@n8n/config'; +import type { PruningConfig } from '@n8n/config'; import { mock } from 'jest-mock-extended'; import type { InstanceSettings } from 'n8n-core'; @@ -8,9 +8,13 @@ import { mockLogger } from '@test/mocking'; import { PruningService } from '../pruning.service'; +jest.mock('@/db', () => ({ + connectionState: { migrated: true }, +})); + describe('PruningService', () => { describe('init', () => { - it('should start pruning if leader', () => { + it('should start pruning on main instance that is the leader', () => { const pruningService = new PruningService( mockLogger(), mock({ isLeader: true }), @@ -29,7 +33,7 @@ describe('PruningService', () => { expect(startPruningSpy).toHaveBeenCalled(); }); - it('should not start pruning if follower', () => { + it('should not start pruning on main instance that is a follower', () => { const pruningService = new PruningService( mockLogger(), mock({ isLeader: false }), @@ -48,7 +52,7 @@ describe('PruningService', () => { expect(startPruningSpy).not.toHaveBeenCalled(); }); - it('should register leadership events if multi-main setup is enabled', () => { + it('should register leadership events if main on multi-main setup', () => { const pruningService = new PruningService( mockLogger(), mock({ isLeader: true }), @@ -88,13 +92,10 @@ describe('PruningService', () => { isMultiMainSetupEnabled: true, multiMainSetup: mock(), }), - mock({ pruning: { isEnabled: true } }), + mock({ isEnabled: true }), ); - // @ts-expect-error Private method - const isEnabled = pruningService.isEnabled(); - - expect(isEnabled).toBe(true); + expect(pruningService.isEnabled).toBe(true); }); it('should return `false` based on config if leader main', () => { @@ -107,16 +108,13 @@ describe('PruningService', () => { isMultiMainSetupEnabled: true, multiMainSetup: mock(), }), - mock({ pruning: { isEnabled: false } }), + mock({ isEnabled: false }), ); - // @ts-expect-error Private method - const isEnabled = pruningService.isEnabled(); - - expect(isEnabled).toBe(false); + expect(pruningService.isEnabled).toBe(false); }); - it('should return `false` if non-main even if enabled', () => { + it('should return `false` if non-main even if config is enabled', () => { const pruningService = new PruningService( mockLogger(), mock({ isLeader: false, instanceType: 'worker' }), @@ -126,16 +124,13 @@ describe('PruningService', () => { isMultiMainSetupEnabled: true, multiMainSetup: mock(), }), - mock({ pruning: { isEnabled: true } }), + mock({ isEnabled: true }), ); - // @ts-expect-error Private method - const isEnabled = pruningService.isEnabled(); - - expect(isEnabled).toBe(false); + expect(pruningService.isEnabled).toBe(false); }); - it('should return `false` if follower main even if enabled', () => { + it('should return `false` if follower main even if config is enabled', () => { const pruningService = new PruningService( mockLogger(), mock({ isLeader: false, isFollower: true, instanceType: 'main' }), @@ -145,13 +140,10 @@ describe('PruningService', () => { isMultiMainSetupEnabled: true, multiMainSetup: mock(), }), - mock({ pruning: { isEnabled: true }, multiMainSetup: { enabled: true } }), + mock({ isEnabled: true }), ); - // @ts-expect-error Private method - const isEnabled = pruningService.isEnabled(); - - expect(isEnabled).toBe(false); + expect(pruningService.isEnabled).toBe(false); }); }); @@ -166,22 +158,25 @@ describe('PruningService', () => { isMultiMainSetupEnabled: true, multiMainSetup: mock(), }), - mock({ pruning: { isEnabled: false } }), + mock({ isEnabled: false }), + ); + + const scheduleRollingSoftDeletionsSpy = jest.spyOn( + pruningService, + // @ts-expect-error Private method + 'scheduleRollingSoftDeletions', ); // @ts-expect-error Private method - const setSoftDeletionInterval = jest.spyOn(pruningService, 'setSoftDeletionInterval'); - - // @ts-expect-error Private method - const scheduleHardDeletion = jest.spyOn(pruningService, 'scheduleHardDeletion'); + const scheduleNextHardDeletionSpy = jest.spyOn(pruningService, 'scheduleNextHardDeletion'); pruningService.startPruning(); - expect(setSoftDeletionInterval).not.toHaveBeenCalled(); - expect(scheduleHardDeletion).not.toHaveBeenCalled(); + expect(scheduleRollingSoftDeletionsSpy).not.toHaveBeenCalled(); + expect(scheduleNextHardDeletionSpy).not.toHaveBeenCalled(); }); - it('should start pruning if service is enabled', () => { + it('should start pruning if service is enabled and DB is migrated', () => { const pruningService = new PruningService( mockLogger(), mock({ isLeader: true, instanceType: 'main' }), @@ -191,23 +186,23 @@ describe('PruningService', () => { isMultiMainSetupEnabled: true, multiMainSetup: mock(), }), - mock({ pruning: { isEnabled: true } }), + mock({ isEnabled: true }), ); - const setSoftDeletionInterval = jest + const scheduleRollingSoftDeletionsSpy = jest // @ts-expect-error Private method - .spyOn(pruningService, 'setSoftDeletionInterval') + .spyOn(pruningService, 'scheduleRollingSoftDeletions') .mockImplementation(); - const scheduleHardDeletion = jest + const scheduleNextHardDeletionSpy = jest // @ts-expect-error Private method - .spyOn(pruningService, 'scheduleHardDeletion') + .spyOn(pruningService, 'scheduleNextHardDeletion') .mockImplementation(); pruningService.startPruning(); - expect(setSoftDeletionInterval).toHaveBeenCalled(); - expect(scheduleHardDeletion).toHaveBeenCalled(); + expect(scheduleRollingSoftDeletionsSpy).toHaveBeenCalled(); + expect(scheduleNextHardDeletionSpy).toHaveBeenCalled(); }); }); }); diff --git a/packages/cli/src/services/pruning/pruning.service.ts b/packages/cli/src/services/pruning/pruning.service.ts index 7314015091..34be37cf21 100644 --- a/packages/cli/src/services/pruning/pruning.service.ts +++ b/packages/cli/src/services/pruning/pruning.service.ts @@ -1,27 +1,37 @@ -import { GlobalConfig } from '@n8n/config'; +import { PruningConfig } from '@n8n/config'; import { BinaryDataService, InstanceSettings } from 'n8n-core'; -import { jsonStringify } from 'n8n-workflow'; +import { ensureError } from 'n8n-workflow'; +import { strict } from 'node:assert'; import { Service } from 'typedi'; -import { TIME } from '@/constants'; +import { Time } from '@/constants'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import { connectionState as dbConnectionState } from '@/db'; import { OnShutdown } from '@/decorators/on-shutdown'; import { Logger } from '@/logging/logger.service'; import { OrchestrationService } from '../orchestration.service'; +/** + * Responsible for pruning executions from the database and their associated binary data + * from the filesystem, on a rolling basis. By default we soft-delete execution rows + * every cycle and hard-delete them and their binary data every 4th cycle. + */ @Service() export class PruningService { - private hardDeletionBatchSize = 100; + /** Timer for soft-deleting executions on a rolling basis. */ + private softDeletionInterval: NodeJS.Timer | undefined; - private rates: Record = { - softDeletion: this.globalConfig.pruning.softDeleteInterval * TIME.MINUTE, - hardDeletion: this.globalConfig.pruning.hardDeleteInterval * TIME.MINUTE, + /** Timeout for next hard-deletion of soft-deleted executions. */ + private hardDeletionTimeout: NodeJS.Timeout | undefined; + + private readonly rates = { + softDeletion: this.pruningConfig.softDeleteInterval * Time.minutes.toMilliseconds, + hardDeletion: this.pruningConfig.hardDeleteInterval * Time.minutes.toMilliseconds, }; - public softDeletionInterval: NodeJS.Timer | undefined; - - public hardDeletionTimeout: NodeJS.Timeout | undefined; + /** Max number of executions to hard-delete in a cycle. */ + private readonly batchSize = 100; private isShuttingDown = false; @@ -31,103 +41,68 @@ export class PruningService { private readonly executionRepository: ExecutionRepository, private readonly binaryDataService: BinaryDataService, private readonly orchestrationService: OrchestrationService, - private readonly globalConfig: GlobalConfig, + private readonly pruningConfig: PruningConfig, ) { this.logger = this.logger.scoped('pruning'); } - /** - * @important Requires `OrchestrationService` to be initialized. - */ init() { - const { isLeader } = this.instanceSettings; - const { isMultiMainSetupEnabled } = this.orchestrationService; + strict(this.instanceSettings.instanceRole !== 'unset', 'Instance role is not set'); - if (isLeader) this.startPruning(); + if (this.instanceSettings.isLeader) this.startPruning(); - if (isMultiMainSetupEnabled) { + if (this.orchestrationService.isMultiMainSetupEnabled) { this.orchestrationService.multiMainSetup.on('leader-takeover', () => this.startPruning()); this.orchestrationService.multiMainSetup.on('leader-stepdown', () => this.stopPruning()); } } - private isEnabled() { - const { instanceType, isFollower } = this.instanceSettings; - if (!this.globalConfig.pruning.isEnabled || instanceType !== 'main') { - return false; - } - - if (this.globalConfig.multiMainSetup.enabled && instanceType === 'main' && isFollower) { - return false; - } - - return true; + get isEnabled() { + return ( + this.pruningConfig.isEnabled && + this.instanceSettings.instanceType === 'main' && + this.instanceSettings.isLeader + ); } - /** - * @important Call this method only after DB migrations have completed. - */ startPruning() { - if (!this.isEnabled()) return; + if (!this.isEnabled || !dbConnectionState.migrated || this.isShuttingDown) return; - if (this.isShuttingDown) { - this.logger.warn('Cannot start pruning while shutting down'); - return; - } - - this.logger.debug('Starting soft-deletion and hard-deletion timers'); - - this.setSoftDeletionInterval(); - this.scheduleHardDeletion(); + this.scheduleRollingSoftDeletions(); + this.scheduleNextHardDeletion(); } stopPruning() { - if (!this.isEnabled()) return; - - this.logger.debug('Removing soft-deletion and hard-deletion timers'); + if (!this.isEnabled) return; clearInterval(this.softDeletionInterval); clearTimeout(this.hardDeletionTimeout); } - private setSoftDeletionInterval(rateMs = this.rates.softDeletion) { - const when = [rateMs / TIME.MINUTE, 'min'].join(' '); - + private scheduleRollingSoftDeletions(rateMs = this.rates.softDeletion) { this.softDeletionInterval = setInterval( - async () => await this.softDeleteOnPruningCycle(), + async () => await this.softDelete(), this.rates.softDeletion, ); - this.logger.debug(`Soft-deletion scheduled every ${when}`); + this.logger.debug(`Soft-deletion every ${rateMs * Time.milliseconds.toMinutes} minutes`); } - private scheduleHardDeletion(rateMs = this.rates.hardDeletion) { - const when = [rateMs / TIME.MINUTE, 'min'].join(' '); - + private scheduleNextHardDeletion(rateMs = this.rates.hardDeletion) { this.hardDeletionTimeout = setTimeout(() => { - this.hardDeleteOnPruningCycle() - .then((rate) => this.scheduleHardDeletion(rate)) + this.hardDelete() + .then((rate) => this.scheduleNextHardDeletion(rate)) .catch((error) => { - this.scheduleHardDeletion(1 * TIME.SECOND); - - const errorMessage = - error instanceof Error - ? error.message - : jsonStringify(error, { replaceCircularRefs: true }); - - this.logger.error('Failed to hard-delete executions', { errorMessage }); + this.scheduleNextHardDeletion(1_000); + this.logger.error('Failed to hard-delete executions', { error: ensureError(error) }); }); }, rateMs); - this.logger.debug(`Hard-deletion scheduled for next ${when}`); + this.logger.debug(`Hard-deletion in next ${rateMs * Time.milliseconds.toMinutes} minutes`); } - /** - * Mark executions as deleted based on age and count, in a pruning cycle. - */ - async softDeleteOnPruningCycle() { - this.logger.debug('Starting soft-deletion of executions'); - + /** Soft-delete executions based on max age and/or max count. */ + async softDelete() { const result = await this.executionRepository.softDeletePrunableExecutions(); if (result.affected === 0) { @@ -145,10 +120,11 @@ export class PruningService { } /** - * Permanently remove all soft-deleted executions and their binary data, in a pruning cycle. - * @return Delay in ms after which the next cycle should be started + * Delete all soft-deleted executions and their binary data. + * + * @returns Delay in milliseconds until next hard-deletion */ - private async hardDeleteOnPruningCycle() { + private async hardDelete(): Promise { const ids = await this.executionRepository.findSoftDeletedExecutions(); const executionIds = ids.map((o) => o.executionId); @@ -160,8 +136,6 @@ export class PruningService { } try { - this.logger.debug('Starting hard-deletion of executions', { executionIds }); - await this.binaryDataService.deleteMany(ids); await this.executionRepository.deleteByIds(executionIds); @@ -170,16 +144,13 @@ export class PruningService { } catch (error) { this.logger.error('Failed to hard-delete executions', { executionIds, - error: error instanceof Error ? error.message : `${error}`, + error: ensureError(error), }); } - /** - * For next batch, speed up hard-deletion cycle in high-volume case - * to prevent high concurrency from causing duplicate deletions. - */ - const isHighVolume = executionIds.length >= this.hardDeletionBatchSize; + // if high volume, speed up next hard-deletion + if (executionIds.length >= this.batchSize) return 1 * Time.seconds.toMilliseconds; - return isHighVolume ? 1 * TIME.SECOND : this.rates.hardDeletion; + return this.rates.hardDeletion; } } diff --git a/packages/cli/test/integration/pruning.service.test.ts b/packages/cli/test/integration/pruning.service.test.ts index 76ee107903..4ea8455b94 100644 --- a/packages/cli/test/integration/pruning.service.test.ts +++ b/packages/cli/test/integration/pruning.service.test.ts @@ -1,4 +1,4 @@ -import { GlobalConfig } from '@n8n/config'; +import { PruningConfig } from '@n8n/config'; import { mock } from 'jest-mock-extended'; import { BinaryDataService, InstanceSettings } from 'n8n-core'; import type { ExecutionStatus } from 'n8n-workflow'; @@ -27,19 +27,19 @@ describe('softDeleteOnPruningCycle()', () => { const now = new Date(); const yesterday = new Date(Date.now() - TIME.DAY); let workflow: WorkflowEntity; - let globalConfig: GlobalConfig; + let pruningConfig: PruningConfig; beforeAll(async () => { await testDb.init(); - globalConfig = Container.get(GlobalConfig); + pruningConfig = Container.get(PruningConfig); pruningService = new PruningService( mockLogger(), instanceSettings, Container.get(ExecutionRepository), mockInstance(BinaryDataService), mock(), - globalConfig, + pruningConfig, ); workflow = await createWorkflow(); @@ -62,8 +62,8 @@ describe('softDeleteOnPruningCycle()', () => { describe('when EXECUTIONS_DATA_PRUNE_MAX_COUNT is set', () => { beforeAll(() => { - globalConfig.pruning.maxAge = 336; - globalConfig.pruning.maxCount = 1; + pruningConfig.maxAge = 336; + pruningConfig.maxCount = 1; }); test('should mark as deleted based on EXECUTIONS_DATA_PRUNE_MAX_COUNT', async () => { @@ -73,7 +73,7 @@ describe('softDeleteOnPruningCycle()', () => { await createSuccessfulExecution(workflow), ]; - await pruningService.softDeleteOnPruningCycle(); + await pruningService.softDelete(); const result = await findAllExecutions(); expect(result).toEqual([ @@ -92,7 +92,7 @@ describe('softDeleteOnPruningCycle()', () => { await createSuccessfulExecution(workflow), ]; - await pruningService.softDeleteOnPruningCycle(); + await pruningService.softDelete(); const result = await findAllExecutions(); expect(result).toEqual([ @@ -113,7 +113,7 @@ describe('softDeleteOnPruningCycle()', () => { await createSuccessfulExecution(workflow), ]; - await pruningService.softDeleteOnPruningCycle(); + await pruningService.softDelete(); const result = await findAllExecutions(); expect(result).toEqual([ @@ -132,7 +132,7 @@ describe('softDeleteOnPruningCycle()', () => { await createSuccessfulExecution(workflow), ]; - await pruningService.softDeleteOnPruningCycle(); + await pruningService.softDelete(); const result = await findAllExecutions(); expect(result).toEqual([ @@ -150,7 +150,7 @@ describe('softDeleteOnPruningCycle()', () => { await annotateExecution(executions[0].id, { vote: 'up' }, [workflow.id]); - await pruningService.softDeleteOnPruningCycle(); + await pruningService.softDelete(); const result = await findAllExecutions(); expect(result).toEqual([ @@ -163,8 +163,8 @@ describe('softDeleteOnPruningCycle()', () => { describe('when EXECUTIONS_DATA_MAX_AGE is set', () => { beforeAll(() => { - globalConfig.pruning.maxAge = 1; - globalConfig.pruning.maxCount = 0; + pruningConfig.maxAge = 1; + pruningConfig.maxCount = 0; }); test('should mark as deleted based on EXECUTIONS_DATA_MAX_AGE', async () => { @@ -179,7 +179,7 @@ describe('softDeleteOnPruningCycle()', () => { ), ]; - await pruningService.softDeleteOnPruningCycle(); + await pruningService.softDelete(); const result = await findAllExecutions(); expect(result).toEqual([ @@ -203,7 +203,7 @@ describe('softDeleteOnPruningCycle()', () => { await createSuccessfulExecution(workflow), ]; - await pruningService.softDeleteOnPruningCycle(); + await pruningService.softDelete(); const result = await findAllExecutions(); expect(result).toEqual([ @@ -221,7 +221,7 @@ describe('softDeleteOnPruningCycle()', () => { ])('should prune %s executions', async (status, attributes) => { const execution = await createExecution({ status, ...attributes }, workflow); - await pruningService.softDeleteOnPruningCycle(); + await pruningService.softDelete(); const result = await findAllExecutions(); expect(result).toEqual([ @@ -239,7 +239,7 @@ describe('softDeleteOnPruningCycle()', () => { await createSuccessfulExecution(workflow), ]; - await pruningService.softDeleteOnPruningCycle(); + await pruningService.softDelete(); const result = await findAllExecutions(); expect(result).toEqual([ @@ -266,7 +266,7 @@ describe('softDeleteOnPruningCycle()', () => { await annotateExecution(executions[0].id, { vote: 'up' }, [workflow.id]); - await pruningService.softDeleteOnPruningCycle(); + await pruningService.softDelete(); const result = await findAllExecutions(); expect(result).toEqual([