mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-18 10:31:15 +00:00
feat(core): Implement Insights pruning system (#14468)
Co-authored-by: Iván Ovejero <ivov.src@gmail.com>
This commit is contained in:
committed by
GitHub
parent
d14fb4dde3
commit
ae27b48ee7
@@ -27,7 +27,7 @@ import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler';
|
|||||||
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
|
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
|
||||||
import { Server } from '@/server';
|
import { Server } from '@/server';
|
||||||
import { OwnershipService } from '@/services/ownership.service';
|
import { OwnershipService } from '@/services/ownership.service';
|
||||||
import { PruningService } from '@/services/pruning/pruning.service';
|
import { ExecutionsPruningService } from '@/services/pruning/executions-pruning.service';
|
||||||
import { UrlService } from '@/services/url.service';
|
import { UrlService } from '@/services/url.service';
|
||||||
import { WaitTracker } from '@/wait-tracker';
|
import { WaitTracker } from '@/wait-tracker';
|
||||||
import { WorkflowRunner } from '@/workflow-runner';
|
import { WorkflowRunner } from '@/workflow-runner';
|
||||||
@@ -315,7 +315,7 @@ export class Start extends BaseCommand {
|
|||||||
|
|
||||||
await this.server.start();
|
await this.server.start();
|
||||||
|
|
||||||
Container.get(PruningService).init();
|
Container.get(ExecutionsPruningService).init();
|
||||||
|
|
||||||
if (config.getEnv('executions.mode') === 'regular') {
|
if (config.getEnv('executions.mode') === 'regular') {
|
||||||
await this.runEnqueuedExecutions();
|
await this.runEnqueuedExecutions();
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import { Container } from '@n8n/di';
|
|||||||
import { In, type EntityManager } from '@n8n/typeorm';
|
import { In, type EntityManager } from '@n8n/typeorm';
|
||||||
import { mock } from 'jest-mock-extended';
|
import { mock } from 'jest-mock-extended';
|
||||||
import { DateTime } from 'luxon';
|
import { DateTime } from 'luxon';
|
||||||
import type { Logger } from 'n8n-core';
|
|
||||||
import {
|
import {
|
||||||
createDeferredPromise,
|
createDeferredPromise,
|
||||||
type ExecutionStatus,
|
type ExecutionStatus,
|
||||||
@@ -402,14 +401,7 @@ describe('workflowExecuteAfterHandler - flushEvents', () => {
|
|||||||
const sharedWorkflowRepositoryMock: jest.Mocked<SharedWorkflowRepository> = {
|
const sharedWorkflowRepositoryMock: jest.Mocked<SharedWorkflowRepository> = {
|
||||||
manager: entityManagerMock,
|
manager: entityManagerMock,
|
||||||
} as unknown as jest.Mocked<SharedWorkflowRepository>;
|
} as unknown as jest.Mocked<SharedWorkflowRepository>;
|
||||||
const logger = mock<Logger>({
|
const logger = mockLogger();
|
||||||
scoped: jest.fn().mockReturnValue(
|
|
||||||
mock<Logger>({
|
|
||||||
error: jest.fn(),
|
|
||||||
}),
|
|
||||||
),
|
|
||||||
});
|
|
||||||
|
|
||||||
const startedAt = DateTime.utc();
|
const startedAt = DateTime.utc();
|
||||||
const stoppedAt = startedAt.plus({ seconds: 5 });
|
const stoppedAt = startedAt.plus({ seconds: 5 });
|
||||||
const runData = mock<IRun>({
|
const runData = mock<IRun>({
|
||||||
|
|||||||
@@ -0,0 +1,178 @@
|
|||||||
|
import { Container } from '@n8n/di';
|
||||||
|
import { mock } from 'jest-mock-extended';
|
||||||
|
import { DateTime } from 'luxon';
|
||||||
|
|
||||||
|
import { Time } from '@/constants';
|
||||||
|
import { mockLogger } from '@test/mocking';
|
||||||
|
import { createTeamProject } from '@test-integration/db/projects';
|
||||||
|
import { createWorkflow } from '@test-integration/db/workflows';
|
||||||
|
import * as testDb from '@test-integration/test-db';
|
||||||
|
|
||||||
|
import {
|
||||||
|
createCompactedInsightsEvent,
|
||||||
|
createMetadata,
|
||||||
|
} from '../database/entities/__tests__/db-utils';
|
||||||
|
import { InsightsByPeriodRepository } from '../database/repositories/insights-by-period.repository';
|
||||||
|
import { InsightsPruningService } from '../insights-pruning.service';
|
||||||
|
import { InsightsConfig } from '../insights.config';
|
||||||
|
|
||||||
|
beforeAll(async () => {
|
||||||
|
await testDb.init();
|
||||||
|
});
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
await testDb.truncate([
|
||||||
|
'InsightsRaw',
|
||||||
|
'InsightsByPeriod',
|
||||||
|
'InsightsMetadata',
|
||||||
|
'WorkflowEntity',
|
||||||
|
'Project',
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
afterAll(async () => {
|
||||||
|
await testDb.terminate();
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('InsightsPruningService', () => {
|
||||||
|
let insightsConfig: InsightsConfig;
|
||||||
|
let insightsByPeriodRepository: InsightsByPeriodRepository;
|
||||||
|
let insightsPruningService: InsightsPruningService;
|
||||||
|
beforeAll(async () => {
|
||||||
|
insightsConfig = Container.get(InsightsConfig);
|
||||||
|
insightsConfig.maxAgeDays = 10;
|
||||||
|
insightsConfig.pruneCheckIntervalHours = 1;
|
||||||
|
insightsPruningService = Container.get(InsightsPruningService);
|
||||||
|
insightsByPeriodRepository = Container.get(InsightsByPeriodRepository);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('old insights get pruned successfully', async () => {
|
||||||
|
// ARRANGE
|
||||||
|
const project = await createTeamProject();
|
||||||
|
const workflow = await createWorkflow({}, project);
|
||||||
|
|
||||||
|
await createMetadata(workflow);
|
||||||
|
|
||||||
|
const timestamp = DateTime.utc().minus({ days: insightsConfig.maxAgeDays + 1 });
|
||||||
|
await createCompactedInsightsEvent(workflow, {
|
||||||
|
type: 'success',
|
||||||
|
value: 1,
|
||||||
|
periodUnit: 'day',
|
||||||
|
periodStart: timestamp,
|
||||||
|
});
|
||||||
|
|
||||||
|
// ACT
|
||||||
|
await insightsPruningService.pruneInsights();
|
||||||
|
|
||||||
|
// ASSERT
|
||||||
|
await expect(insightsByPeriodRepository.count()).resolves.toBe(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('insights newer than maxAgeDays do not get pruned', async () => {
|
||||||
|
// ARRANGE
|
||||||
|
const project = await createTeamProject();
|
||||||
|
const workflow = await createWorkflow({}, project);
|
||||||
|
|
||||||
|
await createMetadata(workflow);
|
||||||
|
|
||||||
|
const timestamp = DateTime.utc().minus({ days: insightsConfig.maxAgeDays - 1 });
|
||||||
|
await createCompactedInsightsEvent(workflow, {
|
||||||
|
type: 'success',
|
||||||
|
value: 1,
|
||||||
|
periodUnit: 'day',
|
||||||
|
periodStart: timestamp,
|
||||||
|
});
|
||||||
|
|
||||||
|
// ACT
|
||||||
|
await insightsPruningService.pruneInsights();
|
||||||
|
|
||||||
|
// ASSERT
|
||||||
|
expect(await insightsByPeriodRepository.count()).toBe(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('pruning scheduling', () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
jest.useFakeTimers();
|
||||||
|
insightsPruningService.startPruningTimer();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
jest.useRealTimers();
|
||||||
|
insightsPruningService.stopPruningTimer();
|
||||||
|
jest.restoreAllMocks();
|
||||||
|
});
|
||||||
|
|
||||||
|
test('pruning timeout is scheduled on start and rescheduled after each run', async () => {
|
||||||
|
const insightsByPeriodRepository = mock<InsightsByPeriodRepository>({
|
||||||
|
pruneOldData: async () => {
|
||||||
|
return { affected: 0 };
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const insightsPruningService = new InsightsPruningService(
|
||||||
|
insightsByPeriodRepository,
|
||||||
|
insightsConfig,
|
||||||
|
mockLogger(),
|
||||||
|
);
|
||||||
|
const pruneSpy = jest.spyOn(insightsPruningService, 'pruneInsights');
|
||||||
|
const scheduleNextPruneSpy = jest.spyOn(insightsPruningService as any, 'scheduleNextPrune');
|
||||||
|
|
||||||
|
insightsPruningService.startPruningTimer();
|
||||||
|
|
||||||
|
// Wait for pruning timer promise to resolve
|
||||||
|
await jest.advanceTimersToNextTimerAsync();
|
||||||
|
|
||||||
|
expect(pruneSpy).toHaveBeenCalledTimes(1);
|
||||||
|
expect(scheduleNextPruneSpy).toHaveBeenCalledTimes(2);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('if stopped during prune, it does not reschedule the timeout', async () => {
|
||||||
|
const insightsByPeriodRepository = mock<InsightsByPeriodRepository>({
|
||||||
|
pruneOldData: async () => {
|
||||||
|
return { affected: 0 };
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const insightsPruningService = new InsightsPruningService(
|
||||||
|
insightsByPeriodRepository,
|
||||||
|
insightsConfig,
|
||||||
|
mockLogger(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let resolvePrune!: () => void;
|
||||||
|
const pruneInsightsMock = jest
|
||||||
|
.spyOn(insightsPruningService, 'pruneInsights')
|
||||||
|
.mockImplementation(
|
||||||
|
async () =>
|
||||||
|
await new Promise((resolve) => {
|
||||||
|
resolvePrune = () => resolve();
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
insightsConfig.pruneCheckIntervalHours = 1;
|
||||||
|
|
||||||
|
insightsPruningService.startPruningTimer();
|
||||||
|
jest.advanceTimersByTime(Time.hours.toMilliseconds + 1); // 1h + 1min
|
||||||
|
|
||||||
|
// Immediately stop while pruning is "in progress"
|
||||||
|
insightsPruningService.stopPruningTimer();
|
||||||
|
resolvePrune(); // Now allow the fake pruning to complete
|
||||||
|
|
||||||
|
// Wait for pruning timer promise and reschedule to resolve
|
||||||
|
await jest.runOnlyPendingTimersAsync();
|
||||||
|
|
||||||
|
expect(pruneInsightsMock).toHaveBeenCalledTimes(1); // Only from start, not re-scheduled
|
||||||
|
});
|
||||||
|
|
||||||
|
test('pruneInsights is retried up when failing', async () => {
|
||||||
|
const pruneOldDataSpy = jest
|
||||||
|
.spyOn(insightsByPeriodRepository, 'pruneOldData')
|
||||||
|
.mockRejectedValueOnce(new Error('Fail 1'))
|
||||||
|
.mockRejectedValueOnce(new Error('Fail 2'))
|
||||||
|
.mockResolvedValueOnce({ affected: 0 });
|
||||||
|
|
||||||
|
await insightsPruningService.pruneInsights();
|
||||||
|
await jest.advanceTimersByTimeAsync(Time.seconds.toMilliseconds * 2 + 1);
|
||||||
|
|
||||||
|
expect(pruneOldDataSpy).toHaveBeenCalledTimes(3);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -1,8 +1,7 @@
|
|||||||
import { mock } from 'jest-mock-extended';
|
|
||||||
import { InstanceSettings } from 'n8n-core';
|
import { InstanceSettings } from 'n8n-core';
|
||||||
import type { Logger } from 'n8n-core';
|
import type { Logger } from 'n8n-core';
|
||||||
|
|
||||||
import { mockInstance } from '@test/mocking';
|
import { mockInstance, mockLogger } from '@test/mocking';
|
||||||
|
|
||||||
import { InsightsModule } from '../insights.module';
|
import { InsightsModule } from '../insights.module';
|
||||||
import { InsightsService } from '../insights.service';
|
import { InsightsService } from '../insights.service';
|
||||||
@@ -13,13 +12,7 @@ describe('InsightsModule', () => {
|
|||||||
let instanceSettings: InstanceSettings;
|
let instanceSettings: InstanceSettings;
|
||||||
|
|
||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
logger = mock<Logger>({
|
logger = mockLogger();
|
||||||
scoped: jest.fn().mockReturnValue(
|
|
||||||
mock<Logger>({
|
|
||||||
error: jest.fn(),
|
|
||||||
}),
|
|
||||||
),
|
|
||||||
});
|
|
||||||
insightsService = mockInstance(InsightsService);
|
insightsService = mockInstance(InsightsService);
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -28,14 +21,14 @@ describe('InsightsModule', () => {
|
|||||||
instanceSettings = mockInstance(InstanceSettings, { instanceType: 'main', isLeader: true });
|
instanceSettings = mockInstance(InstanceSettings, { instanceType: 'main', isLeader: true });
|
||||||
const insightsModule = new InsightsModule(logger, insightsService, instanceSettings);
|
const insightsModule = new InsightsModule(logger, insightsService, instanceSettings);
|
||||||
insightsModule.initialize();
|
insightsModule.initialize();
|
||||||
expect(insightsService.startBackgroundProcess).toHaveBeenCalled();
|
expect(insightsService.startTimers).toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should not start background process if instance is main but not leader', () => {
|
it('should not start background process if instance is main but not leader', () => {
|
||||||
instanceSettings = mockInstance(InstanceSettings, { instanceType: 'main', isLeader: false });
|
instanceSettings = mockInstance(InstanceSettings, { instanceType: 'main', isLeader: false });
|
||||||
const insightsModule = new InsightsModule(logger, insightsService, instanceSettings);
|
const insightsModule = new InsightsModule(logger, insightsService, instanceSettings);
|
||||||
insightsModule.initialize();
|
insightsModule.initialize();
|
||||||
expect(insightsService.startBackgroundProcess).not.toHaveBeenCalled();
|
expect(insightsService.startTimers).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import { Container } from '@n8n/di';
|
|||||||
import { mock } from 'jest-mock-extended';
|
import { mock } from 'jest-mock-extended';
|
||||||
import { DateTime } from 'luxon';
|
import { DateTime } from 'luxon';
|
||||||
|
|
||||||
|
import { mockLogger } from '@test/mocking';
|
||||||
import { createTeamProject } from '@test-integration/db/projects';
|
import { createTeamProject } from '@test-integration/db/projects';
|
||||||
import { createWorkflow } from '@test-integration/db/workflows';
|
import { createWorkflow } from '@test-integration/db/workflows';
|
||||||
import * as testDb from '@test-integration/test-db';
|
import * as testDb from '@test-integration/test-db';
|
||||||
@@ -15,6 +16,8 @@ import { createCompactedInsightsEvent } from '../database/entities/__tests__/db-
|
|||||||
import type { InsightsByPeriodRepository } from '../database/repositories/insights-by-period.repository';
|
import type { InsightsByPeriodRepository } from '../database/repositories/insights-by-period.repository';
|
||||||
import type { InsightsCollectionService } from '../insights-collection.service';
|
import type { InsightsCollectionService } from '../insights-collection.service';
|
||||||
import type { InsightsCompactionService } from '../insights-compaction.service';
|
import type { InsightsCompactionService } from '../insights-compaction.service';
|
||||||
|
import type { InsightsPruningService } from '../insights-pruning.service';
|
||||||
|
import type { InsightsConfig } from '../insights.config';
|
||||||
import { InsightsService } from '../insights.service';
|
import { InsightsService } from '../insights.service';
|
||||||
|
|
||||||
// Initialize DB once for all tests
|
// Initialize DB once for all tests
|
||||||
@@ -500,7 +503,10 @@ describe('getAvailableDateRanges', () => {
|
|||||||
mock<InsightsByPeriodRepository>(),
|
mock<InsightsByPeriodRepository>(),
|
||||||
mock<InsightsCompactionService>(),
|
mock<InsightsCompactionService>(),
|
||||||
mock<InsightsCollectionService>(),
|
mock<InsightsCollectionService>(),
|
||||||
|
mock<InsightsPruningService>(),
|
||||||
licenseMock,
|
licenseMock,
|
||||||
|
mock<InsightsConfig>(),
|
||||||
|
mockLogger(),
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -600,7 +606,10 @@ describe('getMaxAgeInDaysAndGranularity', () => {
|
|||||||
mock<InsightsByPeriodRepository>(),
|
mock<InsightsByPeriodRepository>(),
|
||||||
mock<InsightsCompactionService>(),
|
mock<InsightsCompactionService>(),
|
||||||
mock<InsightsCollectionService>(),
|
mock<InsightsCollectionService>(),
|
||||||
|
mock<InsightsPruningService>(),
|
||||||
licenseMock,
|
licenseMock,
|
||||||
|
mock<InsightsConfig>(),
|
||||||
|
mockLogger(),
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -664,3 +673,109 @@ describe('getMaxAgeInDaysAndGranularity', () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('shutdown', () => {
|
||||||
|
let insightsService: InsightsService;
|
||||||
|
|
||||||
|
const mockCollectionService = mock<InsightsCollectionService>({
|
||||||
|
shutdown: jest.fn().mockResolvedValue(undefined),
|
||||||
|
stopFlushingTimer: jest.fn(),
|
||||||
|
});
|
||||||
|
|
||||||
|
const mockCompactionService = mock<InsightsCompactionService>({
|
||||||
|
stopCompactionTimer: jest.fn(),
|
||||||
|
});
|
||||||
|
|
||||||
|
const mockPruningService = mock<InsightsPruningService>({
|
||||||
|
stopPruningTimer: jest.fn(),
|
||||||
|
});
|
||||||
|
|
||||||
|
beforeAll(() => {
|
||||||
|
insightsService = new InsightsService(
|
||||||
|
mock<InsightsByPeriodRepository>(),
|
||||||
|
mockCompactionService,
|
||||||
|
mockCollectionService,
|
||||||
|
mockPruningService,
|
||||||
|
mock<LicenseState>(),
|
||||||
|
mock<InsightsConfig>(),
|
||||||
|
mockLogger(),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('shutdown stops timers and shuts down services', async () => {
|
||||||
|
// ACT
|
||||||
|
await insightsService.shutdown();
|
||||||
|
|
||||||
|
// ASSERT
|
||||||
|
expect(mockCollectionService.shutdown).toHaveBeenCalled();
|
||||||
|
expect(mockCompactionService.stopCompactionTimer).toHaveBeenCalled();
|
||||||
|
expect(mockPruningService.stopPruningTimer).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('timers', () => {
|
||||||
|
let insightsService: InsightsService;
|
||||||
|
|
||||||
|
const mockCollectionService = mock<InsightsCollectionService>({
|
||||||
|
startFlushingTimer: jest.fn(),
|
||||||
|
stopFlushingTimer: jest.fn(),
|
||||||
|
});
|
||||||
|
|
||||||
|
const mockCompactionService = mock<InsightsCompactionService>({
|
||||||
|
startCompactionTimer: jest.fn(),
|
||||||
|
stopCompactionTimer: jest.fn(),
|
||||||
|
});
|
||||||
|
|
||||||
|
const mockPruningService = mock<InsightsPruningService>({
|
||||||
|
startPruningTimer: jest.fn(),
|
||||||
|
stopPruningTimer: jest.fn(),
|
||||||
|
});
|
||||||
|
|
||||||
|
const mockedLogger = mockLogger();
|
||||||
|
const mockedConfig = mock<InsightsConfig>({
|
||||||
|
maxAgeDays: -1,
|
||||||
|
});
|
||||||
|
|
||||||
|
beforeAll(() => {
|
||||||
|
insightsService = new InsightsService(
|
||||||
|
mock<InsightsByPeriodRepository>(),
|
||||||
|
mockCompactionService,
|
||||||
|
mockCollectionService,
|
||||||
|
mockPruningService,
|
||||||
|
mock<LicenseState>(),
|
||||||
|
mockedConfig,
|
||||||
|
mockedLogger,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('startTimers starts timers except pruning', () => {
|
||||||
|
// ACT
|
||||||
|
insightsService.startTimers();
|
||||||
|
|
||||||
|
// ASSERT
|
||||||
|
expect(mockCompactionService.startCompactionTimer).toHaveBeenCalled();
|
||||||
|
expect(mockCollectionService.startFlushingTimer).toHaveBeenCalled();
|
||||||
|
expect(mockPruningService.startPruningTimer).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
test('startTimers starts pruning timer', () => {
|
||||||
|
// ARRANGE
|
||||||
|
mockedConfig.maxAgeDays = 30;
|
||||||
|
|
||||||
|
// ACT
|
||||||
|
insightsService.startTimers();
|
||||||
|
|
||||||
|
// ASSERT
|
||||||
|
expect(mockPruningService.startPruningTimer).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
|
||||||
|
test('stopTimers stops timers', () => {
|
||||||
|
// ACT
|
||||||
|
insightsService.stopTimers();
|
||||||
|
|
||||||
|
// ASSERT
|
||||||
|
expect(mockCompactionService.stopCompactionTimer).toHaveBeenCalled();
|
||||||
|
expect(mockCollectionService.stopFlushingTimer).toHaveBeenCalled();
|
||||||
|
expect(mockPruningService.stopPruningTimer).toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|||||||
@@ -1,26 +1,6 @@
|
|||||||
import { Container } from '@n8n/di';
|
|
||||||
|
|
||||||
import * as testDb from '@test-integration/test-db';
|
|
||||||
|
|
||||||
import { InsightsRawRepository } from '../../repositories/insights-raw.repository';
|
|
||||||
import { InsightsByPeriod } from '../insights-by-period';
|
import { InsightsByPeriod } from '../insights-by-period';
|
||||||
import type { PeriodUnit, TypeUnit } from '../insights-shared';
|
import type { PeriodUnit, TypeUnit } from '../insights-shared';
|
||||||
|
|
||||||
let insightsRawRepository: InsightsRawRepository;
|
|
||||||
|
|
||||||
beforeAll(async () => {
|
|
||||||
await testDb.init();
|
|
||||||
insightsRawRepository = Container.get(InsightsRawRepository);
|
|
||||||
});
|
|
||||||
|
|
||||||
beforeEach(async () => {
|
|
||||||
await insightsRawRepository.delete({});
|
|
||||||
});
|
|
||||||
|
|
||||||
afterAll(async () => {
|
|
||||||
await testDb.terminate();
|
|
||||||
});
|
|
||||||
|
|
||||||
describe('Insights By Period', () => {
|
describe('Insights By Period', () => {
|
||||||
test.each(['time_saved_min', 'runtime_ms', 'failure', 'success'] satisfies TypeUnit[])(
|
test.each(['time_saved_min', 'runtime_ms', 'failure', 'success'] satisfies TypeUnit[])(
|
||||||
'`%s` can be serialized and deserialized correctly',
|
'`%s` can be serialized and deserialized correctly',
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import { GlobalConfig } from '@n8n/config';
|
import { GlobalConfig } from '@n8n/config';
|
||||||
import { Container, Service } from '@n8n/di';
|
import { Container, Service } from '@n8n/di';
|
||||||
import type { SelectQueryBuilder } from '@n8n/typeorm';
|
import type { SelectQueryBuilder } from '@n8n/typeorm';
|
||||||
import { DataSource, Repository } from '@n8n/typeorm';
|
import { DataSource, LessThanOrEqual, Repository } from '@n8n/typeorm';
|
||||||
import { DateTime } from 'luxon';
|
import { DateTime } from 'luxon';
|
||||||
import { z } from 'zod';
|
import { z } from 'zod';
|
||||||
|
|
||||||
@@ -388,4 +388,13 @@ export class InsightsByPeriodRepository extends Repository<InsightsByPeriod> {
|
|||||||
|
|
||||||
return aggregatedInsightsByTimeParser.parse(rawRows);
|
return aggregatedInsightsByTimeParser.parse(rawRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async pruneOldData(maxAgeInDays: number): Promise<{ affected: number | null | undefined }> {
|
||||||
|
const thresholdDate = DateTime.now().minus({ days: maxAgeInDays }).startOf('day').toJSDate();
|
||||||
|
const result = await this.delete({
|
||||||
|
periodStart: LessThanOrEqual(thresholdDate),
|
||||||
|
});
|
||||||
|
|
||||||
|
return { affected: result.affected };
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,73 @@
|
|||||||
|
import { Service } from '@n8n/di';
|
||||||
|
import { strict } from 'assert';
|
||||||
|
import { Logger } from 'n8n-core';
|
||||||
|
|
||||||
|
import { Time } from '@/constants';
|
||||||
|
|
||||||
|
import { InsightsByPeriodRepository } from './database/repositories/insights-by-period.repository';
|
||||||
|
import { InsightsConfig } from './insights.config';
|
||||||
|
|
||||||
|
@Service()
|
||||||
|
export class InsightsPruningService {
|
||||||
|
private pruneInsightsTimeout: NodeJS.Timeout | undefined;
|
||||||
|
|
||||||
|
private isStopped = true;
|
||||||
|
|
||||||
|
private readonly delayOnError = Time.seconds.toMilliseconds;
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
private readonly insightsByPeriodRepository: InsightsByPeriodRepository,
|
||||||
|
private readonly config: InsightsConfig,
|
||||||
|
private readonly logger: Logger,
|
||||||
|
) {
|
||||||
|
this.logger = this.logger.scoped('insights');
|
||||||
|
}
|
||||||
|
|
||||||
|
startPruningTimer() {
|
||||||
|
strict(this.isStopped);
|
||||||
|
this.clearPruningTimer();
|
||||||
|
this.isStopped = false;
|
||||||
|
this.scheduleNextPrune();
|
||||||
|
this.logger.debug(`Insights pruning every ${this.config.pruneCheckIntervalHours} hours`);
|
||||||
|
}
|
||||||
|
|
||||||
|
private clearPruningTimer() {
|
||||||
|
if (this.pruneInsightsTimeout !== undefined) {
|
||||||
|
clearTimeout(this.pruneInsightsTimeout);
|
||||||
|
this.pruneInsightsTimeout = undefined;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stopPruningTimer() {
|
||||||
|
this.isStopped = true;
|
||||||
|
this.clearPruningTimer();
|
||||||
|
this.logger.debug('Stopped Insights pruning');
|
||||||
|
}
|
||||||
|
|
||||||
|
private scheduleNextPrune(
|
||||||
|
delayMs = this.config.pruneCheckIntervalHours * Time.hours.toMilliseconds,
|
||||||
|
) {
|
||||||
|
if (this.isStopped) return;
|
||||||
|
|
||||||
|
this.pruneInsightsTimeout = setTimeout(async () => {
|
||||||
|
await this.pruneInsights();
|
||||||
|
}, delayMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
async pruneInsights() {
|
||||||
|
this.logger.info('Pruning old insights data');
|
||||||
|
try {
|
||||||
|
const result = await this.insightsByPeriodRepository.pruneOldData(this.config.maxAgeDays);
|
||||||
|
this.logger.debug(
|
||||||
|
'Deleted insights by period',
|
||||||
|
result.affected ? { count: result.affected } : {},
|
||||||
|
);
|
||||||
|
this.scheduleNextPrune();
|
||||||
|
} catch (error: unknown) {
|
||||||
|
this.logger.warn('Pruning failed', { error });
|
||||||
|
|
||||||
|
// In case of failure, we retry the operation after a shorter time
|
||||||
|
this.scheduleNextPrune(this.delayOnError);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -43,4 +43,18 @@ export class InsightsConfig {
|
|||||||
*/
|
*/
|
||||||
@Env('N8N_INSIGHTS_FLUSH_INTERVAL_SECONDS')
|
@Env('N8N_INSIGHTS_FLUSH_INTERVAL_SECONDS')
|
||||||
flushIntervalSeconds: number = 30;
|
flushIntervalSeconds: number = 30;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* How old (days) insights data must be to qualify for regular deletion
|
||||||
|
* Default: -1 (no pruning)
|
||||||
|
*/
|
||||||
|
@Env('N8N_INSIGHTS_MAX_AGE_DAYS')
|
||||||
|
maxAgeDays: number = -1;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* How often (hours) insights data will be checked for regular deletion.
|
||||||
|
* Default: 24
|
||||||
|
*/
|
||||||
|
@Env('N8N_INSIGHTS_PRUNE_CHECK_INTERVAL_HOURS')
|
||||||
|
pruneCheckIntervalHours: number = 24;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,17 +20,17 @@ export class InsightsModule implements BaseN8nModule {
|
|||||||
// We want to initialize the insights background process (schedulers) for the main leader instance
|
// We want to initialize the insights background process (schedulers) for the main leader instance
|
||||||
// to have only one main instance saving the insights data
|
// to have only one main instance saving the insights data
|
||||||
if (this.instanceSettings.isLeader) {
|
if (this.instanceSettings.isLeader) {
|
||||||
this.insightsService.startBackgroundProcess();
|
this.insightsService.startTimers();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnLeaderTakeover()
|
@OnLeaderTakeover()
|
||||||
startBackgroundProcess() {
|
startBackgroundProcess() {
|
||||||
this.insightsService.startBackgroundProcess();
|
this.insightsService.startTimers();
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnLeaderStepdown()
|
@OnLeaderStepdown()
|
||||||
stopBackgroundProcess() {
|
stopBackgroundProcess() {
|
||||||
this.insightsService.stopBackgroundProcess();
|
this.insightsService.stopTimers();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import {
|
|||||||
import { LicenseState } from '@n8n/backend-common';
|
import { LicenseState } from '@n8n/backend-common';
|
||||||
import { OnShutdown } from '@n8n/decorators';
|
import { OnShutdown } from '@n8n/decorators';
|
||||||
import { Service } from '@n8n/di';
|
import { Service } from '@n8n/di';
|
||||||
|
import { Logger } from 'n8n-core';
|
||||||
import { UserError } from 'n8n-workflow';
|
import { UserError } from 'n8n-workflow';
|
||||||
|
|
||||||
import type { PeriodUnit, TypeUnit } from './database/entities/insights-shared';
|
import type { PeriodUnit, TypeUnit } from './database/entities/insights-shared';
|
||||||
@@ -13,6 +14,8 @@ import { NumberToType } from './database/entities/insights-shared';
|
|||||||
import { InsightsByPeriodRepository } from './database/repositories/insights-by-period.repository';
|
import { InsightsByPeriodRepository } from './database/repositories/insights-by-period.repository';
|
||||||
import { InsightsCollectionService } from './insights-collection.service';
|
import { InsightsCollectionService } from './insights-collection.service';
|
||||||
import { InsightsCompactionService } from './insights-compaction.service';
|
import { InsightsCompactionService } from './insights-compaction.service';
|
||||||
|
import { InsightsPruningService } from './insights-pruning.service';
|
||||||
|
import { InsightsConfig } from './insights.config';
|
||||||
|
|
||||||
const keyRangeToDays: Record<InsightsDateRange['key'], number> = {
|
const keyRangeToDays: Record<InsightsDateRange['key'], number> = {
|
||||||
day: 1,
|
day: 1,
|
||||||
@@ -30,23 +33,38 @@ export class InsightsService {
|
|||||||
private readonly insightsByPeriodRepository: InsightsByPeriodRepository,
|
private readonly insightsByPeriodRepository: InsightsByPeriodRepository,
|
||||||
private readonly compactionService: InsightsCompactionService,
|
private readonly compactionService: InsightsCompactionService,
|
||||||
private readonly collectionService: InsightsCollectionService,
|
private readonly collectionService: InsightsCollectionService,
|
||||||
|
private readonly pruningService: InsightsPruningService,
|
||||||
private readonly licenseState: LicenseState,
|
private readonly licenseState: LicenseState,
|
||||||
) {}
|
private readonly config: InsightsConfig,
|
||||||
|
private readonly logger: Logger,
|
||||||
startBackgroundProcess() {
|
) {
|
||||||
this.compactionService.startCompactionTimer();
|
this.logger = this.logger.scoped('insights');
|
||||||
this.collectionService.startFlushingTimer();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
stopBackgroundProcess() {
|
get isPruningEnabled() {
|
||||||
|
return this.config.maxAgeDays > -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
startTimers() {
|
||||||
|
this.compactionService.startCompactionTimer();
|
||||||
|
this.collectionService.startFlushingTimer();
|
||||||
|
if (this.isPruningEnabled) {
|
||||||
|
this.pruningService.startPruningTimer();
|
||||||
|
}
|
||||||
|
this.logger.debug('Started compaction, flushing and pruning schedulers');
|
||||||
|
}
|
||||||
|
|
||||||
|
stopTimers() {
|
||||||
this.compactionService.stopCompactionTimer();
|
this.compactionService.stopCompactionTimer();
|
||||||
this.collectionService.stopFlushingTimer();
|
this.collectionService.stopFlushingTimer();
|
||||||
|
this.pruningService.stopPruningTimer();
|
||||||
|
this.logger.debug('Stopped compaction, flushing and pruning schedulers');
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnShutdown()
|
@OnShutdown()
|
||||||
async shutdown() {
|
async shutdown() {
|
||||||
await this.collectionService.shutdown();
|
await this.collectionService.shutdown();
|
||||||
this.compactionService.stopCompactionTimer();
|
this.stopTimers();
|
||||||
}
|
}
|
||||||
|
|
||||||
async getInsightsSummary({
|
async getInsightsSummary({
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import type { InstanceSettings } from 'n8n-core';
|
|||||||
|
|
||||||
import { mockLogger } from '@test/mocking';
|
import { mockLogger } from '@test/mocking';
|
||||||
|
|
||||||
import { PruningService } from '../pruning.service';
|
import { ExecutionsPruningService } from '../executions-pruning.service';
|
||||||
|
|
||||||
jest.mock('@/db', () => ({
|
jest.mock('@/db', () => ({
|
||||||
connectionState: { migrated: true },
|
connectionState: { migrated: true },
|
||||||
@@ -13,7 +13,7 @@ jest.mock('@/db', () => ({
|
|||||||
describe('PruningService', () => {
|
describe('PruningService', () => {
|
||||||
describe('init', () => {
|
describe('init', () => {
|
||||||
it('should start pruning on main instance that is the leader', () => {
|
it('should start pruning on main instance that is the leader', () => {
|
||||||
const pruningService = new PruningService(
|
const pruningService = new ExecutionsPruningService(
|
||||||
mockLogger(),
|
mockLogger(),
|
||||||
mock<InstanceSettings>({ isLeader: true, isMultiMain: true }),
|
mock<InstanceSettings>({ isLeader: true, isMultiMain: true }),
|
||||||
mock(),
|
mock(),
|
||||||
@@ -28,7 +28,7 @@ describe('PruningService', () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it('should not start pruning on main instance that is a follower', () => {
|
it('should not start pruning on main instance that is a follower', () => {
|
||||||
const pruningService = new PruningService(
|
const pruningService = new ExecutionsPruningService(
|
||||||
mockLogger(),
|
mockLogger(),
|
||||||
mock<InstanceSettings>({ isLeader: false, isMultiMain: true }),
|
mock<InstanceSettings>({ isLeader: false, isMultiMain: true }),
|
||||||
mock(),
|
mock(),
|
||||||
@@ -45,7 +45,7 @@ describe('PruningService', () => {
|
|||||||
|
|
||||||
describe('isEnabled', () => {
|
describe('isEnabled', () => {
|
||||||
it('should return `true` based on config if leader main', () => {
|
it('should return `true` based on config if leader main', () => {
|
||||||
const pruningService = new PruningService(
|
const pruningService = new ExecutionsPruningService(
|
||||||
mockLogger(),
|
mockLogger(),
|
||||||
mock<InstanceSettings>({ isLeader: true, instanceType: 'main', isMultiMain: true }),
|
mock<InstanceSettings>({ isLeader: true, instanceType: 'main', isMultiMain: true }),
|
||||||
mock(),
|
mock(),
|
||||||
@@ -57,7 +57,7 @@ describe('PruningService', () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it('should return `false` based on config if leader main', () => {
|
it('should return `false` based on config if leader main', () => {
|
||||||
const pruningService = new PruningService(
|
const pruningService = new ExecutionsPruningService(
|
||||||
mockLogger(),
|
mockLogger(),
|
||||||
mock<InstanceSettings>({ isLeader: true, instanceType: 'main', isMultiMain: true }),
|
mock<InstanceSettings>({ isLeader: true, instanceType: 'main', isMultiMain: true }),
|
||||||
mock(),
|
mock(),
|
||||||
@@ -69,7 +69,7 @@ describe('PruningService', () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it('should return `false` if non-main even if config is enabled', () => {
|
it('should return `false` if non-main even if config is enabled', () => {
|
||||||
const pruningService = new PruningService(
|
const pruningService = new ExecutionsPruningService(
|
||||||
mockLogger(),
|
mockLogger(),
|
||||||
mock<InstanceSettings>({ isLeader: false, instanceType: 'worker', isMultiMain: true }),
|
mock<InstanceSettings>({ isLeader: false, instanceType: 'worker', isMultiMain: true }),
|
||||||
mock(),
|
mock(),
|
||||||
@@ -81,7 +81,7 @@ describe('PruningService', () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it('should return `false` if follower main even if config is enabled', () => {
|
it('should return `false` if follower main even if config is enabled', () => {
|
||||||
const pruningService = new PruningService(
|
const pruningService = new ExecutionsPruningService(
|
||||||
mockLogger(),
|
mockLogger(),
|
||||||
mock<InstanceSettings>({
|
mock<InstanceSettings>({
|
||||||
isLeader: false,
|
isLeader: false,
|
||||||
@@ -100,7 +100,7 @@ describe('PruningService', () => {
|
|||||||
|
|
||||||
describe('startPruning', () => {
|
describe('startPruning', () => {
|
||||||
it('should not start pruning if service is disabled', () => {
|
it('should not start pruning if service is disabled', () => {
|
||||||
const pruningService = new PruningService(
|
const pruningService = new ExecutionsPruningService(
|
||||||
mockLogger(),
|
mockLogger(),
|
||||||
mock<InstanceSettings>({ isLeader: true, instanceType: 'main', isMultiMain: true }),
|
mock<InstanceSettings>({ isLeader: true, instanceType: 'main', isMultiMain: true }),
|
||||||
mock(),
|
mock(),
|
||||||
@@ -124,7 +124,7 @@ describe('PruningService', () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it('should start pruning if service is enabled and DB is migrated', () => {
|
it('should start pruning if service is enabled and DB is migrated', () => {
|
||||||
const pruningService = new PruningService(
|
const pruningService = new ExecutionsPruningService(
|
||||||
mockLogger(),
|
mockLogger(),
|
||||||
mock<InstanceSettings>({ isLeader: true, instanceType: 'main', isMultiMain: true }),
|
mock<InstanceSettings>({ isLeader: true, instanceType: 'main', isMultiMain: true }),
|
||||||
mock(),
|
mock(),
|
||||||
@@ -23,7 +23,7 @@ import { connectionState as dbConnectionState } from '@/db';
|
|||||||
* - Once mostly caught up, hard deletion goes back to the 15m schedule.
|
* - Once mostly caught up, hard deletion goes back to the 15m schedule.
|
||||||
*/
|
*/
|
||||||
@Service()
|
@Service()
|
||||||
export class PruningService {
|
export class ExecutionsPruningService {
|
||||||
/** Timer for soft-deleting executions on a rolling basis. */
|
/** Timer for soft-deleting executions on a rolling basis. */
|
||||||
private softDeletionInterval: NodeJS.Timer | undefined;
|
private softDeletionInterval: NodeJS.Timer | undefined;
|
||||||
|
|
||||||
@@ -6,7 +6,7 @@ import { BinaryDataService, InstanceSettings } from 'n8n-core';
|
|||||||
import type { ExecutionStatus, IWorkflowBase } from 'n8n-workflow';
|
import type { ExecutionStatus, IWorkflowBase } from 'n8n-workflow';
|
||||||
|
|
||||||
import { Time } from '@/constants';
|
import { Time } from '@/constants';
|
||||||
import { PruningService } from '@/services/pruning/pruning.service';
|
import { ExecutionsPruningService } from '@/services/pruning/executions-pruning.service';
|
||||||
|
|
||||||
import {
|
import {
|
||||||
annotateExecution,
|
annotateExecution,
|
||||||
@@ -18,7 +18,7 @@ import * as testDb from './shared/test-db';
|
|||||||
import { mockInstance, mockLogger } from '../shared/mocking';
|
import { mockInstance, mockLogger } from '../shared/mocking';
|
||||||
|
|
||||||
describe('softDeleteOnPruningCycle()', () => {
|
describe('softDeleteOnPruningCycle()', () => {
|
||||||
let pruningService: PruningService;
|
let pruningService: ExecutionsPruningService;
|
||||||
const instanceSettings = Container.get(InstanceSettings);
|
const instanceSettings = Container.get(InstanceSettings);
|
||||||
instanceSettings.markAsLeader();
|
instanceSettings.markAsLeader();
|
||||||
|
|
||||||
@@ -31,7 +31,7 @@ describe('softDeleteOnPruningCycle()', () => {
|
|||||||
await testDb.init();
|
await testDb.init();
|
||||||
|
|
||||||
executionsConfig = Container.get(ExecutionsConfig);
|
executionsConfig = Container.get(ExecutionsConfig);
|
||||||
pruningService = new PruningService(
|
pruningService = new ExecutionsPruningService(
|
||||||
mockLogger(),
|
mockLogger(),
|
||||||
instanceSettings,
|
instanceSettings,
|
||||||
Container.get(ExecutionRepository),
|
Container.get(ExecutionRepository),
|
||||||
|
|||||||
Reference in New Issue
Block a user