diff --git a/packages/cli/src/modules/insights/__tests__/insights.service.test.ts b/packages/cli/src/modules/insights/__tests__/insights.service.test.ts index 0805d190c2..f529be4a3a 100644 --- a/packages/cli/src/modules/insights/__tests__/insights.service.test.ts +++ b/packages/cli/src/modules/insights/__tests__/insights.service.test.ts @@ -579,7 +579,6 @@ describe('compaction', () => { ])('$name', async ({ periodStarts, batches }) => { // ARRANGE const insightsService = Container.get(InsightsService); - const insightsRawRepository = Container.get(InsightsRawRepository); const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); const project = await createTeamProject(); @@ -600,13 +599,125 @@ describe('compaction', () => { // ASSERT expect(compactedRows).toBe(periodStarts.length); - await expect(insightsRawRepository.count()).resolves.toBe(0); + const hourInsights = (await insightsByPeriodRepository.find()).filter( + (insight) => insight.periodUnit !== 'day', + ); + expect(hourInsights).toBeEmptyArray(); const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } }); expect(allCompacted).toHaveLength(batches.length); for (const [index, compacted] of allCompacted.entries()) { expect(compacted.value).toBe(batches[index]); } }); + + test('recent insight periods should not be compacted', async () => { + // ARRANGE + const insightsService = Container.get(InsightsService); + + const project = await createTeamProject(); + const workflow = await createWorkflow({}, project); + // create before so we can create the raw events in parallel + await createMetadata(workflow); + await createCompactedInsightsEvent(workflow, { + type: 'success', + value: 1, + periodUnit: 'hour', + periodStart: DateTime.utc().minus({ day: 79 }).startOf('hour'), + }); + + // ACT + const compactedRows = await insightsService.compactHourToDay(); + + // ASSERT + expect(compactedRows).toBe(0); + }); + }); + + describe('compactDayToWeek', () => { + type TestData = { + name: string; + periodStarts: DateTime[]; + batches: number[]; + }; + + test.each([ + { + name: 'compact into 2 rows', + periodStarts: [ + // 2000-01-03 is a Monday + DateTime.utc(2000, 1, 3, 0, 0), + DateTime.utc(2000, 1, 5, 23, 59), + DateTime.utc(2000, 1, 11, 1, 0), + ], + batches: [2, 1], + }, + { + name: 'compact into 3 rows', + periodStarts: [ + // 2000-01-03 is a Monday + DateTime.utc(2000, 1, 3, 0, 0), + DateTime.utc(2000, 1, 4, 23, 59), + DateTime.utc(2000, 1, 11, 0, 0), + DateTime.utc(2000, 1, 12, 23, 59), + DateTime.utc(2000, 1, 18, 23, 59), + ], + batches: [2, 2, 1], + }, + ])('$name', async ({ periodStarts, batches }) => { + // ARRANGE + const insightsService = Container.get(InsightsService); + const insightsByPeriodRepository = Container.get(InsightsByPeriodRepository); + + const project = await createTeamProject(); + const workflow = await createWorkflow({}, project); + + await createMetadata(workflow); + for (const periodStart of periodStarts) { + await createCompactedInsightsEvent(workflow, { + type: 'success', + value: 1, + periodUnit: 'day', + periodStart, + }); + } + + // ACT + const compactedRows = await insightsService.compactDayToWeek(); + + // ASSERT + expect(compactedRows).toBe(periodStarts.length); + const hourAndDayInsights = (await insightsByPeriodRepository.find()).filter( + (insight) => insight.periodUnit !== 'week', + ); + expect(hourAndDayInsights).toBeEmptyArray(); + const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } }); + expect(allCompacted).toHaveLength(batches.length); + for (const [index, compacted] of allCompacted.entries()) { + expect(compacted.periodStart.getDay()).toBe(1); + expect(compacted.value).toBe(batches[index]); + } + }); + + test('recent insight periods should not be compacted', async () => { + // ARRANGE + const insightsService = Container.get(InsightsService); + + const project = await createTeamProject(); + const workflow = await createWorkflow({}, project); + await createMetadata(workflow); + await createCompactedInsightsEvent(workflow, { + type: 'success', + value: 1, + periodUnit: 'day', + periodStart: DateTime.utc().minus({ day: 179 }).startOf('day'), + }); + + // ACT + const compactedRows = await insightsService.compactDayToWeek(); + + // ASSERT + expect(compactedRows).toBe(0); + }); }); }); diff --git a/packages/cli/src/modules/insights/database/repositories/insights-by-period.repository.ts b/packages/cli/src/modules/insights/database/repositories/insights-by-period.repository.ts index c4caa960f4..2fe18d1366 100644 --- a/packages/cli/src/modules/insights/database/repositories/insights-by-period.repository.ts +++ b/packages/cli/src/modules/insights/database/repositories/insights-by-period.repository.ts @@ -1,5 +1,6 @@ import { GlobalConfig } from '@n8n/config'; import { Container, Service } from '@n8n/di'; +import type { SelectQueryBuilder } from '@n8n/typeorm'; import { DataSource, Repository } from '@n8n/typeorm'; import { z } from 'zod'; @@ -31,36 +32,42 @@ export class InsightsByPeriodRepository extends Repository { return this.manager.connection.driver.escape(fieldName); } - private getPeriodFilterExpr(periodUnit: PeriodUnit) { - const daysAgo = periodUnit === 'day' ? 90 : 180; + private getPeriodFilterExpr(maxAgeInDays = 0) { // Database-specific period start expression to filter out data to compact by days matching the periodUnit - let periodStartExpr = `date('now', '-${daysAgo} days')`; + let periodStartExpr = `date('now', '-${maxAgeInDays} days')`; if (dbType === 'postgresdb') { - periodStartExpr = `CURRENT_DATE - INTERVAL '${daysAgo} day'`; + periodStartExpr = `CURRENT_DATE - INTERVAL '${maxAgeInDays} day'`; } else if (dbType === 'mysqldb' || dbType === 'mariadb') { - periodStartExpr = `DATE_SUB(CURRENT_DATE, INTERVAL ${daysAgo} DAY)`; + periodStartExpr = `DATE_SUB(CURRENT_DATE, INTERVAL ${maxAgeInDays} DAY)`; } return periodStartExpr; } - private getPeriodStartExpr(periodUnit: PeriodUnit) { + private getPeriodStartExpr(periodUnitToCompactInto: PeriodUnit) { // Database-specific period start expression to truncate timestamp to the periodUnit // SQLite by default - let periodStartExpr = `strftime('%Y-%m-%d ${periodUnit === 'hour' ? '%H' : '00'}:00:00.000', periodStart)`; + let periodStartExpr = + periodUnitToCompactInto === 'week' + ? "strftime('%Y-%m-%d 00:00:00.000', date(periodStart, 'weekday 0', '-6 days'))" + : `strftime('%Y-%m-%d ${periodUnitToCompactInto === 'hour' ? '%H' : '00'}:00:00.000', periodStart)`; if (dbType === 'mysqldb' || dbType === 'mariadb') { periodStartExpr = - periodUnit === 'hour' - ? "DATE_FORMAT(periodStart, '%Y-%m-%d %H:00:00')" - : "DATE_FORMAT(periodStart, '%Y-%m-%d 00:00:00')"; + periodUnitToCompactInto === 'week' + ? "DATE_FORMAT(DATE_SUB(periodStart, INTERVAL WEEKDAY(periodStart) DAY), '%Y-%m-%d 00:00:00')" + : `DATE_FORMAT(periodStart, '%Y-%m-%d ${periodUnitToCompactInto === 'hour' ? '%H' : '00'}:00:00')`; } else if (dbType === 'postgresdb') { - periodStartExpr = `DATE_TRUNC('${periodUnit}', ${this.escapeField('periodStart')})`; + periodStartExpr = `DATE_TRUNC('${periodUnitToCompactInto}', ${this.escapeField('periodStart')})`; } return periodStartExpr; } - getPeriodInsightsBatchQuery(periodUnit: PeriodUnit, compactionBatchSize: number) { + getPeriodInsightsBatchQuery({ + periodUnitToCompactFrom, + compactionBatchSize, + maxAgeInDays, + }: { periodUnitToCompactFrom: PeriodUnit; compactionBatchSize: number; maxAgeInDays: number }) { // Build the query to gather period insights data for the batch const batchQuery = this.createQueryBuilder() .select( @@ -68,11 +75,18 @@ export class InsightsByPeriodRepository extends Repository { this.escapeField(fieldName), ), ) - .where(`${this.escapeField('periodUnit')} = ${PeriodUnitToNumber[periodUnit]}`) - .andWhere(`${this.escapeField('periodStart')} < ${this.getPeriodFilterExpr('day')}`) + .where(`${this.escapeField('periodUnit')} = ${PeriodUnitToNumber[periodUnitToCompactFrom]}`) + .andWhere(`${this.escapeField('periodStart')} < ${this.getPeriodFilterExpr(maxAgeInDays)}`) .orderBy(this.escapeField('periodStart'), 'ASC') .limit(compactionBatchSize); - return batchQuery; + + return batchQuery as SelectQueryBuilder<{ + id: number; + metaId: number; + type: string; + value: number; + periodStart: Date; + }>; } getAggregationQuery(periodUnit: PeriodUnit) { @@ -95,19 +109,39 @@ export class InsightsByPeriodRepository extends Repository { return aggregationQuery; } + /** + * Compacts source data into the target period unit + */ async compactSourceDataIntoInsightPeriod({ - sourceBatchQuery, // Query to get batch source data. Must return those fields: 'id', 'metaId', 'type', 'periodStart', 'value' - sourceTableName = this.metadata.tableName, // Repository references for table operations - periodUnit, + sourceBatchQuery, + sourceTableName = this.metadata.tableName, + periodUnitToCompactInto, }: { - sourceBatchQuery: string; + /** + * Query builder to get batch source data. Must return these fields: 'id', 'metaId', 'type', 'periodStart', 'value'. + */ + sourceBatchQuery: SelectQueryBuilder<{ + id: number; + metaId: number; + type: string; + value: number; + periodStart: Date; + }>; + + /** + * The source table name to get source data from. + */ sourceTableName?: string; - periodUnit: PeriodUnit; + + /** + * The new period unit to compact the data into. + */ + periodUnitToCompactInto: PeriodUnit; }): Promise { // Create temp table that only exists in this transaction for rows to compact const getBatchAndStoreInTemporaryTable = sql` CREATE TEMPORARY TABLE rows_to_compact AS - ${sourceBatchQuery}; + ${sourceBatchQuery.getSql()}; `; const countBatch = sql` @@ -120,7 +154,7 @@ export class InsightsByPeriodRepository extends Repository { const targetColumnNamesWithValue = `${targetColumnNamesStr}, value`; // Function to get the aggregation query - const aggregationQuery = this.getAggregationQuery(periodUnit); + const aggregationQuery = this.getAggregationQuery(periodUnitToCompactInto); // Insert or update aggregated data const insertQueryBase = sql` diff --git a/packages/cli/src/modules/insights/database/repositories/insights-raw.repository.ts b/packages/cli/src/modules/insights/database/repositories/insights-raw.repository.ts index 05eba9ff16..22e114cb8d 100644 --- a/packages/cli/src/modules/insights/database/repositories/insights-raw.repository.ts +++ b/packages/cli/src/modules/insights/database/repositories/insights-raw.repository.ts @@ -11,7 +11,14 @@ export class InsightsRawRepository extends Repository { getRawInsightsBatchQuery(compactionBatchSize: number) { // Build the query to gather raw insights data for the batch - const batchQuery = this.createQueryBuilder() + const batchQuery = this.manager + .createQueryBuilder<{ + id: number; + metaId: number; + type: string; + value: number; + periodStart: Date; + }>(InsightsRaw, 'insightsRaw') .select( ['id', 'metaId', 'type', 'value'].map((fieldName) => this.manager.connection.driver.escape(fieldName), @@ -20,6 +27,7 @@ export class InsightsRawRepository extends Repository { .addSelect('timestamp', 'periodStart') .orderBy('timestamp', 'ASC') .limit(compactionBatchSize); + return batchQuery; } } diff --git a/packages/cli/src/modules/insights/insights.service.ts b/packages/cli/src/modules/insights/insights.service.ts index 1bcd06e916..54ef08b533 100644 --- a/packages/cli/src/modules/insights/insights.service.ts +++ b/packages/cli/src/modules/insights/insights.service.ts @@ -49,6 +49,10 @@ const shouldSkipMode: Record = { @Service() export class InsightsService { + private readonly maxAgeInDaysForHourlyData = 90; + + private readonly maxAgeInDaysForDailyData = 180; + private compactInsightsTimer: NodeJS.Timer | undefined; constructor( @@ -163,6 +167,12 @@ export class InsightsService { do { numberOfCompactedHourData = await this.compactHourToDay(); } while (numberOfCompactedHourData > 0); + + let numberOfCompactedDayData: number; + // Compact daily data to weekly aggregates + do { + numberOfCompactedDayData = await this.compactDayToWeek(); + } while (numberOfCompactedDayData > 0); } // Compacts raw data to hourly aggregates @@ -173,28 +183,42 @@ export class InsightsService { ); return await this.insightsByPeriodRepository.compactSourceDataIntoInsightPeriod({ - sourceBatchQuery: batchQuery.getSql(), + sourceBatchQuery: batchQuery, sourceTableName: this.insightsRawRepository.metadata.tableName, - periodUnit: 'hour', + periodUnitToCompactInto: 'hour', }); } // Compacts hourly data to daily aggregates async compactHourToDay() { // get hour data query for batching - const batchQuery = this.insightsByPeriodRepository.getPeriodInsightsBatchQuery( - 'hour', - config.compactionBatchSize, - ); + const batchQuery = this.insightsByPeriodRepository.getPeriodInsightsBatchQuery({ + periodUnitToCompactFrom: 'hour', + compactionBatchSize: config.compactionBatchSize, + maxAgeInDays: this.maxAgeInDaysForHourlyData, + }); return await this.insightsByPeriodRepository.compactSourceDataIntoInsightPeriod({ - sourceBatchQuery: batchQuery.getSql(), - periodUnit: 'day', + sourceBatchQuery: batchQuery, + periodUnitToCompactInto: 'day', + }); + } + + // Compacts daily data to weekly aggregates + async compactDayToWeek() { + // get daily data query for batching + const batchQuery = this.insightsByPeriodRepository.getPeriodInsightsBatchQuery({ + periodUnitToCompactFrom: 'day', + compactionBatchSize: config.compactionBatchSize, + maxAgeInDays: this.maxAgeInDaysForDailyData, + }); + + return await this.insightsByPeriodRepository.compactSourceDataIntoInsightPeriod({ + sourceBatchQuery: batchQuery, + periodUnitToCompactInto: 'week', }); } - // TODO: add return type once rebased on master and InsightsSummary is - // available async getInsightsSummary(): Promise { const rows = await this.insightsByPeriodRepository.getPreviousAndCurrentPeriodTypeAggregates();