fix(core): Start insights collection timer for webhook instances (#15964)

This commit is contained in:
Guillaume Jacquart
2025-06-06 09:45:27 +02:00
committed by GitHub
parent 8c63ca7d57
commit 7a67dcb686
5 changed files with 108 additions and 136 deletions

View File

@@ -1,34 +0,0 @@
import type { Logger } from '@n8n/backend-common';
import { InstanceSettings } from 'n8n-core';
import { mockInstance, mockLogger } from '@test/mocking';
import { InsightsModule } from '../insights.module';
import { InsightsService } from '../insights.service';
describe('InsightsModule', () => {
let logger: Logger;
let insightsService: InsightsService;
let instanceSettings: InstanceSettings;
beforeEach(() => {
logger = mockLogger();
insightsService = mockInstance(InsightsService);
});
describe('backgroundProcess', () => {
it('should start background process if instance is main and leader', () => {
instanceSettings = mockInstance(InstanceSettings, { instanceType: 'main', isLeader: true });
const insightsModule = new InsightsModule(logger, insightsService, instanceSettings);
insightsModule.initialize();
expect(insightsService.startTimers).toHaveBeenCalled();
});
it('should not start background process if instance is main but not leader', () => {
instanceSettings = mockInstance(InstanceSettings, { instanceType: 'main', isLeader: false });
const insightsModule = new InsightsModule(logger, insightsService, instanceSettings);
insightsModule.initialize();
expect(insightsService.startTimers).not.toHaveBeenCalled();
});
});
});

View File

@@ -5,8 +5,10 @@ import type { WorkflowEntity } from '@n8n/db';
import type { IWorkflowDb } from '@n8n/db'; import type { IWorkflowDb } from '@n8n/db';
import type { WorkflowExecuteAfterContext } from '@n8n/decorators'; import type { WorkflowExecuteAfterContext } from '@n8n/decorators';
import { Container } from '@n8n/di'; import { Container } from '@n8n/di';
import type { MockProxy } from 'jest-mock-extended';
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import { DateTime } from 'luxon'; import { DateTime } from 'luxon';
import type { InstanceSettings } from 'n8n-core';
import type { IRun } from 'n8n-workflow'; import type { IRun } from 'n8n-workflow';
import { mockLogger } from '@test/mocking'; import { mockLogger } from '@test/mocking';
@@ -47,6 +49,84 @@ afterAll(async () => {
await testDb.terminate(); await testDb.terminate();
}); });
describe('startTimers', () => {
let insightsService: InsightsService;
let compactionService: InsightsCompactionService;
let collectionService: InsightsCollectionService;
let pruningService: InsightsPruningService;
let instanceSettings: MockProxy<InstanceSettings>;
beforeEach(() => {
compactionService = mock<InsightsCompactionService>();
collectionService = mock<InsightsCollectionService>();
pruningService = mock<InsightsPruningService>();
instanceSettings = mock<InstanceSettings>({
instanceType: 'main',
});
insightsService = new InsightsService(
mock<InsightsByPeriodRepository>(),
compactionService,
collectionService,
pruningService,
mock<LicenseState>(),
instanceSettings,
mockLogger(),
);
jest.clearAllMocks();
});
const setupMocks = (
instanceType: string,
isLeader: boolean = false,
isPruningEnabled: boolean = false,
) => {
(instanceSettings as any).instanceType = instanceType;
Object.defineProperty(instanceSettings, 'isLeader', {
get: jest.fn(() => isLeader),
});
Object.defineProperty(pruningService, 'isPruningEnabled', {
get: jest.fn(() => isPruningEnabled),
});
};
test('starts flushing timer for main instance', () => {
setupMocks('main', false, false);
insightsService.startTimers();
expect(collectionService.startFlushingTimer).toHaveBeenCalled();
expect(compactionService.startCompactionTimer).not.toHaveBeenCalled();
expect(pruningService.startPruningTimer).not.toHaveBeenCalled();
});
test('starts compaction and flushing timers for main leader instances', () => {
setupMocks('main', true, false);
insightsService.startTimers();
expect(collectionService.startFlushingTimer).toHaveBeenCalled();
expect(compactionService.startCompactionTimer).toHaveBeenCalled();
expect(pruningService.startPruningTimer).not.toHaveBeenCalled();
});
test('starts compaction, flushing and pruning timers for main leader instance with pruning enabled', () => {
setupMocks('main', true, true);
insightsService.startTimers();
expect(collectionService.startFlushingTimer).toHaveBeenCalled();
expect(compactionService.startCompactionTimer).toHaveBeenCalled();
expect(pruningService.startPruningTimer).toHaveBeenCalled();
});
test('starts only collection flushing timer for webhook instance', () => {
setupMocks('webhook', false, false);
insightsService.startTimers();
expect(collectionService.startFlushingTimer).toHaveBeenCalled();
expect(compactionService.startCompactionTimer).not.toHaveBeenCalled();
expect(pruningService.startPruningTimer).not.toHaveBeenCalled();
});
});
describe('getInsightsSummary', () => { describe('getInsightsSummary', () => {
let insightsService: InsightsService; let insightsService: InsightsService;
beforeAll(async () => { beforeAll(async () => {
@@ -512,6 +592,7 @@ describe('getAvailableDateRanges', () => {
mock<InsightsCollectionService>(), mock<InsightsCollectionService>(),
mock<InsightsPruningService>(), mock<InsightsPruningService>(),
licenseMock, licenseMock,
mock<InstanceSettings>(),
mockLogger(), mockLogger(),
); );
}); });
@@ -614,6 +695,7 @@ describe('getMaxAgeInDaysAndGranularity', () => {
mock<InsightsCollectionService>(), mock<InsightsCollectionService>(),
mock<InsightsPruningService>(), mock<InsightsPruningService>(),
licenseMock, licenseMock,
mock<InstanceSettings>(),
mockLogger(), mockLogger(),
); );
}); });
@@ -702,6 +784,7 @@ describe('shutdown', () => {
mockCollectionService, mockCollectionService,
mockPruningService, mockPruningService,
mock<LicenseState>(), mock<LicenseState>(),
mock<InstanceSettings>(),
mockLogger(), mockLogger(),
); );
}); });
@@ -717,74 +800,6 @@ describe('shutdown', () => {
}); });
}); });
describe('timers', () => {
let insightsService: InsightsService;
const mockCollectionService = mock<InsightsCollectionService>({
startFlushingTimer: jest.fn(),
stopFlushingTimer: jest.fn(),
});
const mockCompactionService = mock<InsightsCompactionService>({
startCompactionTimer: jest.fn(),
stopCompactionTimer: jest.fn(),
});
const mockPruningService = mock<InsightsPruningService>({
startPruningTimer: jest.fn(),
stopPruningTimer: jest.fn(),
isPruningEnabled: false,
});
const mockedLogger = mockLogger();
const mockedConfig = mock<InsightsConfig>({
maxAgeDays: -1,
});
beforeAll(() => {
insightsService = new InsightsService(
mock<InsightsByPeriodRepository>(),
mockCompactionService,
mockCollectionService,
mockPruningService,
mock<LicenseState>(),
mockedLogger,
);
});
test('startTimers starts timers except pruning', () => {
// ACT
insightsService.startTimers();
// ASSERT
expect(mockCompactionService.startCompactionTimer).toHaveBeenCalled();
expect(mockCollectionService.startFlushingTimer).toHaveBeenCalled();
expect(mockPruningService.startPruningTimer).not.toHaveBeenCalled();
});
test('startTimers starts pruning timer', () => {
// ARRANGE
mockedConfig.maxAgeDays = 30;
Object.defineProperty(mockPruningService, 'isPruningEnabled', { value: true });
// ACT
insightsService.startTimers();
// ASSERT
expect(mockPruningService.startPruningTimer).toHaveBeenCalled();
});
test('stopTimers stops timers', () => {
// ACT
insightsService.stopTimers();
// ASSERT
expect(mockCompactionService.stopCompactionTimer).toHaveBeenCalled();
expect(mockCollectionService.stopFlushingTimer).toHaveBeenCalled();
expect(mockPruningService.stopPruningTimer).toHaveBeenCalled();
});
});
describe('legacy sqlite (without pooling) handles concurrent insights db process without throwing', () => { describe('legacy sqlite (without pooling) handles concurrent insights db process without throwing', () => {
let initialFlushBatchSize: number; let initialFlushBatchSize: number;
let insightsConfig: InsightsConfig; let insightsConfig: InsightsConfig;

View File

@@ -76,7 +76,6 @@ export class InsightsCollectionService {
startFlushingTimer() { startFlushingTimer() {
this.isAsynchronouslySavingInsights = true; this.isAsynchronouslySavingInsights = true;
this.scheduleFlushing(); this.scheduleFlushing();
this.logger.debug('Started flushing timer');
} }
scheduleFlushing() { scheduleFlushing() {

View File

@@ -1,7 +1,5 @@
import { Logger } from '@n8n/backend-common';
import type { BaseN8nModule } from '@n8n/decorators'; import type { BaseN8nModule } from '@n8n/decorators';
import { N8nModule, OnLeaderStepdown, OnLeaderTakeover } from '@n8n/decorators'; import { N8nModule } from '@n8n/decorators';
import { InstanceSettings } from 'n8n-core';
import { InsightsService } from './insights.service'; import { InsightsService } from './insights.service';
@@ -9,29 +7,9 @@ import './insights.controller';
@N8nModule() @N8nModule()
export class InsightsModule implements BaseN8nModule { export class InsightsModule implements BaseN8nModule {
constructor( constructor(private readonly insightsService: InsightsService) {}
private readonly logger: Logger,
private readonly insightsService: InsightsService,
private readonly instanceSettings: InstanceSettings,
) {
this.logger = this.logger.scoped('insights');
}
initialize() { initialize() {
// We want to initialize the insights background process (schedulers) for the main leader instance
// to have only one main instance saving the insights data
if (this.instanceSettings.isLeader) {
this.insightsService.startTimers(); this.insightsService.startTimers();
} }
}
@OnLeaderTakeover()
startBackgroundProcess() {
this.insightsService.startTimers();
}
@OnLeaderStepdown()
stopBackgroundProcess() {
this.insightsService.stopTimers();
}
} }

View File

@@ -4,8 +4,9 @@ import {
INSIGHTS_DATE_RANGE_KEYS, INSIGHTS_DATE_RANGE_KEYS,
} from '@n8n/api-types'; } from '@n8n/api-types';
import { LicenseState, Logger } from '@n8n/backend-common'; import { LicenseState, Logger } from '@n8n/backend-common';
import { OnShutdown } from '@n8n/decorators'; import { OnLeaderStepdown, OnLeaderTakeover, OnShutdown } from '@n8n/decorators';
import { Service } from '@n8n/di'; import { Service } from '@n8n/di';
import { InstanceSettings } from 'n8n-core';
import { UserError } from 'n8n-workflow'; import { UserError } from 'n8n-workflow';
import type { PeriodUnit, TypeUnit } from './database/entities/insights-shared'; import type { PeriodUnit, TypeUnit } from './database/entities/insights-shared';
@@ -33,31 +34,44 @@ export class InsightsService {
private readonly collectionService: InsightsCollectionService, private readonly collectionService: InsightsCollectionService,
private readonly pruningService: InsightsPruningService, private readonly pruningService: InsightsPruningService,
private readonly licenseState: LicenseState, private readonly licenseState: LicenseState,
private readonly instanceSettings: InstanceSettings,
private readonly logger: Logger, private readonly logger: Logger,
) { ) {
this.logger = this.logger.scoped('insights'); this.logger = this.logger.scoped('insights');
} }
startTimers() { startTimers() {
this.compactionService.startCompactionTimer();
this.collectionService.startFlushingTimer(); this.collectionService.startFlushingTimer();
if (this.pruningService.isPruningEnabled) { this.logger.debug('Started flushing timer');
this.pruningService.startPruningTimer();
// Start compaction and pruning timers for main leader instance only
if (this.instanceSettings.isLeader) {
this.startCompactionAndPruningTimers();
} }
this.logger.debug('Started compaction, flushing and pruning schedulers');
} }
stopTimers() { @OnLeaderTakeover()
startCompactionAndPruningTimers() {
this.compactionService.startCompactionTimer();
this.logger.debug('Started compaction timer');
if (this.pruningService.isPruningEnabled) {
this.pruningService.startPruningTimer();
this.logger.debug('Started pruning timer');
}
}
@OnLeaderStepdown()
stopCompactionAndPruningTimers() {
this.compactionService.stopCompactionTimer(); this.compactionService.stopCompactionTimer();
this.collectionService.stopFlushingTimer(); this.logger.debug('Stopped compaction timer');
this.pruningService.stopPruningTimer(); this.pruningService.stopPruningTimer();
this.logger.debug('Stopped compaction, flushing and pruning schedulers'); this.logger.debug('Stopped pruning timer');
} }
@OnShutdown() @OnShutdown()
async shutdown() { async shutdown() {
await this.collectionService.shutdown(); await this.collectionService.shutdown();
this.stopTimers(); this.stopCompactionAndPruningTimers();
} }
async getInsightsSummary({ async getInsightsSummary({