fix(core): Add mechanism to prevent concurrent compaction on Insights (#14988)

Co-authored-by: Danny Martini <danny@n8n.io>
This commit is contained in:
Guillaume Jacquart
2025-05-08 12:18:51 +02:00
committed by GitHub
parent 539f4cc5be
commit 392e91480a
3 changed files with 163 additions and 63 deletions

View File

@@ -1,8 +1,10 @@
import { GlobalConfig } from '@n8n/config'; import { GlobalConfig } from '@n8n/config';
import { Container } from '@n8n/di'; import { Container } from '@n8n/di';
import { mock } from 'jest-mock-extended';
import { DateTime } from 'luxon'; import { DateTime } from 'luxon';
import { InsightsRawRepository } from '@/modules/insights/database/repositories/insights-raw.repository'; import { InsightsRawRepository } from '@/modules/insights/database/repositories/insights-raw.repository';
import { mockLogger } from '@test/mocking';
import { createTeamProject } from '@test-integration/db/projects'; import { createTeamProject } from '@test-integration/db/projects';
import { createWorkflow } from '@test-integration/db/workflows'; import { createWorkflow } from '@test-integration/db/workflows';
import * as testDb from '@test-integration/test-db'; import * as testDb from '@test-integration/test-db';
@@ -45,6 +47,7 @@ if (dbType === 'sqlite' && !globalConfig.database.sqlite.poolSize) {
afterAll(async () => { afterAll(async () => {
await testDb.terminate(); await testDb.terminate();
}); });
describe('compaction', () => { describe('compaction', () => {
describe('compactRawToHour', () => { describe('compactRawToHour', () => {
type TestData = { type TestData = {
@@ -317,14 +320,21 @@ if (dbType === 'sqlite' && !globalConfig.database.sqlite.poolSize) {
describe('compactionSchedule', () => { describe('compactionSchedule', () => {
test('compaction is running on schedule', async () => { test('compaction is running on schedule', async () => {
// ARRANGE
jest.useFakeTimers(); jest.useFakeTimers();
try { const insightsCompactionService = new InsightsCompactionService(
// ARRANGE mock<InsightsByPeriodRepository>(),
const insightsCompactionService = Container.get(InsightsCompactionService); mock<InsightsRawRepository>(),
insightsCompactionService.startCompactionTimer(); mock<InsightsConfig>({
compactionIntervalMinutes: 60,
}),
mockLogger(),
);
// spy on the compactInsights method to check if it's called
const compactInsightsSpy = jest.spyOn(insightsCompactionService, 'compactInsights');
// spy on the compactInsights method to check if it's called try {
const compactInsightsSpy = jest.spyOn(insightsCompactionService, 'compactInsights'); insightsCompactionService.startCompactionTimer();
// ACT // ACT
// advance by 1 hour and 1 minute // advance by 1 hour and 1 minute
@@ -333,6 +343,7 @@ if (dbType === 'sqlite' && !globalConfig.database.sqlite.poolSize) {
// ASSERT // ASSERT
expect(compactInsightsSpy).toHaveBeenCalledTimes(1); expect(compactInsightsSpy).toHaveBeenCalledTimes(1);
} finally { } finally {
insightsCompactionService.stopCompactionTimer();
jest.useRealTimers(); jest.useRealTimers();
} }
}); });

View File

@@ -0,0 +1,77 @@
import { Container } from '@n8n/di';
import { DateTime } from 'luxon';
import { InsightsConfig } from '@/modules/insights/insights.config';
import { createTeamProject } from '@test-integration/db/projects';
import { createWorkflow } from '@test-integration/db/workflows';
import * as testDb from '@test-integration/test-db';
import { createCompactedInsightsEvent, createMetadata } from '../../entities/__tests__/db-utils';
import { InsightsByPeriodRepository } from '../insights-by-period.repository';
describe('InsightsByPeriodRepository', () => {
	beforeAll(async () => {
		await testDb.init();
	});

	afterAll(async () => {
		// Release the test DB connection so the jest worker can exit cleanly.
		// The sibling compaction test file pairs testDb.init() with testDb.terminate()
		// the same way; this file was missing the teardown.
		await testDb.terminate();
	});

	describe('Avoid deadlock error', () => {
		let defaultBatchSize: number;

		beforeAll(() => {
			// Store the original config value
			const insightsConfig = Container.get(InsightsConfig);
			defaultBatchSize = insightsConfig.compactionBatchSize;

			// Set a smaller batch size to trigger the deadlock error
			insightsConfig.compactionBatchSize = 3;
		});

		afterAll(() => {
			// Reset the config to its original state
			const insightsConfig = Container.get(InsightsConfig);
			insightsConfig.compactionBatchSize = defaultBatchSize;
		});

		test('should not throw deadlock error on concurrent compaction', async () => {
			// ARRANGE
			const insightsConfig = Container.get(InsightsConfig);
			const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository);
			const transactionSpy = jest.spyOn(insightsByPeriodRepository.manager, 'transaction');

			try {
				const project = await createTeamProject();
				const workflow = await createWorkflow({}, project);
				await createMetadata(workflow);

				const batchQuery = insightsByPeriodRepository.getPeriodInsightsBatchQuery({
					periodUnitToCompactFrom: 'hour',
					compactionBatchSize: insightsConfig.compactionBatchSize,
					maxAgeInDays: insightsConfig.compactionHourlyToDailyThresholdDays,
				});

				// Create test data: 100 hourly events older than the hourly-to-daily
				// threshold (day: 91) so they are all eligible for compaction into days.
				for (let i = 0; i < 100; i++) {
					await createCompactedInsightsEvent(workflow, {
						type: 'success',
						value: 1,
						periodUnit: 'hour',
						periodStart: DateTime.now().minus({ day: 91, hour: i + 1 }),
					});
				}

				// ACT
				// Kick off 10 compactions WITHOUT awaiting in between so they overlap;
				// compactSourceDataIntoInsightPeriod resolves with the compacted row count.
				const promises: Array<Promise<number>> = [];
				for (let i = 0; i < 10; i++) {
					promises.push(
						insightsByPeriodRepository.compactSourceDataIntoInsightPeriod({
							sourceBatchQuery: batchQuery,
							sourceTableName: insightsByPeriodRepository.metadata.tableName,
							periodUnitToCompactInto: 'day',
						}),
					);
				}

				// ASSERT
				// await all promises concurrently
				await expect(Promise.all(promises)).resolves.toBeDefined();
				// Only the first call should open a transaction: the repository's
				// isRunningCompaction guard makes the other nine return early,
				// which is what prevents the concurrent-compaction deadlock.
				expect(transactionSpy).toHaveBeenCalledTimes(1);
			} finally {
				// Restore the spied method even if an assertion fails, so the mock
				// does not leak into other suites sharing this jest worker.
				transactionSpy.mockRestore();
			}
		});
	});
});

View File

@@ -57,6 +57,8 @@ const aggregatedInsightsByTimeParser = z
@Service() @Service()
export class InsightsByPeriodRepository extends Repository<InsightsByPeriod> { export class InsightsByPeriodRepository extends Repository<InsightsByPeriod> {
private isRunningCompaction = false;
constructor(dataSource: DataSource) { constructor(dataSource: DataSource) {
super(InsightsByPeriod, dataSource.manager); super(InsightsByPeriod, dataSource.manager);
} }
@@ -171,73 +173,83 @@ export class InsightsByPeriodRepository extends Repository<InsightsByPeriod> {
*/ */
periodUnitToCompactInto: PeriodUnit; periodUnitToCompactInto: PeriodUnit;
}): Promise<number> { }): Promise<number> {
// Create temp table that only exists in this transaction for rows to compact // Skip compaction if the process is already running
const getBatchAndStoreInTemporaryTable = sql` if (this.isRunningCompaction) {
CREATE TEMPORARY TABLE rows_to_compact AS return 0;
${sourceBatchQuery.getSql()}; }
`; this.isRunningCompaction = true;
const countBatch = sql` try {
SELECT COUNT(*) ${this.escapeField('rowsInBatch')} FROM rows_to_compact; // Create temp table that only exists in this transaction for rows to compact
`; const getBatchAndStoreInTemporaryTable = sql`
CREATE TEMPORARY TABLE rows_to_compact AS
${sourceBatchQuery.getSql()};
`;
const targetColumnNamesStr = ['metaId', 'type', 'periodUnit', 'periodStart'] const countBatch = sql`
.map((param) => this.escapeField(param)) SELECT COUNT(*) ${this.escapeField('rowsInBatch')} FROM rows_to_compact;
.join(', '); `;
const targetColumnNamesWithValue = `${targetColumnNamesStr}, value`;
// Function to get the aggregation query const targetColumnNamesStr = ['metaId', 'type', 'periodUnit', 'periodStart']
const aggregationQuery = this.getAggregationQuery(periodUnitToCompactInto); .map((param) => this.escapeField(param))
.join(', ');
const targetColumnNamesWithValue = `${targetColumnNamesStr}, value`;
// Insert or update aggregated data // Function to get the aggregation query
const insertQueryBase = sql` const aggregationQuery = this.getAggregationQuery(periodUnitToCompactInto);
INSERT INTO ${this.metadata.tableName}
(${targetColumnNamesWithValue})
${aggregationQuery.getSql()}
`;
// Database-specific duplicate key logic // Insert or update aggregated data
let deduplicateQuery: string; const insertQueryBase = sql`
if (dbType === 'mysqldb' || dbType === 'mariadb') { INSERT INTO ${this.metadata.tableName}
deduplicateQuery = sql` (${targetColumnNamesWithValue})
${aggregationQuery.getSql()}
`;
// Database-specific duplicate key logic
let deduplicateQuery: string;
if (dbType === 'mysqldb' || dbType === 'mariadb') {
deduplicateQuery = sql`
ON DUPLICATE KEY UPDATE value = value + VALUES(value)`; ON DUPLICATE KEY UPDATE value = value + VALUES(value)`;
} else { } else {
deduplicateQuery = sql` deduplicateQuery = sql`
ON CONFLICT(${targetColumnNamesStr}) ON CONFLICT(${targetColumnNamesStr})
DO UPDATE SET value = ${this.metadata.tableName}.value + excluded.value DO UPDATE SET value = ${this.metadata.tableName}.value + excluded.value
RETURNING *`; RETURNING *`;
}
const upsertEvents = sql`
${insertQueryBase}
${deduplicateQuery}
`;
// Delete the processed rows
const deleteBatch = sql`
DELETE FROM ${sourceTableName}
WHERE id IN (SELECT id FROM rows_to_compact);
`;
// Clean up
const dropTemporaryTable = sql`
DROP TABLE rows_to_compact;
`;
const result = await this.manager.transaction(async (trx) => {
await trx.query(getBatchAndStoreInTemporaryTable);
await trx.query<Array<{ type: any; value: number }>>(upsertEvents);
const rowsInBatch = await trx.query<[{ rowsInBatch: number | string }]>(countBatch);
await trx.query(deleteBatch);
await trx.query(dropTemporaryTable);
return Number(rowsInBatch[0].rowsInBatch);
});
return result;
} finally {
this.isRunningCompaction = false;
} }
const upsertEvents = sql`
${insertQueryBase}
${deduplicateQuery}
`;
// Delete the processed rows
const deleteBatch = sql`
DELETE FROM ${sourceTableName}
WHERE id IN (SELECT id FROM rows_to_compact);
`;
// Clean up
const dropTemporaryTable = sql`
DROP TABLE rows_to_compact;
`;
const result = await this.manager.transaction(async (trx) => {
await trx.query(getBatchAndStoreInTemporaryTable);
await trx.query<Array<{ type: any; value: number }>>(upsertEvents);
const rowsInBatch = await trx.query<[{ rowsInBatch: number | string }]>(countBatch);
await trx.query(deleteBatch);
await trx.query(dropTemporaryTable);
return Number(rowsInBatch[0].rowsInBatch);
});
return result;
} }
private getAgeLimitQuery(maxAgeInDays: number) { private getAgeLimitQuery(maxAgeInDays: number) {