perf(core): Batch raw insights save and add metadata cache (#14261)

Co-authored-by: Danny Martini <danny@n8n.io>
This commit is contained in:
Guillaume Jacquart
2025-04-07 18:18:11 +02:00
committed by GitHub
parent 1c2d48f7ee
commit 60afb46094
15 changed files with 918 additions and 68 deletions

View File

@@ -0,0 +1,20 @@
{
"$schema": "../scenario.schema.json",
"name": "MultipleWebhooks",
"description": "10 simple webhooks that respond with a 200 status code",
"scenarioData": {
"workflowFiles": [
"multiple-webhooks1.json",
"multiple-webhooks2.json",
"multiple-webhooks3.json",
"multiple-webhooks4.json",
"multiple-webhooks5.json",
"multiple-webhooks6.json",
"multiple-webhooks7.json",
"multiple-webhooks8.json",
"multiple-webhooks9.json",
"multiple-webhooks10.json"
]
},
"scriptPath": "multiple-webhooks.script.js"
}

View File

@@ -0,0 +1,19 @@
import http from 'k6/http';
import { check } from 'k6';
const apiBaseUrl = __ENV.API_BASE_URL;
export default function () {
	// Build the ten webhook endpoints and fire them as a single batched request.
	const endpoints = [];
	for (let i = 1; i <= 10; i++) {
		endpoints.push(`${apiBaseUrl}/webhook/multiple-webhook${i}`);
	}

	const responses = http.batch(endpoints);

	// Every webhook is expected to answer with HTTP 200.
	for (const response of responses) {
		check(response, {
			'is status 200': (r) => r.status === 200,
		});
	}
}

View File

@@ -0,0 +1,25 @@
{
"createdAt": "2024-08-06T12:19:51.268Z",
"updatedAt": "2024-08-06T12:20:45.000Z",
"name": "Multiple Webhook 1",
"active": true,
"nodes": [
{
"parameters": { "path": "multiple-webhook1", "options": {} },
"id": "b239bc6c-ea40-438d-bb14-5bce03599685",
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"typeVersion": 2,
"position": [760, 400],
"webhookId": "14226baf-ea56-43d7-bc0d-eea841746441"
}
],
"connections": {},
"settings": { "executionOrder": "v1" },
"staticData": null,
"meta": { "templateCredsSetupCompleted": true, "responseMode": "lastNode", "options": {} },
"pinData": {},
"versionId": "0322386c-ec84-4d38-b895-c206b574f31c",
"triggerCount": 1,
"tags": []
}

View File

@@ -0,0 +1,25 @@
{
"createdAt": "2024-08-06T12:19:51.268Z",
"updatedAt": "2024-08-06T12:20:45.000Z",
"name": "Multiple Webhook 10",
"active": true,
"nodes": [
{
"parameters": { "path": "multiple-webhook10", "options": {} },
"id": "34ac4500-9a29-4f4f-a604-134aa2cb2889",
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"typeVersion": 2,
"position": [760, 400],
"webhookId": "b7a1c2d3-4e5f-4a6b-8c9d-0e1f2a3b4c5d"
}
],
"connections": {},
"settings": { "executionOrder": "v1" },
"staticData": null,
"meta": { "templateCredsSetupCompleted": true, "responseMode": "lastNode", "options": {} },
"pinData": {},
"versionId": "106d4d2c-49c0-45b8-8421-99cc0c8b1589",
"triggerCount": 1,
"tags": []
}

View File

@@ -0,0 +1,25 @@
{
"createdAt": "2024-08-06T12:19:51.268Z",
"updatedAt": "2024-08-06T12:20:45.000Z",
"name": "Multiple Webhook 2",
"active": true,
"nodes": [
{
"parameters": { "path": "multiple-webhook2", "options": {} },
"id": "c44ce610-086c-45a6-b347-c7b5df5365ed",
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"typeVersion": 2,
"position": [760, 400],
"webhookId": "edc1994b-7cde-4aed-b077-fbc7f21b0f48"
}
],
"connections": {},
"settings": { "executionOrder": "v1" },
"staticData": null,
"meta": { "templateCredsSetupCompleted": true, "responseMode": "lastNode", "options": {} },
"pinData": {},
"versionId": "b4579dbf-d9c5-4b57-8e1f-883d91e8726b",
"triggerCount": 1,
"tags": []
}

View File

@@ -0,0 +1,25 @@
{
"createdAt": "2024-08-06T12:19:51.268Z",
"updatedAt": "2024-08-06T12:20:45.000Z",
"name": "Multiple Webhook 3",
"active": true,
"nodes": [
{
"parameters": { "path": "multiple-webhook3", "options": {} },
"id": "34ac4500-9a29-4f4f-a604-134aa2cb2889",
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"typeVersion": 2,
"position": [760, 400],
"webhookId": "47fe25ac-1376-4ee7-b9de-3fff1d49281c"
}
],
"connections": {},
"settings": { "executionOrder": "v1" },
"staticData": null,
"meta": { "templateCredsSetupCompleted": true, "responseMode": "lastNode", "options": {} },
"pinData": {},
"versionId": "106d4d2c-49c0-45b8-8421-99cc0c8b1589",
"triggerCount": 1,
"tags": []
}

View File

@@ -0,0 +1,25 @@
{
"createdAt": "2024-08-06T12:19:51.268Z",
"updatedAt": "2024-08-06T12:20:45.000Z",
"name": "Multiple Webhook 4",
"active": true,
"nodes": [
{
"parameters": { "path": "multiple-webhook4", "options": {} },
"id": "34ac4500-9a29-4f4f-a604-134aa2cb2889",
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"typeVersion": 2,
"position": [760, 400],
"webhookId": "3f8e2b1a-6c4d-4e7f-9a0b-1c2d3e4f5a6b"
}
],
"connections": {},
"settings": { "executionOrder": "v1" },
"staticData": null,
"meta": { "templateCredsSetupCompleted": true, "responseMode": "lastNode", "options": {} },
"pinData": {},
"versionId": "106d4d2c-49c0-45b8-8421-99cc0c8b1589",
"triggerCount": 1,
"tags": []
}

View File

@@ -0,0 +1,25 @@
{
"createdAt": "2024-08-06T12:19:51.268Z",
"updatedAt": "2024-08-06T12:20:45.000Z",
"name": "Multiple Webhook 5",
"active": true,
"nodes": [
{
"parameters": { "path": "multiple-webhook5", "options": {} },
"id": "34ac4500-9a29-4f4f-a604-134aa2cb2889",
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"typeVersion": 2,
"position": [760, 400],
"webhookId": "9d0c1b2a-3e4f-4a5b-8c7d-6e5f4a3b2c1d"
}
],
"connections": {},
"settings": { "executionOrder": "v1" },
"staticData": null,
"meta": { "templateCredsSetupCompleted": true, "responseMode": "lastNode", "options": {} },
"pinData": {},
"versionId": "106d4d2c-49c0-45b8-8421-99cc0c8b1589",
"triggerCount": 1,
"tags": []
}

View File

@@ -0,0 +1,25 @@
{
"createdAt": "2024-08-06T12:19:51.268Z",
"updatedAt": "2024-08-06T12:20:45.000Z",
"name": "Multiple Webhook 6",
"active": true,
"nodes": [
{
"parameters": { "path": "multiple-webhook6", "options": {} },
"id": "34ac4500-9a29-4f4f-a604-134aa2cb2889",
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"typeVersion": 2,
"position": [760, 400],
"webhookId": "5a6b7c8d-9e0f-4a1b-8c2d-3e4f5a6b7c8d"
}
],
"connections": {},
"settings": { "executionOrder": "v1" },
"staticData": null,
"meta": { "templateCredsSetupCompleted": true, "responseMode": "lastNode", "options": {} },
"pinData": {},
"versionId": "106d4d2c-49c0-45b8-8421-99cc0c8b1589",
"triggerCount": 1,
"tags": []
}

View File

@@ -0,0 +1,25 @@
{
"createdAt": "2024-08-06T12:19:51.268Z",
"updatedAt": "2024-08-06T12:20:45.000Z",
"name": "Multiple Webhook 7",
"active": true,
"nodes": [
{
"parameters": { "path": "multiple-webhook7", "options": {} },
"id": "34ac4500-9a29-4f4f-a604-134aa2cb2889",
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"typeVersion": 2,
"position": [760, 400],
"webhookId": "1e2f3a4b-5c6d-4e7f-8a9b-0c1d2e3f4a5b"
}
],
"connections": {},
"settings": { "executionOrder": "v1" },
"staticData": null,
"meta": { "templateCredsSetupCompleted": true, "responseMode": "lastNode", "options": {} },
"pinData": {},
"versionId": "106d4d2c-49c0-45b8-8421-99cc0c8b1589",
"triggerCount": 1,
"tags": []
}

View File

@@ -0,0 +1,25 @@
{
"createdAt": "2024-08-06T12:19:51.268Z",
"updatedAt": "2024-08-06T12:20:45.000Z",
"name": "Multiple Webhook 8",
"active": true,
"nodes": [
{
"parameters": { "path": "multiple-webhook8", "options": {} },
"id": "34ac4500-9a29-4f4f-a604-134aa2cb2889",
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"typeVersion": 2,
"position": [760, 400],
"webhookId": "7c8d9e0f-1a2b-4c3d-8e4f-5a6b7c8d9e0f"
}
],
"connections": {},
"settings": { "executionOrder": "v1" },
"staticData": null,
"meta": { "templateCredsSetupCompleted": true, "responseMode": "lastNode", "options": {} },
"pinData": {},
"versionId": "106d4d2c-49c0-45b8-8421-99cc0c8b1589",
"triggerCount": 1,
"tags": []
}

View File

@@ -0,0 +1,25 @@
{
"createdAt": "2024-08-06T12:19:51.268Z",
"updatedAt": "2024-08-06T12:20:45.000Z",
"name": "Multiple Webhook 9",
"active": true,
"nodes": [
{
"parameters": { "path": "multiple-webhook9", "options": {} },
"id": "34ac4500-9a29-4f4f-a604-134aa2cb2889",
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"typeVersion": 2,
"position": [760, 400],
"webhookId": "2b3c4d5e-6f7a-4b8c-9d0e-1f2a3b4c5d6e"
}
],
"connections": {},
"settings": { "executionOrder": "v1" },
"staticData": null,
"meta": { "templateCredsSetupCompleted": true, "responseMode": "lastNode", "options": {} },
"pinData": {},
"versionId": "106d4d2c-49c0-45b8-8421-99cc0c8b1589",
"triggerCount": 1,
"tags": []
}

View File

@@ -1,11 +1,19 @@
import { Container } from '@n8n/di';
import { In, type EntityManager } from '@n8n/typeorm';
import { mock } from 'jest-mock-extended';
import { DateTime } from 'luxon';
import type { ExecutionLifecycleHooks } from 'n8n-core';
import type { ExecutionStatus, IRun, WorkflowExecuteMode } from 'n8n-workflow';
import type { Logger } from 'n8n-core';
import { type ExecutionLifecycleHooks } from 'n8n-core';
import {
createDeferredPromise,
type ExecutionStatus,
type IRun,
type WorkflowExecuteMode,
} from 'n8n-workflow';
import type { Project } from '@/databases/entities/project';
import type { WorkflowEntity } from '@/databases/entities/workflow-entity';
import type { SharedWorkflowRepository } from '@/databases/repositories/shared-workflow.repository';
import type { IWorkflowDb } from '@/interfaces';
import type { TypeUnit } from '@/modules/insights/database/entities/insights-shared';
import { InsightsMetadataRepository } from '@/modules/insights/database/repositories/insights-metadata.repository';
@@ -86,7 +94,9 @@ describe('workflowExecuteAfterHandler', () => {
});
// ACT
const now = DateTime.utc().toJSDate();
await insightsService.workflowExecuteAfterHandler(ctx, run);
await insightsService.flushEvents();
// ASSERT
const metadata = await insightsMetadataRepository.findOneBy({ workflowId: workflow.id });
@@ -114,6 +124,10 @@ describe('workflowExecuteAfterHandler', () => {
value: stoppedAt.diff(startedAt).toMillis(),
}),
);
// expect timestamp to be close to workflow execution start
for (const insight of allInsights) {
expect(insight.timestamp.getTime() / 1000).toBeCloseTo(now.getTime() / 1000, 0);
}
if (status === 'success') {
expect(allInsights).toContainEqual(
expect.objectContaining({
@@ -145,6 +159,7 @@ describe('workflowExecuteAfterHandler', () => {
// ACT
await insightsService.workflowExecuteAfterHandler(ctx, run);
await insightsService.flushEvents();
// ASSERT
const metadata = await insightsMetadataRepository.findOneBy({ workflowId: workflow.id });
@@ -171,6 +186,7 @@ describe('workflowExecuteAfterHandler', () => {
// ACT
await insightsService.workflowExecuteAfterHandler(ctx, run);
await insightsService.flushEvents();
// ASSERT
const metadata = await insightsMetadataRepository.findOneBy({ workflowId: workflow.id });
@@ -200,6 +216,7 @@ describe('workflowExecuteAfterHandler', () => {
// ACT
await insightsService.workflowExecuteAfterHandler(ctx, run);
await insightsService.flushEvents();
// ASSERT
const metadata = await insightsMetadataRepository.findOneBy({ workflowId: workflow.id });
@@ -235,24 +252,421 @@ describe('workflowExecuteAfterHandler', () => {
}),
);
});
});
test("throws UnexpectedError if the execution's workflow has no owner", async () => {
describe('workflowExecuteAfterHandler - cacheMetadata', () => {
let insightsService: InsightsService;
let entityManagerMock = mock<EntityManager>();
const sharedWorkflowRepositoryMock: jest.Mocked<SharedWorkflowRepository> = {
manager: entityManagerMock,
} as unknown as jest.Mocked<SharedWorkflowRepository>;
const insightsRawRepository: jest.Mocked<InsightsRawRepository> = mock<InsightsRawRepository>();
const startedAt = DateTime.utc();
const stoppedAt = startedAt.plus({ seconds: 5 });
const run = mock<IRun>({
mode: 'webhook',
status: 'success',
startedAt: startedAt.toJSDate(),
stoppedAt: stoppedAt.toJSDate(),
});
// Mock the transaction function
const trxMock = {
find: jest.fn(),
findBy: jest.fn(),
upsert: jest.fn(),
insert: jest.fn(),
};
entityManagerMock.transaction.mockImplementation(
jest.fn(async (runInTransaction: (entityManager: EntityManager) => Promise<void>) => {
await runInTransaction(trxMock as unknown as EntityManager);
}) as unknown as EntityManager['transaction'],
);
beforeAll(async () => {
insightsService = new InsightsService(
sharedWorkflowRepositoryMock,
Container.get(InsightsByPeriodRepository),
insightsRawRepository,
mock<Logger>(),
);
});
let project: Project;
let workflow: IWorkflowDb & WorkflowEntity;
beforeEach(async () => {
project = await createTeamProject();
workflow = await createWorkflow({}, project);
trxMock.find = jest.fn().mockResolvedValue([
{
workflow,
workflowId: workflow.id,
projectId: 'project-id',
project: { name: 'project-name' },
},
]);
trxMock.findBy = jest.fn().mockResolvedValue([
{
metaId: 'meta-id',
workflowId: workflow.id,
workflowName: workflow.name,
projectId: 'project-id',
projectName: 'project-name',
},
]);
});
test('reuses cached metadata for subsequent executions of the same workflow', async () => {
// ARRANGE
const workflow = await createWorkflow({});
const ctx = mock<ExecutionLifecycleHooks>({ workflowData: workflow });
const startedAt = DateTime.utc();
const stoppedAt = startedAt.plus({ seconds: 5 });
const run = mock<IRun>({
mode: 'webhook',
status: 'success',
startedAt: startedAt.toJSDate(),
stoppedAt: stoppedAt.toJSDate(),
const ctx = mock<ExecutionLifecycleHooks>({
workflowData: { ...workflow, settings: undefined },
});
// ACT & ASSERT
await expect(insightsService.workflowExecuteAfterHandler(ctx, run)).rejects.toThrowError(
`Could not find an owner for the workflow with the name '${workflow.name}' and the id '${workflow.id}'`,
// ACT
await insightsService.workflowExecuteAfterHandler(ctx, run);
await insightsService.flushEvents();
// ASSERT
expect(trxMock.find).toHaveBeenCalledWith(expect.anything(), {
where: { workflowId: In([workflow.id]), role: 'workflow:owner' },
relations: { project: true },
});
expect(trxMock.upsert).toHaveBeenCalledWith(
expect.anything(),
expect.arrayContaining([
{
workflowId: workflow.id,
workflowName: workflow.name,
projectId: 'project-id',
projectName: 'project-name',
},
]),
['workflowId'],
);
// ACT AGAIN with the same workflow
await insightsService.workflowExecuteAfterHandler(ctx, run);
await insightsService.flushEvents();
// ASSERT AGAIN
trxMock.find.mockClear();
trxMock.upsert.mockClear();
expect(trxMock.find).not.toHaveBeenCalled();
expect(trxMock.upsert).not.toHaveBeenCalled();
});
test('updates cached metadata if workflow details change', async () => {
// ARRANGE
const ctx = mock<ExecutionLifecycleHooks>({ workflowData: workflow });
// ACT
await insightsService.workflowExecuteAfterHandler(ctx, run);
await insightsService.flushEvents();
// ASSERT
expect(trxMock.find).toHaveBeenCalled();
expect(trxMock.upsert).toHaveBeenCalled();
// Change the workflow name
workflow.name = 'new-workflow-name';
// ACT AGAIN with the same workflow
await insightsService.workflowExecuteAfterHandler(ctx, run);
await insightsService.flushEvents();
// ASSERT AGAIN
expect(trxMock.find).toHaveBeenCalledWith(expect.anything(), {
where: { workflowId: In([workflow.id]), role: 'workflow:owner' },
relations: { project: true },
});
expect(trxMock.upsert).toHaveBeenCalledWith(
expect.anything(),
expect.arrayContaining([
{
workflowId: workflow.id,
workflowName: workflow.name,
projectId: 'project-id',
projectName: 'project-name',
},
]),
['workflowId'],
);
});
});
describe('workflowExecuteAfterHandler - flushEvents', () => {
let project: Project;
let workflow: IWorkflowDb & WorkflowEntity;
let insightsService: InsightsService;
let entityManagerMock = mock<EntityManager>();
const sharedWorkflowRepositoryMock: jest.Mocked<SharedWorkflowRepository> = {
manager: entityManagerMock,
} as unknown as jest.Mocked<SharedWorkflowRepository>;
const logger = mock<Logger>({
scoped: jest.fn().mockReturnValue(
mock<Logger>({
error: jest.fn(),
}),
),
});
const startedAt = DateTime.utc();
const stoppedAt = startedAt.plus({ seconds: 5 });
const run = mock<IRun>({
mode: 'trigger',
status: 'success',
startedAt: startedAt.toJSDate(),
stoppedAt: stoppedAt.toJSDate(),
});
// Mock the transaction function
const trxMock = {
find: jest.fn(),
findBy: jest.fn(),
upsert: jest.fn(),
insert: jest.fn(),
};
entityManagerMock.transaction.mockImplementation(
jest.fn(async (runInTransaction: (entityManager: EntityManager) => Promise<void>) => {
await runInTransaction(trxMock as unknown as EntityManager);
}) as unknown as EntityManager['transaction'],
);
beforeAll(async () => {
insightsService = new InsightsService(
sharedWorkflowRepositoryMock,
mock<InsightsByPeriodRepository>(),
mock<InsightsRawRepository>(),
logger,
);
});
beforeEach(async () => {
project = await createTeamProject();
workflow = await createWorkflow({}, project);
trxMock.find = jest.fn().mockResolvedValue([
{
workflow,
workflowId: workflow.id,
projectId: 'project-id',
project: { name: 'project-name' },
},
]);
trxMock.findBy = jest.fn().mockResolvedValue([
{
metaId: 'meta-id',
workflowId: workflow.id,
workflowName: workflow.name,
projectId: 'project-id',
projectName: 'project-name',
},
]);
});
test('flushes events to the database once buffer is full', async () => {
// ARRANGE
const ctx = mock<ExecutionLifecycleHooks>({ workflowData: workflow });
// ACT
for (let i = 0; i < 333; i++) {
await insightsService.workflowExecuteAfterHandler(ctx, run);
}
// await for the next tick to ensure the flush is called
await new Promise(process.nextTick);
// ASSERT
expect(trxMock.insert).not.toHaveBeenCalled();
// ACT
await insightsService.workflowExecuteAfterHandler(ctx, run);
// ASSERT
// await for the next tick to ensure the flush is called
await new Promise(process.nextTick);
expect(trxMock.insert).toHaveBeenCalled();
});
test('flushes events to the database after a timeout', async () => {
// ARRANGE
jest.useFakeTimers();
trxMock.insert.mockClear();
insightsService.scheduleFlushing();
const ctx = mock<ExecutionLifecycleHooks>({ workflowData: workflow });
try {
// ACT
for (let i = 0; i < 33; i++) {
await insightsService.workflowExecuteAfterHandler(ctx, run);
}
// ASSERT
expect(trxMock.insert).not.toHaveBeenCalled();
// ACT
await jest.advanceTimersByTimeAsync(31 * 1000);
// ASSERT
expect(trxMock.insert).toHaveBeenCalledTimes(1);
} finally {
jest.useRealTimers();
}
});
test('reschedule flush on flushing end', async () => {
// ARRANGE
jest.useFakeTimers();
trxMock.insert.mockClear();
insightsService.scheduleFlushing();
const ctx = mock<ExecutionLifecycleHooks>({ workflowData: workflow });
try {
// ACT
await insightsService.workflowExecuteAfterHandler(ctx, run);
await jest.advanceTimersByTimeAsync(31 * 1000);
// ASSERT
expect(trxMock.insert).toHaveBeenCalledTimes(1);
// // ACT
await insightsService.workflowExecuteAfterHandler(ctx, run);
await jest.advanceTimersByTimeAsync(31 * 1000);
expect(trxMock.insert).toHaveBeenCalledTimes(2);
} finally {
jest.useRealTimers();
}
});
test('flushes events to the database on shutdown', async () => {
// ARRANGE
trxMock.insert.mockClear();
const ctx = mock<ExecutionLifecycleHooks>({ workflowData: workflow });
// ACT
for (let i = 0; i < 10; i++) {
await insightsService.workflowExecuteAfterHandler(ctx, run);
}
await insightsService.shutdown();
// ASSERT
expect(trxMock.insert).toHaveBeenCalledTimes(1);
// Check that last insert call contains 30 events
const lastCallArgs = trxMock.insert.mock.calls.at(-1);
expect(lastCallArgs?.[1]).toHaveLength(30);
});
test('flushes events synchronously while shutting down', async () => {
// ARRANGE
// reset insights async flushing
insightsService.scheduleFlushing();
trxMock.insert.mockClear();
const ctx = mock<ExecutionLifecycleHooks>({ workflowData: workflow });
// ACT
for (let i = 0; i < 10; i++) {
await insightsService.workflowExecuteAfterHandler(ctx, run);
}
void insightsService.shutdown();
// trigger a workflow after shutdown
await insightsService.workflowExecuteAfterHandler(ctx, run);
// ASSERT
expect(trxMock.insert).toHaveBeenCalledTimes(2);
// Check that last insert call contains 3 events (the synchronous flush after shutdown)
let callArgs = trxMock.insert.mock.calls.at(-1);
expect(callArgs?.[1]).toHaveLength(3);
// ACT
// await for the next tick to ensure the flush is called
await new Promise(process.nextTick);
// Check that the one before that contains 30 events (the shutdown flush)
callArgs = trxMock.insert.mock.calls.at(-2);
expect(callArgs?.[1]).toHaveLength(30);
});
test('restore buffer events on flushing error', async () => {
// ARRANGE
jest.useFakeTimers();
trxMock.insert.mockClear();
trxMock.insert.mockRejectedValueOnce(new Error('Test error'));
insightsService.scheduleFlushing();
const ctx = mock<ExecutionLifecycleHooks>({ workflowData: workflow });
try {
// ACT
await insightsService.workflowExecuteAfterHandler(ctx, run);
await jest.advanceTimersByTimeAsync(31 * 1000);
// ASSERT
expect(trxMock.insert).toHaveBeenCalledTimes(1);
const insertArgs = trxMock.insert.mock.calls.at(-1);
// ACT
await insightsService.flushEvents();
expect(trxMock.insert).toHaveBeenCalledTimes(2);
const newInsertArgs = trxMock.insert.mock.calls.at(-1);
// Check that last insert call contains the same 3 insights as previous failed flush
expect(newInsertArgs?.[1]).toHaveLength(3);
expect(newInsertArgs?.[1]).toEqual(insertArgs?.[1]);
} finally {
jest.useRealTimers();
}
});
test('waits for ongoing flush during shutdown', async () => {
// ARRANGE
const config = Container.get(InsightsConfig);
config.flushBatchSize = 10;
insightsService.scheduleFlushing();
trxMock.insert.mockClear();
const ctx = mock<ExecutionLifecycleHooks>({ workflowData: workflow });
// Flush will hang until we manually resolve it
const { resolve: flushResolve, promise: flushPromise } = createDeferredPromise();
// First flush will "hang" (simulate long save)
trxMock.insert.mockImplementationOnce(async () => {
await flushPromise;
});
// Each `workflowExecuteAfterHandler` adds 3 insights;
// we call it 4 times to exceed the flushBatchSize (10)
for (let i = 0; i < config.flushBatchSize / 3; i++) {
await insightsService.workflowExecuteAfterHandler(ctx, run);
}
// ACT
const shutdownPromise = insightsService.shutdown();
// At this point, shutdown should be waiting for ongoing flushes
let shutdownResolved = false;
void shutdownPromise.then(() => (shutdownResolved = true));
// Give shutdown a tick to reach the `await Promise.all(...)`
await new Promise(setImmediate);
// ASSERT
// shutdown should still be waiting for remaining flushes
expect(shutdownResolved).toBe(false);
// ACT
// Now resolve the hanging flush and await shutdown
flushResolve();
await shutdownPromise;
// ASSERT
expect(shutdownResolved).toBe(true);
expect(trxMock.insert).toHaveBeenCalledTimes(1);
});
});
@@ -294,7 +708,11 @@ describe('compaction', () => {
// create before so we can create the raw events in parallel
await createMetadata(workflow);
for (const timestamp of timestamps) {
await createRawInsightsEvent(workflow, { type: 'success', value: 1, timestamp });
await createRawInsightsEvent(workflow, {
type: 'success',
value: 1,
timestamp,
});
}
// ACT

View File

@@ -29,4 +29,18 @@ export class InsightsConfig {
*/
@Env('N8N_INSIGHTS_COMPACTION_DAILY_TO_WEEKLY_THRESHOLD_DAYS')
compactionDailyToWeeklyThresholdDays: number = 180;
/**
* The maximum number of insights data to keep in the buffer before flushing.
* Default: 1000
*/
@Env('N8N_INSIGHTS_FLUSH_BATCH_SIZE')
flushBatchSize: number = 1000;
/**
* The interval in seconds at which the insights data should be flushed to the database.
* Default: 30
*/
@Env('N8N_INSIGHTS_FLUSH_INTERVAL_SECONDS')
flushIntervalSeconds: number = 30;
}

View File

@@ -1,5 +1,8 @@
import type { InsightsSummary } from '@n8n/api-types';
import { Container, Service } from '@n8n/di';
import { In } from '@n8n/typeorm';
import { DateTime } from 'luxon';
import { Logger } from 'n8n-core';
import type { ExecutionLifecycleHooks } from 'n8n-core';
import {
UnexpectedError,
@@ -51,16 +54,34 @@ const shouldSkipMode: Record<WorkflowExecuteMode, boolean> = {
manual: true,
};
type BufferedInsight = Pick<InsightsRaw, 'type' | 'value' | 'timestamp'> & {
workflowId: string;
workflowName: string;
};
@Service()
export class InsightsService {
private readonly cachedMetadata: Map<string, InsightsMetadata> = new Map();
private compactInsightsTimer: NodeJS.Timer | undefined;
private bufferedInsights: Set<BufferedInsight> = new Set();
private flushInsightsRawBufferTimer: NodeJS.Timer | undefined;
private isAsynchronouslySavingInsights = true;
private flushesInProgress: Set<Promise<void>> = new Set();
constructor(
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
private readonly insightsByPeriodRepository: InsightsByPeriodRepository,
private readonly insightsRawRepository: InsightsRawRepository,
private readonly logger: Logger,
) {
this.logger = this.logger.scoped('insights');
this.initializeCompaction();
this.scheduleFlushing();
}
initializeCompaction() {
@@ -74,12 +95,41 @@ export class InsightsService {
);
}
scheduleFlushing() {
	// Re-enable buffered (asynchronous) persistence of insight events.
	this.isAsynchronouslySavingInsights = true;

	// Drop any previously armed timer so at most one flush is ever pending.
	this.disposeFlushing();

	const flushDelayMs = config.flushIntervalSeconds * 1000;
	this.flushInsightsRawBufferTimer = setTimeout(async () => await this.flushEvents(), flushDelayMs);
}
/**
 * Cancels any pending buffer-flush timer.
 *
 * The handle stored in `flushInsightsRawBufferTimer` is created by
 * `setTimeout` (see `scheduleFlushing`), so `clearTimeout` is the matching
 * cancellation call.
 */
disposeFlushing() {
	if (this.flushInsightsRawBufferTimer !== undefined) {
		clearTimeout(this.flushInsightsRawBufferTimer);
		this.flushInsightsRawBufferTimer = undefined;
	}
}
@OnShutdown()
shutdown() {
async shutdown() {
if (this.compactInsightsTimer !== undefined) {
clearInterval(this.compactInsightsTimer);
this.compactInsightsTimer = undefined;
}
if (this.flushInsightsRawBufferTimer !== undefined) {
clearInterval(this.flushInsightsRawBufferTimer);
this.flushInsightsRawBufferTimer = undefined;
}
// Prevent new insights from being added to the buffer (and never flushed)
// when remaining workflows are handled during shutdown
this.isAsynchronouslySavingInsights = false;
// Wait for all in-progress asynchronous flushes
// Flush any remaining events
await Promise.all([...this.flushesInProgress, this.flushEvents()]);
}
async workflowExecuteAfterHandler(ctx: ExecutionLifecycleHooks, fullRunData: IRun) {
@@ -89,70 +139,149 @@ export class InsightsService {
const status = fullRunData.status === 'success' ? 'success' : 'failure';
const commonWorkflowData = {
workflowId: ctx.workflowData.id,
workflowName: ctx.workflowData.name,
timestamp: DateTime.utc().toJSDate(),
};
// success or failure event
this.bufferedInsights.add({
...commonWorkflowData,
type: status,
value: 1,
});
// run time event
if (fullRunData.stoppedAt) {
const value = fullRunData.stoppedAt.getTime() - fullRunData.startedAt.getTime();
this.bufferedInsights.add({
...commonWorkflowData,
type: 'runtime_ms',
value,
});
}
// time saved event
if (status === 'success' && ctx.workflowData.settings?.timeSavedPerExecution) {
this.bufferedInsights.add({
...commonWorkflowData,
type: 'time_saved_min',
value: ctx.workflowData.settings.timeSavedPerExecution,
});
}
if (!this.isAsynchronouslySavingInsights) {
// If we are not asynchronously saving insights, we need to flush the events
await this.flushEvents();
}
// If the buffer is full, flush the events asynchronously
if (this.bufferedInsights.size >= config.flushBatchSize) {
// Fire and forget flush to avoid blocking the workflow execute after handler
void this.flushEvents();
}
}
async saveInsightsMetadataAndRaw(insightsRawToInsertBuffer: Set<BufferedInsight>) {
const workflowIdNames: Map<string, string> = new Map();
for (const event of insightsRawToInsertBuffer) {
workflowIdNames.set(event.workflowId, event.workflowName);
}
await this.sharedWorkflowRepository.manager.transaction(async (trx) => {
const sharedWorkflow = await trx.findOne(SharedWorkflow, {
where: { workflowId: ctx.workflowData.id, role: 'workflow:owner' },
const sharedWorkflows = await trx.find(SharedWorkflow, {
where: { workflowId: In([...workflowIdNames.keys()]), role: 'workflow:owner' },
relations: { project: true },
});
if (!sharedWorkflow) {
throw new UnexpectedError(
`Could not find an owner for the workflow with the name '${ctx.workflowData.name}' and the id '${ctx.workflowData.id}'`,
);
}
// Upsert metadata for the workflows that are not already in the cache or have
// different project or workflow names
const metadataToUpsert = sharedWorkflows.reduce((acc, workflow) => {
const cachedMetadata = this.cachedMetadata.get(workflow.workflowId);
if (
!cachedMetadata ||
cachedMetadata.projectId !== workflow.projectId ||
cachedMetadata.projectName !== workflow.project.name ||
cachedMetadata.workflowName !== workflowIdNames.get(workflow.workflowId)
) {
const metadata = new InsightsMetadata();
metadata.projectId = workflow.projectId;
metadata.projectName = workflow.project.name;
metadata.workflowId = workflow.workflowId;
metadata.workflowName = workflowIdNames.get(workflow.workflowId)!;
await trx.upsert(
InsightsMetadata,
{
workflowId: ctx.workflowData.id,
workflowName: ctx.workflowData.name,
projectId: sharedWorkflow.projectId,
projectName: sharedWorkflow.project.name,
},
['workflowId'],
);
const metadata = await trx.findOneBy(InsightsMetadata, {
workflowId: ctx.workflowData.id,
acc.push(metadata);
}
return acc;
}, [] as InsightsMetadata[]);
await trx.upsert(InsightsMetadata, metadataToUpsert, ['workflowId']);
const upsertMetadata = await trx.findBy(InsightsMetadata, {
workflowId: In(metadataToUpsert.map((m) => m.workflowId)),
});
if (!metadata) {
// This can't happen, we just wrote the metadata in the same
// transaction.
throw new UnexpectedError(
`Could not find metadata for the workflow with the id '${ctx.workflowData.id}'`,
);
for (const metadata of upsertMetadata) {
this.cachedMetadata.set(metadata.workflowId, metadata);
}
// success or failure event
{
const event = new InsightsRaw();
event.metaId = metadata.metaId;
event.type = status;
event.value = 1;
await trx.insert(InsightsRaw, event);
const events: InsightsRaw[] = [];
for (const event of insightsRawToInsertBuffer) {
const insight = new InsightsRaw();
const metadata = this.cachedMetadata.get(event.workflowId);
if (!metadata) {
// could not find shared workflow for this insight (not supposed to happen)
throw new UnexpectedError(
`Could not find shared workflow for insight with workflowId ${event.workflowId}`,
);
}
insight.metaId = metadata.metaId;
insight.type = event.type;
insight.value = event.value;
insight.timestamp = event.timestamp;
events.push(insight);
}
// run time event
if (fullRunData.stoppedAt) {
const value = fullRunData.stoppedAt.getTime() - fullRunData.startedAt.getTime();
const event = new InsightsRaw();
event.metaId = metadata.metaId;
event.type = 'runtime_ms';
event.value = value;
await trx.insert(InsightsRaw, event);
}
// time saved event
if (status === 'success' && ctx.workflowData.settings?.timeSavedPerExecution) {
const event = new InsightsRaw();
event.metaId = metadata.metaId;
event.type = 'time_saved_min';
event.value = ctx.workflowData.settings.timeSavedPerExecution;
await trx.insert(InsightsRaw, event);
}
await trx.insert(InsightsRaw, events);
});
}
/**
 * Persists all currently buffered insight events in a single batch.
 *
 * The buffer is swapped for a private copy before saving so that events
 * arriving while the save is in flight land in a fresh buffer. On failure the
 * batch is put back into the buffer to be retried on the next flush. The
 * in-flight promise is tracked in `flushesInProgress` so `shutdown()` can
 * await it.
 */
async flushEvents() {
	// Nothing buffered: skip the database round-trip entirely.
	if (this.bufferedInsights.size === 0) {
		return;
	}

	// Stop the timer so a concurrent timer-driven flush cannot run in parallel.
	this.disposeFlushing();

	// Copy the buffer to a new set to avoid concurrent modification
	// while we are flushing the events.
	const bufferedInsightsToFlush = new Set(this.bufferedInsights);
	this.bufferedInsights.clear();

	// A `const` self-reference is safe here: the closure only reads
	// `flushPromise` after the first `await`, i.e. after assignment completed.
	const flushPromise = (async () => {
		try {
			await this.saveInsightsMetadataAndRaw(bufferedInsightsToFlush);
		} catch (e) {
			this.logger.error('Error while saving insights metadata and raw data', { error: e });
			// Restore the failed batch so it is retried on the next flush.
			for (const event of bufferedInsightsToFlush) {
				this.bufferedInsights.add(event);
			}
		} finally {
			// NOTE(review): this re-arms the flush timer (and re-enables
			// asynchronous saving) even when this flush runs during shutdown,
			// after `shutdown()` disposed the timer — confirm that is intended.
			this.scheduleFlushing();
			this.flushesInProgress.delete(flushPromise);
		}
	})();

	// Track the flush so shutdown can wait for all in-progress flushes.
	this.flushesInProgress.add(flushPromise);
	await flushPromise;
}
async compactInsights() {
let numberOfCompactedRawData: number;