feat(API): Implement compaction logic for insights (#14062)

Co-authored-by: Danny Martini <danny@n8n.io>
Author: Guillaume Jacquart
Date: 2025-03-24 09:59:32 +01:00 (committed by GitHub)
Parent: e0f9506912
Commit: d8433d2895
6 changed files with 669 additions and 6 deletions


@@ -14,6 +14,12 @@ import { createTeamProject } from '@test-integration/db/projects';
import { createWorkflow } from '@test-integration/db/workflows';
import * as testDb from '@test-integration/test-db';
import {
createMetadata,
createRawInsightsEvent,
createCompactedInsightsEvent,
createRawInsightsEvents,
} from '../entities/__tests__/db-utils';
import { InsightsService } from '../insights.service';
import { InsightsByPeriodRepository } from '../repositories/insights-by-period.repository';
@@ -30,13 +36,22 @@ async function truncateAll() {
}
}
// Initialize DB once for all tests
beforeAll(async () => {
jest.useFakeTimers();
await testDb.init();
});
// Terminate DB once after all tests complete
afterAll(async () => {
await testDb.terminate();
});
describe('workflowExecuteAfterHandler', () => {
let insightsService: InsightsService;
let insightsRawRepository: InsightsRawRepository;
let insightsMetadataRepository: InsightsMetadataRepository;
beforeAll(async () => {
await testDb.init();
insightsService = Container.get(InsightsService);
insightsRawRepository = Container.get(InsightsRawRepository);
insightsMetadataRepository = Container.get(InsightsMetadataRepository);
@@ -245,3 +260,345 @@ describe('workflowExecuteAfterHandler', () => {
);
});
});
describe('compaction', () => {
beforeEach(async () => {
await truncateAll();
});
describe('compactRawToHour', () => {
type TestData = {
name: string;
timestamps: DateTime[];
batches: number[];
};
test.each<TestData>([
{
name: 'compact into 2 rows',
timestamps: [
DateTime.utc(2000, 1, 1, 0, 0),
DateTime.utc(2000, 1, 1, 0, 59),
DateTime.utc(2000, 1, 1, 1, 0),
],
batches: [2, 1],
},
{
name: 'compact into 3 rows',
timestamps: [
DateTime.utc(2000, 1, 1, 0, 0),
DateTime.utc(2000, 1, 1, 1, 0),
DateTime.utc(2000, 1, 1, 2, 0),
],
batches: [1, 1, 1],
},
])('$name', async ({ timestamps, batches }) => {
// ARRANGE
const insightsService = Container.get(InsightsService);
const insightsRawRepository = Container.get(InsightsRawRepository);
const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository);
const project = await createTeamProject();
const workflow = await createWorkflow({}, project);
// create the metadata first so the raw events below don't race to create it
await createMetadata(workflow);
for (const timestamp of timestamps) {
await createRawInsightsEvent(workflow, { type: 'success', value: 1, timestamp });
}
// ACT
const compactedRows = await insightsService.compactRawToHour();
// ASSERT
expect(compactedRows).toBe(timestamps.length);
await expect(insightsRawRepository.count()).resolves.toBe(0);
const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } });
expect(allCompacted).toHaveLength(batches.length);
for (const [index, compacted] of allCompacted.entries()) {
expect(compacted.value).toBe(batches[index]);
}
});
test('batch compaction splits events into hourly insight periods', async () => {
// ARRANGE
const insightsService = Container.get(InsightsService);
const insightsRawRepository = Container.get(InsightsRawRepository);
const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository);
const project = await createTeamProject();
const workflow = await createWorkflow({}, project);
const batchSize = 100;
let timestamp = DateTime.utc().startOf('hour');
for (let i = 0; i < batchSize; i++) {
await createRawInsightsEvent(workflow, { type: 'success', value: 1, timestamp });
// one event per minute, i.e. 60 events per hour
timestamp = timestamp.plus({ minute: 1 });
}
// ACT
await insightsService.compactInsights();
// ASSERT
await expect(insightsRawRepository.count()).resolves.toBe(0);
const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } });
const accumulatedValues = allCompacted.reduce((acc, event) => acc + event.value, 0);
expect(accumulatedValues).toBe(batchSize);
expect(allCompacted[0].value).toBe(60);
expect(allCompacted[1].value).toBe(40);
});
test('batch compaction splits events into hourly insight periods by type and workflow', async () => {
// ARRANGE
const insightsService = Container.get(InsightsService);
const insightsRawRepository = Container.get(InsightsRawRepository);
const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository);
const project = await createTeamProject();
const workflow1 = await createWorkflow({}, project);
const workflow2 = await createWorkflow({}, project);
const batchSize = 100;
let timestamp = DateTime.utc().startOf('hour');
for (let i = 0; i < batchSize / 4; i++) {
await createRawInsightsEvent(workflow1, { type: 'success', value: 1, timestamp });
timestamp = timestamp.plus({ minute: 1 });
}
for (let i = 0; i < batchSize / 4; i++) {
await createRawInsightsEvent(workflow1, { type: 'failure', value: 1, timestamp });
timestamp = timestamp.plus({ minute: 1 });
}
for (let i = 0; i < batchSize / 4; i++) {
await createRawInsightsEvent(workflow2, { type: 'runtime_ms', value: 1200, timestamp });
timestamp = timestamp.plus({ minute: 1 });
}
for (let i = 0; i < batchSize / 4; i++) {
await createRawInsightsEvent(workflow2, { type: 'time_saved_min', value: 3, timestamp });
timestamp = timestamp.plus({ minute: 1 });
}
// ACT
await insightsService.compactInsights();
// ASSERT
await expect(insightsRawRepository.count()).resolves.toBe(0);
const allCompacted = await insightsByPeriodRepository.find({
order: { metaId: 'ASC', periodStart: 'ASC' },
});
// Expect 2 insights for workflow 1 (for success and failure)
// and 3 for workflow 2 (2 period starts for runtime_ms and 1 for time_saved_min)
expect(allCompacted).toHaveLength(5);
const metaIds = allCompacted.map((event) => event.metaId);
// meta ids are ordered: the first 2 belong to workflow 1, the last 3 to workflow 2
const uniqueMetaIds = [metaIds[0], metaIds[2]];
const workflow1Insights = allCompacted.filter((event) => event.metaId === uniqueMetaIds[0]);
const workflow2Insights = allCompacted.filter((event) => event.metaId === uniqueMetaIds[1]);
expect(workflow1Insights).toHaveLength(2);
expect(workflow2Insights).toHaveLength(3);
const successInsights = workflow1Insights.find((event) => event.type === 'success');
const failureInsights = workflow1Insights.find((event) => event.type === 'failure');
expect(successInsights).toBeTruthy();
expect(failureInsights).toBeTruthy();
// success and failure insights should have a value matching the number of raw events (because value = 1)
expect(successInsights!.value).toBe(25);
expect(failureInsights!.value).toBe(25);
const runtimeMsEvents = workflow2Insights.filter((event) => event.type === 'runtime_ms');
const timeSavedMinEvents = workflow2Insights.find((event) => event.type === 'time_saved_min');
expect(runtimeMsEvents).toHaveLength(2);
// The last 10 minutes of the first hour
expect(runtimeMsEvents[0].value).toBe(1200 * 10);
// The first 15 minutes of the second hour
expect(runtimeMsEvents[1].value).toBe(1200 * 15);
expect(timeSavedMinEvents).toBeTruthy();
expect(timeSavedMinEvents!.value).toBe(3 * 25);
});
test('should return the number of compacted events', async () => {
// ARRANGE
const insightsService = Container.get(InsightsService);
const project = await createTeamProject();
const workflow = await createWorkflow({}, project);
const batchSize = 100;
let timestamp = DateTime.utc(2000, 1, 1, 0, 0);
for (let i = 0; i < batchSize; i++) {
await createRawInsightsEvent(workflow, { type: 'success', value: 1, timestamp });
// one event per minute, i.e. 60 events per hour
timestamp = timestamp.plus({ minute: 1 });
}
// ACT
const numberOfCompactedData = await insightsService.compactRawToHour();
// ASSERT
expect(numberOfCompactedData).toBe(100);
});
test('works with data in the compacted table', async () => {
// ARRANGE
const insightsService = Container.get(InsightsService);
const insightsRawRepository = Container.get(InsightsRawRepository);
const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository);
const project = await createTeamProject();
const workflow = await createWorkflow({}, project);
const batchSize = 100;
let timestamp = DateTime.utc().startOf('hour');
// Create an existing compacted event for the first hour
await createCompactedInsightsEvent(workflow, {
type: 'success',
value: 10,
periodUnit: 'hour',
periodStart: timestamp,
});
const events = Array<{ type: 'success'; value: number; timestamp: DateTime }>();
for (let i = 0; i < batchSize; i++) {
events.push({ type: 'success', value: 1, timestamp });
timestamp = timestamp.plus({ minute: 1 });
}
await createRawInsightsEvents(workflow, events);
// ACT
await insightsService.compactInsights();
// ASSERT
await expect(insightsRawRepository.count()).resolves.toBe(0);
const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } });
const accumulatedValues = allCompacted.reduce((acc, event) => acc + event.value, 0);
expect(accumulatedValues).toBe(batchSize + 10);
expect(allCompacted[0].value).toBe(70);
expect(allCompacted[1].value).toBe(40);
});
test('works with data bigger than the batch size', async () => {
// ARRANGE
const insightsService = Container.get(InsightsService);
const insightsRawRepository = Container.get(InsightsRawRepository);
const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository);
// spy on the compactRawToHour method to check if it's called multiple times
const rawToHourSpy = jest.spyOn(insightsService, 'compactRawToHour');
const project = await createTeamProject();
const workflow = await createWorkflow({}, project);
const batchSize = 600;
let timestamp = DateTime.utc().startOf('hour');
const events = Array<{ type: 'success'; value: number; timestamp: DateTime }>();
for (let i = 0; i < batchSize; i++) {
events.push({ type: 'success', value: 1, timestamp });
timestamp = timestamp.plus({ minute: 1 });
}
await createRawInsightsEvents(workflow, events);
// ACT
await insightsService.compactInsights();
// ASSERT
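// assuming the default compaction batch size of 500: 600 raw events yield batches
// of 500 and 100, then an empty batch stops the loop, so the spy sees 3 calls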
expect(rawToHourSpy).toHaveBeenCalledTimes(3);
await expect(insightsRawRepository.count()).resolves.toBe(0);
const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } });
const accumulatedValues = allCompacted.reduce((acc, event) => acc + event.value, 0);
expect(accumulatedValues).toBe(batchSize);
});
test('compaction is running on schedule', async () => {
// ARRANGE
const insightsService = Container.get(InsightsService);
// spy on the compactInsights method to check if it's called
insightsService.compactInsights = jest.fn();
// ACT
// advance by 1 hour (the default compaction interval)
jest.advanceTimersByTime(1000 * 60 * 60);
// ASSERT
expect(insightsService.compactInsights).toHaveBeenCalledTimes(1);
});
});
describe('compactHourToDay', () => {
type TestData = {
name: string;
periodStarts: DateTime[];
batches: number[];
};
test.each<TestData>([
{
name: 'compact into 2 rows',
periodStarts: [
DateTime.utc(2000, 1, 1, 0, 0),
DateTime.utc(2000, 1, 1, 23, 59),
DateTime.utc(2000, 1, 2, 1, 0),
],
batches: [2, 1],
},
{
name: 'compact into 3 rows',
periodStarts: [
DateTime.utc(2000, 1, 1, 0, 0),
DateTime.utc(2000, 1, 1, 23, 59),
DateTime.utc(2000, 1, 2, 0, 0),
DateTime.utc(2000, 1, 2, 23, 59),
DateTime.utc(2000, 1, 3, 23, 59),
],
batches: [2, 2, 1],
},
])('$name', async ({ periodStarts, batches }) => {
// ARRANGE
const insightsService = Container.get(InsightsService);
const insightsRawRepository = Container.get(InsightsRawRepository);
const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository);
const project = await createTeamProject();
const workflow = await createWorkflow({}, project);
// create the metadata first so the raw events below don't race to create it
await createMetadata(workflow);
for (const periodStart of periodStarts) {
await createCompactedInsightsEvent(workflow, {
type: 'success',
value: 1,
periodUnit: 'hour',
periodStart,
});
}
// ACT
const compactedRows = await insightsService.compactHourToDay();
// ASSERT
expect(compactedRows).toBe(periodStarts.length);
await expect(insightsRawRepository.count()).resolves.toBe(0);
const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } });
expect(allCompacted).toHaveLength(batches.length);
for (const [index, compacted] of allCompacted.entries()) {
expect(compacted.value).toBe(batches[index]);
}
});
});
});


@@ -1,3 +1,4 @@
import { GlobalConfig } from '@n8n/config';
import { Container } from '@n8n/di';
import type { DateTime } from 'luxon';
import type { IWorkflowBase } from 'n8n-workflow';
@@ -7,8 +8,10 @@ import { SharedWorkflowRepository } from '@/databases/repositories/shared-workfl
import { InsightsMetadata } from '../../entities/insights-metadata';
import { InsightsRaw } from '../../entities/insights-raw';
import { InsightsByPeriodRepository } from '../../repositories/insights-by-period.repository';
import { InsightsMetadataRepository } from '../../repositories/insights-metadata.repository';
import { InsightsRawRepository } from '../../repositories/insights-raw.repository';
import { InsightsByPeriod } from '../insights-by-period';
async function getWorkflowSharing(workflow: IWorkflowBase) {
return await Container.get(SharedWorkflowRepository).find({
@@ -16,6 +19,7 @@ async function getWorkflowSharing(workflow: IWorkflowBase) {
relations: { project: true },
});
}
export const { type: dbType } = Container.get(GlobalConfig).database;
export async function createMetadata(workflow: WorkflowEntity) {
const insightsMetadataRepository = Container.get(InsightsMetadataRepository);
@@ -62,3 +66,49 @@ export async function createRawInsightsEvent(
}
return await insightsRawRepository.save(event);
}
export async function createRawInsightsEvents(
workflow: WorkflowEntity,
parametersArray: Array<{
type: InsightsRaw['type'];
value: number;
timestamp?: DateTime;
}>,
) {
const insightsRawRepository = Container.get(InsightsRawRepository);
const metadata = await createMetadata(workflow);
const events = parametersArray.map((parameters) => {
const event = new InsightsRaw();
event.metaId = metadata.metaId;
event.type = parameters.type;
event.value = parameters.value;
if (parameters.timestamp) {
event.timestamp = parameters.timestamp.toUTC().toJSDate();
}
return event;
});
await insightsRawRepository.save(events);
}
export async function createCompactedInsightsEvent(
workflow: WorkflowEntity,
parameters: {
type: InsightsByPeriod['type'];
value: number;
periodUnit: InsightsByPeriod['periodUnit'];
periodStart: DateTime;
},
) {
const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository);
const metadata = await createMetadata(workflow);
const event = new InsightsByPeriod();
event.metaId = metadata.metaId;
event.type = parameters.type;
event.value = parameters.value;
event.periodUnit = parameters.periodUnit;
event.periodStart = parameters.periodStart.toUTC().startOf(parameters.periodUnit).toJSDate();
return await insightsByPeriodRepository.save(event);
}


@@ -0,0 +1,18 @@
import { Config, Env } from '@n8n/config/src/decorators';
@Config
export class InsightsConfig {
/**
* The interval in minutes at which the insights data should be compacted.
* Default: 60
*/
@Env('N8N_INSIGHTS_COMPACTION_INTERVAL_MINUTES')
compactionIntervalMinutes: number = 60;
/**
* The number of raw insights data to compact in a single batch.
* Default: 500
*/
@Env('N8N_INSIGHTS_COMPACTION_BATCH_SIZE')
compactionBatchSize: number = 500;
}
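
For context, a minimal sketch of how this config surfaces at runtime (the Container.get call mirrors the service below; the override values and the CLI invocation are illustrative assumptions, not part of this commit):

// Override the defaults via environment variables before startup, e.g.:
//   N8N_INSIGHTS_COMPACTION_INTERVAL_MINUTES=30 N8N_INSIGHTS_COMPACTION_BATCH_SIZE=1000 n8n start
import { Container } from '@n8n/di';

import { InsightsConfig } from './insights.config';

const config = Container.get(InsightsConfig);
console.log(config.compactionIntervalMinutes); // 30 when overridden, 60 by default
console.log(config.compactionBatchSize); // 1000 when overridden, 500 by default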


@@ -1,13 +1,20 @@
import { Container, Service } from '@n8n/di';
import type { ExecutionLifecycleHooks } from 'n8n-core';
import type { ExecutionStatus, IRun, WorkflowExecuteMode } from 'n8n-workflow';
import { UnexpectedError } from 'n8n-workflow';
import { SharedWorkflow } from '@/databases/entities/shared-workflow';
import { SharedWorkflowRepository } from '@/databases/repositories/shared-workflow.repository';
import { OnShutdown } from '@/decorators/on-shutdown';
import { InsightsMetadata } from '@/modules/insights/entities/insights-metadata';
import { InsightsRaw } from '@/modules/insights/entities/insights-raw';
import { InsightsConfig } from './insights.config';
import { InsightsByPeriodRepository } from './repositories/insights-by-period.repository';
import { InsightsRawRepository } from './repositories/insights-raw.repository';
const config = Container.get(InsightsConfig);
const shouldSkipStatus: Record<ExecutionStatus, boolean> = {
success: false,
crashed: false,
@@ -35,7 +42,27 @@ const shouldSkipMode: Record<WorkflowExecuteMode, boolean> = {
@Service()
export class InsightsService {
private compactInsightsTimer: NodeJS.Timer | undefined;
constructor(
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
private readonly insightsByPeriodRepository: InsightsByPeriodRepository,
private readonly insightsRawRepository: InsightsRawRepository,
) {
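// schedule the recurring compaction; the cadence comes from N8N_INSIGHTS_COMPACTION_INTERVAL_MINUTES (default 60 minutes)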
const intervalMilliseconds = config.compactionIntervalMinutes * 60 * 1000;
this.compactInsightsTimer = setInterval(
async () => await this.compactInsights(),
intervalMilliseconds,
);
}
@OnShutdown()
shutdown() {
if (this.compactInsightsTimer !== undefined) {
clearInterval(this.compactInsightsTimer);
this.compactInsightsTimer = undefined;
}
}
async workflowExecuteAfterHandler(ctx: ExecutionLifecycleHooks, fullRunData: IRun) {
if (shouldSkipStatus[fullRunData.status] || shouldSkipMode[fullRunData.mode]) {
@@ -107,4 +134,48 @@ export class InsightsService {
}
});
}
async compactInsights() {
let numberOfCompactedRawData: number;
// Compact raw data to hourly aggregates
do {
numberOfCompactedRawData = await this.compactRawToHour();
} while (numberOfCompactedRawData > 0);
let numberOfCompactedHourData: number;
// Compact hourly data to daily aggregates
do {
numberOfCompactedHourData = await this.compactHourToDay();
} while (numberOfCompactedHourData > 0);
}
// Compacts raw data to hourly aggregates
async compactRawToHour() {
// Build the query to gather raw insights data for the batch
const batchQuery = this.insightsRawRepository.getRawInsightsBatchQuery(
config.compactionBatchSize,
);
return await this.insightsByPeriodRepository.compactSourceDataIntoInsightPeriod({
sourceBatchQuery: batchQuery.getSql(),
sourceTableName: this.insightsRawRepository.metadata.tableName,
periodUnit: 'hour',
});
}
// Compacts hourly data to daily aggregates
async compactHourToDay() {
// Build the query to gather hourly insights data for the batch
const batchQuery = this.insightsByPeriodRepository.getPeriodInsightsBatchQuery(
'hour',
config.compactionBatchSize,
);
return await this.insightsByPeriodRepository.compactSourceDataIntoInsightPeriod({
sourceBatchQuery: batchQuery.getSql(),
periodUnit: 'day',
});
}
}


@@ -1,11 +1,164 @@
import { GlobalConfig } from '@n8n/config';
import { Container, Service } from '@n8n/di';
import { DataSource, Repository } from '@n8n/typeorm';
import { sql } from '@/utils/sql';
import { InsightsByPeriod } from '../entities/insights-by-period';
import type { PeriodUnits } from '../entities/insights-shared';
import { PeriodUnitToNumber } from '../entities/insights-shared';
const dbType = Container.get(GlobalConfig).database.type;
@Service()
export class InsightsByPeriodRepository extends Repository<InsightsByPeriod> {
constructor(dataSource: DataSource) {
super(InsightsByPeriod, dataSource.manager);
}
private escapeField(fieldName: string) {
return this.manager.connection.driver.escape(fieldName);
}
private getPeriodFilterExpr(periodUnit: PeriodUnits) {
const daysAgo = periodUnit === 'day' ? 90 : 180;
// Database-specific expression for the compaction cutoff date: only rows whose periodStart is older than daysAgo days get compacted
let periodStartExpr = `date('now', '-${daysAgo} days')`;
if (dbType === 'postgresdb') {
periodStartExpr = `CURRENT_DATE - INTERVAL '${daysAgo} day'`;
} else if (dbType === 'mysqldb' || dbType === 'mariadb') {
periodStartExpr = `DATE_SUB(CURRENT_DATE, INTERVAL ${daysAgo} DAY)`;
}
return periodStartExpr;
}
private getPeriodStartExpr(periodUnit: PeriodUnits) {
// Database-specific period start expression to truncate timestamp to the periodUnit
// SQLite by default
let periodStartExpr = `strftime('%Y-%m-%d ${periodUnit === 'hour' ? '%H' : '00'}:00:00.000', periodStart)`;
if (dbType === 'mysqldb' || dbType === 'mariadb') {
periodStartExpr =
periodUnit === 'hour'
? "DATE_FORMAT(periodStart, '%Y-%m-%d %H:00:00')"
: "DATE_FORMAT(periodStart, '%Y-%m-%d 00:00:00')";
} else if (dbType === 'postgresdb') {
periodStartExpr = `DATE_TRUNC('${periodUnit}', ${this.escapeField('periodStart')})`;
}
return periodStartExpr;
}
getPeriodInsightsBatchQuery(periodUnit: PeriodUnits, compactionBatchSize: number) {
// Build the query to gather period insights data for the batch
const batchQuery = this.createQueryBuilder()
.select(
['id', 'metaId', 'type', 'periodStart', 'value'].map((fieldName) =>
this.escapeField(fieldName),
),
)
.where(`${this.escapeField('periodUnit')} = ${PeriodUnitToNumber[periodUnit]}`)
.andWhere(`${this.escapeField('periodStart')} < ${this.getPeriodFilterExpr('day')}`)
.orderBy(this.escapeField('periodStart'), 'ASC')
.limit(compactionBatchSize);
return batchQuery;
}
getAggregationQuery(periodUnit: PeriodUnits) {
// Get the start period expression depending on the period unit and database type
const periodStartExpr = this.getPeriodStartExpr(periodUnit);
// Build the aggregation query over the temporary batch table
const aggregationQuery = this.manager
.createQueryBuilder()
.select(this.escapeField('metaId'))
.addSelect(this.escapeField('type'))
.addSelect(PeriodUnitToNumber[periodUnit].toString(), 'periodUnit')
.addSelect(periodStartExpr, 'periodStart')
.addSelect(`SUM(${this.escapeField('value')})`, 'value')
.from('rows_to_compact', 'rtc')
.groupBy(this.escapeField('metaId'))
.addGroupBy(this.escapeField('type'))
.addGroupBy(periodStartExpr);
return aggregationQuery;
}
async compactSourceDataIntoInsightPeriod({
sourceBatchQuery, // Query selecting the batch of source rows; it must return the fields 'id', 'metaId', 'type', 'periodStart', 'value'
sourceTableName = this.metadata.tableName, // Table the compacted rows are deleted from (defaults to this repository's table)
periodUnit,
}: {
sourceBatchQuery: string;
sourceTableName?: string;
periodUnit: PeriodUnits;
}): Promise<number> {
// Create a temporary table, scoped to this transaction, holding the batch of rows to compact
const getBatchAndStoreInTemporaryTable = sql`
CREATE TEMPORARY TABLE rows_to_compact AS
${sourceBatchQuery};
`;
const countBatch = sql`
SELECT COUNT(*) ${this.escapeField('rowsInBatch')} FROM rows_to_compact;
`;
const targetColumnNamesStr = ['metaId', 'type', 'periodUnit', 'periodStart']
.map((param) => this.escapeField(param))
.join(', ');
const targetColumnNamesWithValue = `${targetColumnNamesStr}, value`;
// Build the aggregation query for the target period unit
const aggregationQuery = this.getAggregationQuery(periodUnit);
// Insert or update aggregated data
const insertQueryBase = sql`
INSERT INTO ${this.metadata.tableName}
(${targetColumnNamesWithValue})
${aggregationQuery.getSql()}
`;
// Database-specific duplicate key logic
let deduplicateQuery: string;
if (dbType === 'mysqldb' || dbType === 'mariadb') {
deduplicateQuery = sql`
ON DUPLICATE KEY UPDATE value = value + VALUES(value)`;
} else {
deduplicateQuery = sql`
ON CONFLICT(${targetColumnNamesStr})
DO UPDATE SET value = ${this.metadata.tableName}.value + excluded.value
RETURNING *`;
}
const upsertEvents = sql`
${insertQueryBase}
${deduplicateQuery}
`;
// Delete the processed rows
const deleteBatch = sql`
DELETE FROM ${sourceTableName}
WHERE id IN (SELECT id FROM rows_to_compact);
`;
// Clean up
const dropTemporaryTable = sql`
DROP TABLE rows_to_compact;
`;
const result = await this.manager.transaction(async (trx) => {
await trx.query(getBatchAndStoreInTemporaryTable);
await trx.query<Array<{ type: any; value: number }>>(upsertEvents);
const rowsInBatch = await trx.query<[{ rowsInBatch: number | string }]>(countBatch);
await trx.query(deleteBatch);
await trx.query(dropTemporaryTable);
return Number(rowsInBatch[0].rowsInBatch);
});
return result;
}
}
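
For illustration, roughly the statement the upsert path produces on SQLite when compacting to the hour period. The table name insights_by_period and the mapping of 'hour' to 1 in PeriodUnitToNumber are assumptions for readability (the real values come from the entity metadata and insights-shared), and the conflict target presumes a matching unique index on the table. The conflict clause is what makes re-compaction accumulate into an existing row, which the 'works with data in the compacted table' test above relies on (10 pre-existing + 60 new = 70):

-- hypothetical trace, not emitted verbatim by the code
INSERT INTO insights_by_period ("metaId", "type", "periodUnit", "periodStart", value)
SELECT "metaId", "type", 1, strftime('%Y-%m-%d %H:00:00.000', periodStart), SUM(value)
FROM rows_to_compact rtc
GROUP BY "metaId", "type", strftime('%Y-%m-%d %H:00:00.000', periodStart)
ON CONFLICT("metaId", "type", "periodUnit", "periodStart")
DO UPDATE SET value = insights_by_period.value + excluded.value
RETURNING *;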


@@ -8,4 +8,18 @@ export class InsightsRawRepository extends Repository<InsightsRaw> {
constructor(dataSource: DataSource) {
super(InsightsRaw, dataSource.manager);
}
getRawInsightsBatchQuery(compactionBatchSize: number) {
// Build the query to gather raw insights data for the batch
const batchQuery = this.createQueryBuilder()
.select(
['id', 'metaId', 'type', 'value'].map((fieldName) =>
this.manager.connection.driver.escape(fieldName),
),
)
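// alias the raw 'timestamp' as 'periodStart' so this batch returns the exact field set compactSourceDataIntoInsightPeriod expects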
.addSelect('timestamp', 'periodStart')
.orderBy('timestamp', 'ASC')
.limit(compactionBatchSize);
return batchQuery;
}
}