diff --git a/packages/cli/src/modules/insights/__tests__/insights.module.test.ts b/packages/cli/src/modules/insights/__tests__/insights.module.test.ts deleted file mode 100644 index 391efed24f..0000000000 --- a/packages/cli/src/modules/insights/__tests__/insights.module.test.ts +++ /dev/null @@ -1,34 +0,0 @@ -import type { Logger } from '@n8n/backend-common'; -import { InstanceSettings } from 'n8n-core'; - -import { mockInstance, mockLogger } from '@test/mocking'; - -import { InsightsModule } from '../insights.module'; -import { InsightsService } from '../insights.service'; - -describe('InsightsModule', () => { - let logger: Logger; - let insightsService: InsightsService; - let instanceSettings: InstanceSettings; - - beforeEach(() => { - logger = mockLogger(); - insightsService = mockInstance(InsightsService); - }); - - describe('backgroundProcess', () => { - it('should start background process if instance is main and leader', () => { - instanceSettings = mockInstance(InstanceSettings, { instanceType: 'main', isLeader: true }); - const insightsModule = new InsightsModule(logger, insightsService, instanceSettings); - insightsModule.initialize(); - expect(insightsService.startTimers).toHaveBeenCalled(); - }); - - it('should not start background process if instance is main but not leader', () => { - instanceSettings = mockInstance(InstanceSettings, { instanceType: 'main', isLeader: false }); - const insightsModule = new InsightsModule(logger, insightsService, instanceSettings); - insightsModule.initialize(); - expect(insightsService.startTimers).not.toHaveBeenCalled(); - }); - }); -}); diff --git a/packages/cli/src/modules/insights/__tests__/insights.service.test.ts b/packages/cli/src/modules/insights/__tests__/insights.service.test.ts index c198ad71c7..5fdaf33fb6 100644 --- a/packages/cli/src/modules/insights/__tests__/insights.service.test.ts +++ b/packages/cli/src/modules/insights/__tests__/insights.service.test.ts @@ -5,8 +5,10 @@ import type { WorkflowEntity } from '@n8n/db'; import type { IWorkflowDb } from '@n8n/db'; import type { WorkflowExecuteAfterContext } from '@n8n/decorators'; import { Container } from '@n8n/di'; +import type { MockProxy } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended'; import { DateTime } from 'luxon'; +import type { InstanceSettings } from 'n8n-core'; import type { IRun } from 'n8n-workflow'; import { mockLogger } from '@test/mocking'; @@ -47,6 +49,84 @@ afterAll(async () => { await testDb.terminate(); }); +describe('startTimers', () => { + let insightsService: InsightsService; + let compactionService: InsightsCompactionService; + let collectionService: InsightsCollectionService; + let pruningService: InsightsPruningService; + let instanceSettings: MockProxy; + + beforeEach(() => { + compactionService = mock(); + collectionService = mock(); + pruningService = mock(); + instanceSettings = mock({ + instanceType: 'main', + }); + insightsService = new InsightsService( + mock(), + compactionService, + collectionService, + pruningService, + mock(), + instanceSettings, + mockLogger(), + ); + + jest.clearAllMocks(); + }); + + const setupMocks = ( + instanceType: string, + isLeader: boolean = false, + isPruningEnabled: boolean = false, + ) => { + (instanceSettings as any).instanceType = instanceType; + Object.defineProperty(instanceSettings, 'isLeader', { + get: jest.fn(() => isLeader), + }); + Object.defineProperty(pruningService, 'isPruningEnabled', { + get: jest.fn(() => isPruningEnabled), + }); + }; + + test('starts flushing timer for main instance', () => { + setupMocks('main', false, false); + insightsService.startTimers(); + + expect(collectionService.startFlushingTimer).toHaveBeenCalled(); + expect(compactionService.startCompactionTimer).not.toHaveBeenCalled(); + expect(pruningService.startPruningTimer).not.toHaveBeenCalled(); + }); + + test('starts compaction and flushing timers for main leader instances', () => { + setupMocks('main', true, false); + insightsService.startTimers(); + + expect(collectionService.startFlushingTimer).toHaveBeenCalled(); + expect(compactionService.startCompactionTimer).toHaveBeenCalled(); + expect(pruningService.startPruningTimer).not.toHaveBeenCalled(); + }); + + test('starts compaction, flushing and pruning timers for main leader instance with pruning enabled', () => { + setupMocks('main', true, true); + insightsService.startTimers(); + + expect(collectionService.startFlushingTimer).toHaveBeenCalled(); + expect(compactionService.startCompactionTimer).toHaveBeenCalled(); + expect(pruningService.startPruningTimer).toHaveBeenCalled(); + }); + + test('starts only collection flushing timer for webhook instance', () => { + setupMocks('webhook', false, false); + insightsService.startTimers(); + + expect(collectionService.startFlushingTimer).toHaveBeenCalled(); + expect(compactionService.startCompactionTimer).not.toHaveBeenCalled(); + expect(pruningService.startPruningTimer).not.toHaveBeenCalled(); + }); +}); + describe('getInsightsSummary', () => { let insightsService: InsightsService; beforeAll(async () => { @@ -512,6 +592,7 @@ describe('getAvailableDateRanges', () => { mock(), mock(), licenseMock, + mock(), mockLogger(), ); }); @@ -614,6 +695,7 @@ describe('getMaxAgeInDaysAndGranularity', () => { mock(), mock(), licenseMock, + mock(), mockLogger(), ); }); @@ -702,6 +784,7 @@ describe('shutdown', () => { mockCollectionService, mockPruningService, mock(), + mock(), mockLogger(), ); }); @@ -717,74 +800,6 @@ describe('shutdown', () => { }); }); -describe('timers', () => { - let insightsService: InsightsService; - - const mockCollectionService = mock({ - startFlushingTimer: jest.fn(), - stopFlushingTimer: jest.fn(), - }); - - const mockCompactionService = mock({ - startCompactionTimer: jest.fn(), - stopCompactionTimer: jest.fn(), - }); - - const mockPruningService = mock({ - startPruningTimer: jest.fn(), - stopPruningTimer: jest.fn(), - isPruningEnabled: false, - }); - - const mockedLogger = mockLogger(); - const mockedConfig = mock({ - maxAgeDays: -1, - }); - - beforeAll(() => { - insightsService = new InsightsService( - mock(), - mockCompactionService, - mockCollectionService, - mockPruningService, - mock(), - mockedLogger, - ); - }); - - test('startTimers starts timers except pruning', () => { - // ACT - insightsService.startTimers(); - - // ASSERT - expect(mockCompactionService.startCompactionTimer).toHaveBeenCalled(); - expect(mockCollectionService.startFlushingTimer).toHaveBeenCalled(); - expect(mockPruningService.startPruningTimer).not.toHaveBeenCalled(); - }); - - test('startTimers starts pruning timer', () => { - // ARRANGE - mockedConfig.maxAgeDays = 30; - Object.defineProperty(mockPruningService, 'isPruningEnabled', { value: true }); - - // ACT - insightsService.startTimers(); - - // ASSERT - expect(mockPruningService.startPruningTimer).toHaveBeenCalled(); - }); - - test('stopTimers stops timers', () => { - // ACT - insightsService.stopTimers(); - - // ASSERT - expect(mockCompactionService.stopCompactionTimer).toHaveBeenCalled(); - expect(mockCollectionService.stopFlushingTimer).toHaveBeenCalled(); - expect(mockPruningService.stopPruningTimer).toHaveBeenCalled(); - }); -}); - describe('legacy sqlite (without pooling) handles concurrent insights db process without throwing', () => { let initialFlushBatchSize: number; let insightsConfig: InsightsConfig; diff --git a/packages/cli/src/modules/insights/insights-collection.service.ts b/packages/cli/src/modules/insights/insights-collection.service.ts index 291b8aeeff..bdeceadc02 100644 --- a/packages/cli/src/modules/insights/insights-collection.service.ts +++ b/packages/cli/src/modules/insights/insights-collection.service.ts @@ -76,7 +76,6 @@ export class InsightsCollectionService { startFlushingTimer() { this.isAsynchronouslySavingInsights = true; this.scheduleFlushing(); - this.logger.debug('Started flushing timer'); } scheduleFlushing() { diff --git a/packages/cli/src/modules/insights/insights.module.ts b/packages/cli/src/modules/insights/insights.module.ts index eb4558caa7..2b25715f57 100644 --- a/packages/cli/src/modules/insights/insights.module.ts +++ b/packages/cli/src/modules/insights/insights.module.ts @@ -1,7 +1,5 @@ -import { Logger } from '@n8n/backend-common'; import type { BaseN8nModule } from '@n8n/decorators'; -import { N8nModule, OnLeaderStepdown, OnLeaderTakeover } from '@n8n/decorators'; -import { InstanceSettings } from 'n8n-core'; +import { N8nModule } from '@n8n/decorators'; import { InsightsService } from './insights.service'; @@ -9,29 +7,9 @@ import './insights.controller'; @N8nModule() export class InsightsModule implements BaseN8nModule { - constructor( - private readonly logger: Logger, - private readonly insightsService: InsightsService, - private readonly instanceSettings: InstanceSettings, - ) { - this.logger = this.logger.scoped('insights'); - } + constructor(private readonly insightsService: InsightsService) {} initialize() { - // We want to initialize the insights background process (schedulers) for the main leader instance - // to have only one main instance saving the insights data - if (this.instanceSettings.isLeader) { - this.insightsService.startTimers(); - } - } - - @OnLeaderTakeover() - startBackgroundProcess() { this.insightsService.startTimers(); } - - @OnLeaderStepdown() - stopBackgroundProcess() { - this.insightsService.stopTimers(); - } } diff --git a/packages/cli/src/modules/insights/insights.service.ts b/packages/cli/src/modules/insights/insights.service.ts index a883cc24a6..65eb3e38d2 100644 --- a/packages/cli/src/modules/insights/insights.service.ts +++ b/packages/cli/src/modules/insights/insights.service.ts @@ -4,8 +4,9 @@ import { INSIGHTS_DATE_RANGE_KEYS, } from '@n8n/api-types'; import { LicenseState, Logger } from '@n8n/backend-common'; -import { OnShutdown } from '@n8n/decorators'; +import { OnLeaderStepdown, OnLeaderTakeover, OnShutdown } from '@n8n/decorators'; import { Service } from '@n8n/di'; +import { InstanceSettings } from 'n8n-core'; import { UserError } from 'n8n-workflow'; import type { PeriodUnit, TypeUnit } from './database/entities/insights-shared'; @@ -33,31 +34,44 @@ export class InsightsService { private readonly collectionService: InsightsCollectionService, private readonly pruningService: InsightsPruningService, private readonly licenseState: LicenseState, + private readonly instanceSettings: InstanceSettings, private readonly logger: Logger, ) { this.logger = this.logger.scoped('insights'); } startTimers() { - this.compactionService.startCompactionTimer(); this.collectionService.startFlushingTimer(); - if (this.pruningService.isPruningEnabled) { - this.pruningService.startPruningTimer(); + this.logger.debug('Started flushing timer'); + + // Start compaction and pruning timers for main leader instance only + if (this.instanceSettings.isLeader) { + this.startCompactionAndPruningTimers(); } - this.logger.debug('Started compaction, flushing and pruning schedulers'); } - stopTimers() { + @OnLeaderTakeover() + startCompactionAndPruningTimers() { + this.compactionService.startCompactionTimer(); + this.logger.debug('Started compaction timer'); + if (this.pruningService.isPruningEnabled) { + this.pruningService.startPruningTimer(); + this.logger.debug('Started pruning timer'); + } + } + + @OnLeaderStepdown() + stopCompactionAndPruningTimers() { this.compactionService.stopCompactionTimer(); - this.collectionService.stopFlushingTimer(); + this.logger.debug('Stopped compaction timer'); this.pruningService.stopPruningTimer(); - this.logger.debug('Stopped compaction, flushing and pruning schedulers'); + this.logger.debug('Stopped pruning timer'); } @OnShutdown() async shutdown() { await this.collectionService.shutdown(); - this.stopTimers(); + this.stopCompactionAndPruningTimers(); } async getInsightsSummary({