feat(API): Add day to week compaction for insights dashboard (#14165)

This commit is contained in:
Guillaume Jacquart
2025-03-26 15:38:47 +01:00
committed by GitHub
parent f6517664dd
commit db99974cca
4 changed files with 212 additions and 35 deletions

View File

@@ -579,7 +579,6 @@ describe('compaction', () => {
])('$name', async ({ periodStarts, batches }) => { ])('$name', async ({ periodStarts, batches }) => {
// ARRANGE // ARRANGE
const insightsService = Container.get(InsightsService); const insightsService = Container.get(InsightsService);
const insightsRawRepository = Container.get(InsightsRawRepository);
const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository);
const project = await createTeamProject(); const project = await createTeamProject();
@@ -600,13 +599,125 @@ describe('compaction', () => {
// ASSERT // ASSERT
expect(compactedRows).toBe(periodStarts.length); expect(compactedRows).toBe(periodStarts.length);
await expect(insightsRawRepository.count()).resolves.toBe(0); const hourInsights = (await insightsByPeriodRepository.find()).filter(
(insight) => insight.periodUnit !== 'day',
);
expect(hourInsights).toBeEmptyArray();
const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } }); const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } });
expect(allCompacted).toHaveLength(batches.length); expect(allCompacted).toHaveLength(batches.length);
for (const [index, compacted] of allCompacted.entries()) { for (const [index, compacted] of allCompacted.entries()) {
expect(compacted.value).toBe(batches[index]); expect(compacted.value).toBe(batches[index]);
} }
}); });
// Lower-bound guard for hour-to-day compaction: an hourly insight that is still
// inside the 90-day window for hourly data must not be picked up by the batch query.
test('recent insight periods should not be compacted', async () => {
	// ARRANGE
	const insightsService = Container.get(InsightsService);

	const project = await createTeamProject();
	const workflow = await createWorkflow({}, project);
	// metadata is created up front, before the insight row for the same workflow
	await createMetadata(workflow);

	// 89 days old: one day inside the 90-day threshold, mirroring the 179/180
	// boundary used by the day-to-week variant of this test
	await createCompactedInsightsEvent(workflow, {
		type: 'success',
		value: 1,
		periodUnit: 'hour',
		periodStart: DateTime.utc().minus({ day: 89 }).startOf('hour'),
	});

	// ACT
	const compactedRows = await insightsService.compactHourToDay();

	// ASSERT
	// nothing is old enough, so the compaction reports zero processed rows
	expect(compactedRows).toBe(0);
});
});
describe('compactDayToWeek', () => {
// One parametrized day-to-week compaction scenario.
type TestData = {
// Scenario label, interpolated into the test title through `$name`
name: string;
// `periodStart` of every daily insight row seeded before compaction runs
periodStarts: DateTime[];
// Expected `value` of each resulting row, in ascending `periodStart` order
batches: number[];
};
// Seeds daily insight rows, runs one day-to-week compaction pass and checks that
// only weekly rows remain, each carrying the summed value of its source days.
test.each<TestData>([
	{
		name: 'compact into 2 rows',
		periodStarts: [
			// 2000-01-03 is a Monday
			DateTime.utc(2000, 1, 3, 0, 0),
			DateTime.utc(2000, 1, 5, 23, 59),
			DateTime.utc(2000, 1, 11, 1, 0),
		],
		batches: [2, 1],
	},
	{
		name: 'compact into 3 rows',
		periodStarts: [
			// 2000-01-03 is a Monday
			DateTime.utc(2000, 1, 3, 0, 0),
			DateTime.utc(2000, 1, 4, 23, 59),
			DateTime.utc(2000, 1, 11, 0, 0),
			DateTime.utc(2000, 1, 12, 23, 59),
			DateTime.utc(2000, 1, 18, 23, 59),
		],
		batches: [2, 2, 1],
	},
])('$name', async ({ periodStarts, batches }) => {
	// ARRANGE
	const insightsService = Container.get(InsightsService);
	const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository);

	const project = await createTeamProject();
	const workflow = await createWorkflow({}, project);
	await createMetadata(workflow);

	// rows are inserted one after another, each worth 1, all with the 'day' unit
	for (const dayStart of periodStarts) {
		await createCompactedInsightsEvent(workflow, {
			type: 'success',
			value: 1,
			periodUnit: 'day',
			periodStart: dayStart,
		});
	}

	// ACT
	const compactedRows = await insightsService.compactDayToWeek();

	// ASSERT
	// every seeded daily row must have been consumed by the pass
	expect(compactedRows).toBe(periodStarts.length);

	// no row with a unit other than 'week' may be left behind
	const everyInsight = await insightsByPeriodRepository.find();
	const hourAndDayInsights = everyInsight.filter(({ periodUnit }) => periodUnit !== 'week');
	expect(hourAndDayInsights).toBeEmptyArray();

	// remaining rows, oldest first, must line up with the expected per-week sums
	const weeklyRows = await insightsByPeriodRepository.find({ order: { periodStart: 1 } });
	expect(weeklyRows).toHaveLength(batches.length);
	weeklyRows.forEach((weeklyRow, position) => {
		// NOTE(review): getDay() reads the local time zone while the fixtures are UTC;
		// presumably the suite runs with TZ=UTC — confirm
		expect(weeklyRow.periodStart.getDay()).toBe(1);
		expect(weeklyRow.value).toBe(batches[position]);
	});
});
// Lower-bound guard for day-to-week compaction: a daily insight that is 179 days
// old is still inside the 180-day window and must be left alone.
test('recent insight periods should not be compacted', async () => {
	// ARRANGE
	const insightsService = Container.get(InsightsService);

	const project = await createTeamProject();
	const workflow = await createWorkflow({}, project);
	await createMetadata(workflow);

	// one day short of the threshold for daily data
	const recentDayStart = DateTime.utc().minus({ day: 179 }).startOf('day');
	await createCompactedInsightsEvent(workflow, {
		type: 'success',
		value: 1,
		periodUnit: 'day',
		periodStart: recentDayStart,
	});

	// ACT
	const compactedRows = await insightsService.compactDayToWeek();

	// ASSERT
	// the only row is too recent, so nothing is reported as compacted
	expect(compactedRows).toBe(0);
});
}); });
}); });

View File

@@ -1,5 +1,6 @@
import { GlobalConfig } from '@n8n/config'; import { GlobalConfig } from '@n8n/config';
import { Container, Service } from '@n8n/di'; import { Container, Service } from '@n8n/di';
import type { SelectQueryBuilder } from '@n8n/typeorm';
import { DataSource, Repository } from '@n8n/typeorm'; import { DataSource, Repository } from '@n8n/typeorm';
import { z } from 'zod'; import { z } from 'zod';
@@ -31,36 +32,42 @@ export class InsightsByPeriodRepository extends Repository<InsightsByPeriod> {
return this.manager.connection.driver.escape(fieldName); return this.manager.connection.driver.escape(fieldName);
} }
private getPeriodFilterExpr(periodUnit: PeriodUnit) { private getPeriodFilterExpr(maxAgeInDays = 0) {
const daysAgo = periodUnit === 'day' ? 90 : 180;
// Database-specific period start expression to filter out data to compact by days matching the periodUnit // Database-specific period start expression to filter out data to compact by days matching the periodUnit
let periodStartExpr = `date('now', '-${daysAgo} days')`; let periodStartExpr = `date('now', '-${maxAgeInDays} days')`;
if (dbType === 'postgresdb') { if (dbType === 'postgresdb') {
periodStartExpr = `CURRENT_DATE - INTERVAL '${daysAgo} day'`; periodStartExpr = `CURRENT_DATE - INTERVAL '${maxAgeInDays} day'`;
} else if (dbType === 'mysqldb' || dbType === 'mariadb') { } else if (dbType === 'mysqldb' || dbType === 'mariadb') {
periodStartExpr = `DATE_SUB(CURRENT_DATE, INTERVAL ${daysAgo} DAY)`; periodStartExpr = `DATE_SUB(CURRENT_DATE, INTERVAL ${maxAgeInDays} DAY)`;
} }
return periodStartExpr; return periodStartExpr;
} }
private getPeriodStartExpr(periodUnit: PeriodUnit) { private getPeriodStartExpr(periodUnitToCompactInto: PeriodUnit) {
// Database-specific period start expression to truncate timestamp to the periodUnit // Database-specific period start expression to truncate timestamp to the periodUnit
// SQLite by default // SQLite by default
let periodStartExpr = `strftime('%Y-%m-%d ${periodUnit === 'hour' ? '%H' : '00'}:00:00.000', periodStart)`; let periodStartExpr =
periodUnitToCompactInto === 'week'
? "strftime('%Y-%m-%d 00:00:00.000', date(periodStart, 'weekday 0', '-6 days'))"
: `strftime('%Y-%m-%d ${periodUnitToCompactInto === 'hour' ? '%H' : '00'}:00:00.000', periodStart)`;
if (dbType === 'mysqldb' || dbType === 'mariadb') { if (dbType === 'mysqldb' || dbType === 'mariadb') {
periodStartExpr = periodStartExpr =
periodUnit === 'hour' periodUnitToCompactInto === 'week'
? "DATE_FORMAT(periodStart, '%Y-%m-%d %H:00:00')" ? "DATE_FORMAT(DATE_SUB(periodStart, INTERVAL WEEKDAY(periodStart) DAY), '%Y-%m-%d 00:00:00')"
: "DATE_FORMAT(periodStart, '%Y-%m-%d 00:00:00')"; : `DATE_FORMAT(periodStart, '%Y-%m-%d ${periodUnitToCompactInto === 'hour' ? '%H' : '00'}:00:00')`;
} else if (dbType === 'postgresdb') { } else if (dbType === 'postgresdb') {
periodStartExpr = `DATE_TRUNC('${periodUnit}', ${this.escapeField('periodStart')})`; periodStartExpr = `DATE_TRUNC('${periodUnitToCompactInto}', ${this.escapeField('periodStart')})`;
} }
return periodStartExpr; return periodStartExpr;
} }
getPeriodInsightsBatchQuery(periodUnit: PeriodUnit, compactionBatchSize: number) { getPeriodInsightsBatchQuery({
periodUnitToCompactFrom,
compactionBatchSize,
maxAgeInDays,
}: { periodUnitToCompactFrom: PeriodUnit; compactionBatchSize: number; maxAgeInDays: number }) {
// Build the query to gather period insights data for the batch // Build the query to gather period insights data for the batch
const batchQuery = this.createQueryBuilder() const batchQuery = this.createQueryBuilder()
.select( .select(
@@ -68,11 +75,18 @@ export class InsightsByPeriodRepository extends Repository<InsightsByPeriod> {
this.escapeField(fieldName), this.escapeField(fieldName),
), ),
) )
.where(`${this.escapeField('periodUnit')} = ${PeriodUnitToNumber[periodUnit]}`) .where(`${this.escapeField('periodUnit')} = ${PeriodUnitToNumber[periodUnitToCompactFrom]}`)
.andWhere(`${this.escapeField('periodStart')} < ${this.getPeriodFilterExpr('day')}`) .andWhere(`${this.escapeField('periodStart')} < ${this.getPeriodFilterExpr(maxAgeInDays)}`)
.orderBy(this.escapeField('periodStart'), 'ASC') .orderBy(this.escapeField('periodStart'), 'ASC')
.limit(compactionBatchSize); .limit(compactionBatchSize);
return batchQuery;
return batchQuery as SelectQueryBuilder<{
id: number;
metaId: number;
type: string;
value: number;
periodStart: Date;
}>;
} }
getAggregationQuery(periodUnit: PeriodUnit) { getAggregationQuery(periodUnit: PeriodUnit) {
@@ -95,19 +109,39 @@ export class InsightsByPeriodRepository extends Repository<InsightsByPeriod> {
return aggregationQuery; return aggregationQuery;
} }
/**
* Compacts source data into the target period unit
*/
async compactSourceDataIntoInsightPeriod({ async compactSourceDataIntoInsightPeriod({
sourceBatchQuery, // Query to get batch source data. Must return those fields: 'id', 'metaId', 'type', 'periodStart', 'value' sourceBatchQuery,
sourceTableName = this.metadata.tableName, // Repository references for table operations sourceTableName = this.metadata.tableName,
periodUnit, periodUnitToCompactInto,
}: { }: {
sourceBatchQuery: string; /**
* Query builder to get batch source data. Must return these fields: 'id', 'metaId', 'type', 'periodStart', 'value'.
*/
sourceBatchQuery: SelectQueryBuilder<{
id: number;
metaId: number;
type: string;
value: number;
periodStart: Date;
}>;
/**
* The source table name to get source data from.
*/
sourceTableName?: string; sourceTableName?: string;
periodUnit: PeriodUnit;
/**
* The new period unit to compact the data into.
*/
periodUnitToCompactInto: PeriodUnit;
}): Promise<number> { }): Promise<number> {
// Create temp table that only exists in this transaction for rows to compact // Create temp table that only exists in this transaction for rows to compact
const getBatchAndStoreInTemporaryTable = sql` const getBatchAndStoreInTemporaryTable = sql`
CREATE TEMPORARY TABLE rows_to_compact AS CREATE TEMPORARY TABLE rows_to_compact AS
${sourceBatchQuery}; ${sourceBatchQuery.getSql()};
`; `;
const countBatch = sql` const countBatch = sql`
@@ -120,7 +154,7 @@ export class InsightsByPeriodRepository extends Repository<InsightsByPeriod> {
const targetColumnNamesWithValue = `${targetColumnNamesStr}, value`; const targetColumnNamesWithValue = `${targetColumnNamesStr}, value`;
// Function to get the aggregation query // Function to get the aggregation query
const aggregationQuery = this.getAggregationQuery(periodUnit); const aggregationQuery = this.getAggregationQuery(periodUnitToCompactInto);
// Insert or update aggregated data // Insert or update aggregated data
const insertQueryBase = sql` const insertQueryBase = sql`

View File

@@ -11,7 +11,14 @@ export class InsightsRawRepository extends Repository<InsightsRaw> {
getRawInsightsBatchQuery(compactionBatchSize: number) { getRawInsightsBatchQuery(compactionBatchSize: number) {
// Build the query to gather raw insights data for the batch // Build the query to gather raw insights data for the batch
const batchQuery = this.createQueryBuilder() const batchQuery = this.manager
.createQueryBuilder<{
id: number;
metaId: number;
type: string;
value: number;
periodStart: Date;
}>(InsightsRaw, 'insightsRaw')
.select( .select(
['id', 'metaId', 'type', 'value'].map((fieldName) => ['id', 'metaId', 'type', 'value'].map((fieldName) =>
this.manager.connection.driver.escape(fieldName), this.manager.connection.driver.escape(fieldName),
@@ -20,6 +27,7 @@ export class InsightsRawRepository extends Repository<InsightsRaw> {
.addSelect('timestamp', 'periodStart') .addSelect('timestamp', 'periodStart')
.orderBy('timestamp', 'ASC') .orderBy('timestamp', 'ASC')
.limit(compactionBatchSize); .limit(compactionBatchSize);
return batchQuery; return batchQuery;
} }
} }

View File

@@ -49,6 +49,10 @@ const shouldSkipMode: Record<WorkflowExecuteMode, boolean> = {
@Service() @Service()
export class InsightsService { export class InsightsService {
// Age threshold in days handed to the hour-to-day batch query: only hourly rows
// whose periodStart is older than this are selected for compaction.
private readonly maxAgeInDaysForHourlyData = 90;
// Age threshold in days handed to the day-to-week batch query: only daily rows
// whose periodStart is older than this are selected for compaction.
private readonly maxAgeInDaysForDailyData = 180;
private compactInsightsTimer: NodeJS.Timer | undefined; private compactInsightsTimer: NodeJS.Timer | undefined;
constructor( constructor(
@@ -163,6 +167,12 @@ export class InsightsService {
do { do {
numberOfCompactedHourData = await this.compactHourToDay(); numberOfCompactedHourData = await this.compactHourToDay();
} while (numberOfCompactedHourData > 0); } while (numberOfCompactedHourData > 0);
let numberOfCompactedDayData: number;
// Compact daily data to weekly aggregates
do {
numberOfCompactedDayData = await this.compactDayToWeek();
} while (numberOfCompactedDayData > 0);
} }
// Compacts raw data to hourly aggregates // Compacts raw data to hourly aggregates
@@ -173,28 +183,42 @@ export class InsightsService {
); );
return await this.insightsByPeriodRepository.compactSourceDataIntoInsightPeriod({ return await this.insightsByPeriodRepository.compactSourceDataIntoInsightPeriod({
sourceBatchQuery: batchQuery.getSql(), sourceBatchQuery: batchQuery,
sourceTableName: this.insightsRawRepository.metadata.tableName, sourceTableName: this.insightsRawRepository.metadata.tableName,
periodUnit: 'hour', periodUnitToCompactInto: 'hour',
}); });
} }
// Compacts hourly data to daily aggregates // Compacts hourly data to daily aggregates
async compactHourToDay() { async compactHourToDay() {
// get hour data query for batching // get hour data query for batching
const batchQuery = this.insightsByPeriodRepository.getPeriodInsightsBatchQuery( const batchQuery = this.insightsByPeriodRepository.getPeriodInsightsBatchQuery({
'hour', periodUnitToCompactFrom: 'hour',
config.compactionBatchSize, compactionBatchSize: config.compactionBatchSize,
); maxAgeInDays: this.maxAgeInDaysForHourlyData,
});
return await this.insightsByPeriodRepository.compactSourceDataIntoInsightPeriod({ return await this.insightsByPeriodRepository.compactSourceDataIntoInsightPeriod({
sourceBatchQuery: batchQuery.getSql(), sourceBatchQuery: batchQuery,
periodUnit: 'day', periodUnitToCompactInto: 'day',
});
}
// Compacts daily data to weekly aggregates
async compactDayToWeek() {
// get daily data query for batching
const batchQuery = this.insightsByPeriodRepository.getPeriodInsightsBatchQuery({
periodUnitToCompactFrom: 'day',
compactionBatchSize: config.compactionBatchSize,
maxAgeInDays: this.maxAgeInDaysForDailyData,
});
return await this.insightsByPeriodRepository.compactSourceDataIntoInsightPeriod({
sourceBatchQuery: batchQuery,
periodUnitToCompactInto: 'week',
}); });
} }
// TODO: add return type once rebased on master and InsightsSummary is
// available
async getInsightsSummary(): Promise<InsightsSummary> { async getInsightsSummary(): Promise<InsightsSummary> {
const rows = await this.insightsByPeriodRepository.getPreviousAndCurrentPeriodTypeAggregates(); const rows = await this.insightsByPeriodRepository.getPreviousAndCurrentPeriodTypeAggregates();