diff --git a/packages/cli/src/modules/insights/__tests__/insights-compaction.service.test.ts b/packages/cli/src/modules/insights/__tests__/insights-compaction.service.test.ts index 842f1e2515..2d7c73338e 100644 --- a/packages/cli/src/modules/insights/__tests__/insights-compaction.service.test.ts +++ b/packages/cli/src/modules/insights/__tests__/insights-compaction.service.test.ts @@ -282,11 +282,11 @@ describe('compaction', () => { const workflow = await createWorkflow({}, project); // create 100 more events than the batch size (500) - const batchSize = 600; + const numberOfEvents = 600; let timestamp = DateTime.utc().startOf('hour'); const events = Array<{ type: 'success'; value: number; timestamp: DateTime }>(); - for (let i = 0; i < batchSize; i++) { + for (let i = 0; i < numberOfEvents; i++) { events.push({ type: 'success', value: 1, timestamp }); timestamp = timestamp.plus({ minute: 1 }); } @@ -296,13 +296,13 @@ describe('compaction', () => { await insightsCompactionService.compactInsights(); // ASSERT - // compaction batch size is 500, so rawToHour should be called 3 times: - // 1st call: 500 events, 2nd call: 100 events, and third call that returns nothing - expect(rawToHourSpy).toHaveBeenCalledTimes(3); + // compaction batch size is 500, so rawToHour should be called 2 times: + // 1st call: 500 events, 2nd call: 100 events + expect(rawToHourSpy).toHaveBeenCalledTimes(2); await expect(insightsRawRepository.count()).resolves.toBe(0); const allCompacted = await insightsByPeriodRepository.find({ order: { periodStart: 1 } }); const accumulatedValues = allCompacted.reduce((acc, event) => acc + event.value, 0); - expect(accumulatedValues).toBe(batchSize); + expect(accumulatedValues).toBe(numberOfEvents); }); }); diff --git a/packages/cli/src/modules/insights/insights-collection.service.ts b/packages/cli/src/modules/insights/insights-collection.service.ts index eb65cdfe01..409b575fcd 100644 --- a/packages/cli/src/modules/insights/insights-collection.service.ts +++ b/packages/cli/src/modules/insights/insights-collection.service.ts @@ -100,6 +100,7 @@ export class InsightsCollectionService { // Wait for all in-progress asynchronous flushes // Flush any remaining events + this.logger.debug('Flushing remaining insights before shutdown'); await Promise.all([...this.flushesInProgress, this.flushEvents()]); } @@ -144,18 +145,21 @@ export class InsightsCollectionService { } if (!this.isAsynchronouslySavingInsights) { + this.logger.debug('Flushing insights synchronously (shutdown in progress)'); // If we are not asynchronously saving insights, we need to flush the events await this.flushEvents(); } // If the buffer is full, flush the events asynchronously if (this.bufferedInsights.size >= this.insightsConfig.flushBatchSize) { + this.logger.debug(`Buffer is full (${this.bufferedInsights.size} insights), flushing events`); // Fire and forget flush to avoid blocking the workflow execute after handler void this.flushEvents(); } } private async saveInsightsMetadataAndRaw(insightsRawToInsertBuffer: Set) { + this.logger.debug(`Flushing ${insightsRawToInsertBuffer.size} insights`); const workflowIdNames: Map = new Map(); for (const event of insightsRawToInsertBuffer) { @@ -188,6 +192,7 @@ export class InsightsCollectionService { return acc; }, [] as InsightsMetadata[]); + this.logger.debug(`Saving ${metadataToUpsert.length} insights metadata for workflows`); await this.insightsMetadataRepository.upsert(metadataToUpsert, ['workflowId']); const upsertMetadata = await this.insightsMetadataRepository.findBy({ @@ -215,6 +220,7 @@ export class InsightsCollectionService { events.push(insight); } + this.logger.debug(`Inserting ${events.length} insights raw`); await this.insightsRawRepository.insert(events); } diff --git a/packages/cli/src/modules/insights/insights-compaction.service.ts b/packages/cli/src/modules/insights/insights-compaction.service.ts index 47c852d864..4061849deb 100644 --- a/packages/cli/src/modules/insights/insights-compaction.service.ts +++ b/packages/cli/src/modules/insights/insights-compaction.service.ts @@ -44,21 +44,27 @@ export class InsightsCompactionService { // Compact raw data to hourly aggregates do { + this.logger.debug('Compacting raw data to hourly aggregates'); numberOfCompactedRawData = await this.compactRawToHour(); - } while (numberOfCompactedRawData > 0); + this.logger.debug(`Compacted ${numberOfCompactedRawData} raw data to hourly aggregates`); + } while (numberOfCompactedRawData === this.insightsConfig.compactionBatchSize); let numberOfCompactedHourData: number; // Compact hourly data to daily aggregates do { + this.logger.debug('Compacting hourly data to daily aggregates'); numberOfCompactedHourData = await this.compactHourToDay(); - } while (numberOfCompactedHourData > 0); + this.logger.debug(`Compacted ${numberOfCompactedHourData} hourly data to daily aggregates`); + } while (numberOfCompactedHourData === this.insightsConfig.compactionBatchSize); let numberOfCompactedDayData: number; // Compact daily data to weekly aggregates do { + this.logger.debug('Compacting daily data to weekly aggregates'); numberOfCompactedDayData = await this.compactDayToWeek(); - } while (numberOfCompactedDayData > 0); + this.logger.debug(`Compacted ${numberOfCompactedDayData} daily data to weekly aggregates`); + } while (numberOfCompactedDayData === this.insightsConfig.compactionBatchSize); } /**