feat(core): Add production root executions (#14845)

Co-authored-by: Guillaume Jacquart <jacquart.guillaume@gmail.com>
Co-authored-by: Danny Martini <danny@n8n.io>
This commit is contained in:
Marc Littlemore
2025-04-29 11:32:47 +01:00
committed by GitHub
parent 9d2a65b3cb
commit 7f89244304
13 changed files with 279 additions and 16 deletions

View File

@@ -16,6 +16,9 @@ export class WorkflowStatistics {
@Column()
count: number;
@Column()
rootCount: number;
@Column(datetimeColumnType)
latestEvent: Date;

View File

@@ -0,0 +1,22 @@
import type { ReversibleMigration, MigrationContext } from '@/databases/types';
const columnName = 'rootCount';
const tableName = 'workflow_statistics';
export class AddWorkflowStatisticsRootCount1745587087521 implements ReversibleMigration {
async up({ escape, runQuery }: MigrationContext) {
const escapedTableName = escape.tableName(tableName);
const escapedColumnName = escape.columnName(columnName);
await runQuery(
`ALTER TABLE ${escapedTableName} ADD COLUMN ${escapedColumnName} INTEGER DEFAULT 0`,
);
}
async down({ escape, runQuery }: MigrationContext) {
const escapedTableName = escape.tableName(tableName);
const escapedColumnName = escape.columnName(columnName);
await runQuery(`ALTER TABLE ${escapedTableName} DROP COLUMN ${escapedColumnName}`);
}
}

View File

@@ -84,6 +84,7 @@ import { CreateFolderTable1738709609940 } from '../common/1738709609940-CreateFo
import { CreateAnalyticsTables1739549398681 } from '../common/1739549398681-CreateAnalyticsTables';
import { RenameAnalyticsToInsights1741167584277 } from '../common/1741167584277-RenameAnalyticsToInsights';
import { AddScopesColumnToApiKeys1742918400000 } from '../common/1742918400000-AddScopesColumnToApiKeys';
import { AddWorkflowStatisticsRootCount1745587087521 } from '../common/1745587087521-AddWorkflowStatisticsRootCount';
import { UpdateParentFolderIdColumn1740445074052 } from '../mysqldb/1740445074052-UpdateParentFolderIdColumn';
export const mysqlMigrations: Migration[] = [
@@ -172,4 +173,5 @@ export const mysqlMigrations: Migration[] = [
UpdateParentFolderIdColumn1740445074052,
RenameAnalyticsToInsights1741167584277,
AddScopesColumnToApiKeys1742918400000,
AddWorkflowStatisticsRootCount1745587087521,
];

View File

@@ -84,6 +84,7 @@ import { CreateFolderTable1738709609940 } from '../common/1738709609940-CreateFo
import { CreateAnalyticsTables1739549398681 } from '../common/1739549398681-CreateAnalyticsTables';
import { RenameAnalyticsToInsights1741167584277 } from '../common/1741167584277-RenameAnalyticsToInsights';
import { AddScopesColumnToApiKeys1742918400000 } from '../common/1742918400000-AddScopesColumnToApiKeys';
import { AddWorkflowStatisticsRootCount1745587087521 } from '../common/1745587087521-AddWorkflowStatisticsRootCount';
export const postgresMigrations: Migration[] = [
InitialMigration1587669153312,
@@ -170,4 +171,5 @@ export const postgresMigrations: Migration[] = [
UpdateParentFolderIdColumn1740445074052,
RenameAnalyticsToInsights1741167584277,
AddScopesColumnToApiKeys1742918400000,
AddWorkflowStatisticsRootCount1745587087521,
];

View File

@@ -81,6 +81,7 @@ import { CreateTestCaseExecutionTable1736947513045 } from '../common/17369475130
import { AddErrorColumnsToTestRuns1737715421462 } from '../common/1737715421462-AddErrorColumnsToTestRuns';
import { CreateAnalyticsTables1739549398681 } from '../common/1739549398681-CreateAnalyticsTables';
import { RenameAnalyticsToInsights1741167584277 } from '../common/1741167584277-RenameAnalyticsToInsights';
import { AddWorkflowStatisticsRootCount1745587087521 } from '../common/1745587087521-AddWorkflowStatisticsRootCount';
const sqliteMigrations: Migration[] = [
InitialMigration1588102412422,
@@ -164,6 +165,7 @@ const sqliteMigrations: Migration[] = [
UpdateParentFolderIdColumn1740445074052,
RenameAnalyticsToInsights1741167584277,
AddScopesColumnToApiKeys1742918400000,
AddWorkflowStatisticsRootCount1745587087521,
];
export { sqliteMigrations };

View File

@@ -5,6 +5,8 @@ import { mock, mockClear } from 'jest-mock-extended';
import { StatisticsNames, WorkflowStatistics } from '@/databases/entities/workflow-statistics';
import { WorkflowStatisticsRepository } from '@/databases/repositories/workflow-statistics.repository';
import { mockEntityManager } from '@test/mocking';
import { createWorkflow } from '@test-integration/db/workflows';
import * as testDb from '@test-integration/test-db';
describe('insertWorkflowStatistics', () => {
const entityManager = mockEntityManager(WorkflowStatistics);
@@ -51,3 +53,69 @@ describe('insertWorkflowStatistics', () => {
expect(insertionResult).toBe('failed');
});
});
describe('upsertWorkflowStatistics', () => {
let repository: WorkflowStatisticsRepository;
beforeAll(async () => {
Container.reset();
await testDb.init();
repository = Container.get(WorkflowStatisticsRepository);
});
afterAll(async () => {
await testDb.terminate();
});
beforeEach(async () => {
await testDb.truncate(['WorkflowStatistics']);
});
test('Successfully inserts data when it is not yet present', async () => {
// ARRANGE
const workflow = await createWorkflow({});
// ACT
const upsertResult = await repository.upsertWorkflowStatistics(
StatisticsNames.productionSuccess,
workflow.id,
true,
);
// ASSERT
expect(upsertResult).toBe('insert');
const insertedData = await repository.find();
expect(insertedData).toHaveLength(1);
expect(insertedData[0].workflowId).toBe(workflow.id);
expect(insertedData[0].name).toBe(StatisticsNames.productionSuccess);
expect(insertedData[0].count).toBe(1);
expect(insertedData[0].rootCount).toBe(1);
});
test('Successfully updates data when it is already present', async () => {
// ARRANGE
const workflow = await createWorkflow({});
await repository.insert({
workflowId: workflow.id,
name: StatisticsNames.productionSuccess,
count: 1,
rootCount: 1,
latestEvent: new Date(),
});
// ACT
const result = await repository.upsertWorkflowStatistics(
StatisticsNames.productionSuccess,
workflow.id,
false,
);
// ASSERT
expect(result).toBe('update');
const updatedData = await repository.find();
expect(updatedData).toHaveLength(1);
expect(updatedData[0].workflowId).toBe(workflow.id);
expect(updatedData[0].name).toBe(StatisticsNames.productionSuccess);
expect(updatedData[0].count).toBe(2);
expect(updatedData[0].rootCount).toBe(1);
});
});

View File

@@ -19,6 +19,10 @@ export class LicenseMetricsRepository extends Repository<LicenseMetrics> {
return this.manager.connection.driver.escape(`${tablePrefix}${name}`);
}
toColumnName(name: string) {
return this.manager.connection.driver.escape(name);
}
async getLicenseRenewalMetrics() {
type Row = {
enabled_user_count: string | number;
@@ -27,6 +31,7 @@ export class LicenseMetricsRepository extends Repository<LicenseMetrics> {
total_workflow_count: string | number;
total_credentials_count: string | number;
production_executions_count: string | number;
production_root_executions_count: string | number;
manual_executions_count: string | number;
};
@@ -43,6 +48,7 @@ export class LicenseMetricsRepository extends Repository<LicenseMetrics> {
total_workflow_count: totalWorkflows,
total_credentials_count: totalCredentials,
production_executions_count: productionExecutions,
production_root_executions_count: productionRootExecutions,
manual_executions_count: manualExecutions,
},
] = (await this.query(`
@@ -53,6 +59,7 @@ export class LicenseMetricsRepository extends Repository<LicenseMetrics> {
(SELECT COUNT(*) FROM ${workflowTable}) AS total_workflow_count,
(SELECT COUNT(*) FROM ${credentialTable}) AS total_credentials_count,
(SELECT SUM(count) FROM ${workflowStatsTable} WHERE name IN ('production_success', 'production_error')) AS production_executions_count,
(SELECT SUM(${this.toColumnName('rootCount')}) FROM ${workflowStatsTable} WHERE name IN ('production_success', 'production_error')) AS production_root_executions_count,
(SELECT SUM(count) FROM ${workflowStatsTable} WHERE name IN ('manual_success', 'manual_error')) AS manual_executions_count;
`)) as Row[];
@@ -66,6 +73,7 @@ export class LicenseMetricsRepository extends Repository<LicenseMetrics> {
totalWorkflows: toNumber(totalWorkflows),
totalCredentials: toNumber(totalCredentials),
productionExecutions: toNumber(productionExecutions),
productionRootExecutions: toNumber(productionRootExecutions),
manualExecutions: toNumber(manualExecutions),
};
}

View File

@@ -35,6 +35,7 @@ export class WorkflowStatisticsRepository extends Repository<WorkflowStatistics>
workflowId,
name: eventName,
count: 1,
rootCount: 1,
latestEvent: new Date(),
});
return 'insert';
@@ -51,16 +52,20 @@ export class WorkflowStatisticsRepository extends Repository<WorkflowStatistics>
async upsertWorkflowStatistics(
eventName: StatisticsNames,
workflowId: string,
isRootExecution: boolean,
): Promise<StatisticsUpsertResult> {
const dbType = this.globalConfig.database.type;
const { tableName } = this.metadata;
try {
if (dbType === 'sqlite') {
await this.query(
`INSERT INTO "${tableName}" ("count", "name", "workflowId", "latestEvent")
VALUES (1, "${eventName}", "${workflowId}", CURRENT_TIMESTAMP)
`INSERT INTO "${tableName}" ("count", "rootCount", "name", "workflowId", "latestEvent")
VALUES (1, ${isRootExecution ? '1' : '0'}, "${eventName}", "${workflowId}", CURRENT_TIMESTAMP)
ON CONFLICT (workflowId, name)
DO UPDATE SET count = count + 1, latestEvent = CURRENT_TIMESTAMP`,
DO UPDATE SET
count = count + 1,
rootCount = ${isRootExecution ? 'rootCount + 1' : 'rootCount'},
latestEvent = CURRENT_TIMESTAMP`,
);
// SQLite does not offer a reliable way to know whether or not an insert or update happened.
// We'll use a naive approach in this case. Query again after and it might cause us to miss the
@@ -73,13 +78,19 @@ export class WorkflowStatisticsRepository extends Repository<WorkflowStatistics>
},
});
return counter?.count === 1 ? 'insert' : 'failed';
return (counter?.count ?? 0) > 1 ? 'update' : counter?.count === 1 ? 'insert' : 'failed';
} else if (dbType === 'postgresdb') {
const upsertRootCount = isRootExecution
? `"${tableName}"."rootCount" + 1`
: `"${tableName}"."rootCount"`;
const queryResult = (await this.query(
`INSERT INTO "${tableName}" ("count", "name", "workflowId", "latestEvent")
VALUES (1, '${eventName}', '${workflowId}', CURRENT_TIMESTAMP)
`INSERT INTO "${tableName}" ("count", "rootCount", "name", "workflowId", "latestEvent")
VALUES (1, ${isRootExecution ? '1' : '0'}, '${eventName}', '${workflowId}', CURRENT_TIMESTAMP)
ON CONFLICT ("name", "workflowId")
DO UPDATE SET "count" = "${tableName}"."count" + 1, "latestEvent" = CURRENT_TIMESTAMP
DO UPDATE SET
"count" = "${tableName}"."count" + 1,
"rootCount" = ${upsertRootCount},
"latestEvent" = CURRENT_TIMESTAMP
RETURNING *;`,
)) as Array<{
count: number;
@@ -87,10 +98,13 @@ export class WorkflowStatisticsRepository extends Repository<WorkflowStatistics>
return queryResult[0].count === 1 ? 'insert' : 'update';
} else {
const queryResult = (await this.query(
`INSERT INTO \`${tableName}\` (count, name, workflowId, latestEvent)
VALUES (1, "${eventName}", "${workflowId}", NOW())
`INSERT INTO \`${tableName}\` (count, rootCount, name, workflowId, latestEvent)
VALUES (1, ${isRootExecution ? '1' : '0'}, "${eventName}", "${workflowId}", NOW())
ON DUPLICATE KEY
UPDATE count = count + 1, latestEvent = NOW();`,
UPDATE
count = count + 1,
rootCount = ${isRootExecution ? 'rootCount + 1' : 'rootCount'},
latestEvent = NOW();`,
)) as {
affectedRows: number;
};
@@ -98,6 +112,7 @@ export class WorkflowStatisticsRepository extends Repository<WorkflowStatistics>
return queryResult.affectedRows === 1 ? 'insert' : 'update';
}
} catch (error) {
console.log('error', error);
if (error instanceof QueryFailedError) return 'failed';
throw error;
}

View File

@@ -46,6 +46,7 @@ describe('LicenseMetricsService', () => {
totalUsers: 400,
totalCredentials: 500,
productionExecutions: 600,
productionRootExecutions: 550,
manualExecutions: 700,
};
@@ -60,6 +61,10 @@ describe('LicenseMetricsService', () => {
{ name: 'totalUsers', value: mockRenewalMetrics.totalUsers },
{ name: 'totalCredentials', value: mockRenewalMetrics.totalCredentials },
{ name: 'productionExecutions', value: mockRenewalMetrics.productionExecutions },
{
name: 'productionRootExecutions',
value: mockRenewalMetrics.productionRootExecutions,
},
{ name: 'manualExecutions', value: mockRenewalMetrics.manualExecutions },
{ name: 'activeWorkflowTriggers', value: mockActiveTriggerCount },
]);

View File

@@ -18,6 +18,7 @@ export class LicenseMetricsService {
totalUsers,
totalCredentials,
productionExecutions,
productionRootExecutions,
manualExecutions,
} = await this.licenseMetricsRepository.getLicenseRenewalMetrics();
@@ -30,6 +31,7 @@ export class LicenseMetricsService {
{ name: 'totalUsers', value: totalUsers },
{ name: 'totalCredentials', value: totalCredentials },
{ name: 'productionExecutions', value: productionExecutions },
{ name: 'productionRootExecutions', value: productionRootExecutions },
{ name: 'manualExecutions', value: manualExecutions },
{ name: 'activeWorkflowTriggers', value: activeTriggerCount },
];

View File

@@ -8,7 +8,13 @@ import {
} from '@n8n/typeorm';
import { mocked } from 'jest-mock';
import { mock } from 'jest-mock-extended';
import type { INode, IRun, WorkflowExecuteMode } from 'n8n-workflow';
import type { IWorkflowBase } from 'n8n-workflow';
import {
type ExecutionStatus,
type INode,
type IRun,
type WorkflowExecuteMode,
} from 'n8n-workflow';
import config from '@/config';
import type { Project } from '@/databases/entities/project';
@@ -24,6 +30,7 @@ import { mockInstance } from '@test/mocking';
describe('WorkflowStatisticsService', () => {
const fakeUser = mock<User>({ id: 'abcde-fghij' });
const fakeProject = mock<Project>({ id: '12345-67890', type: 'personal' });
const fakeWorkflow = mock<IWorkflowBase>({ id: '1' });
const ownershipService = mockInstance(OwnershipService);
const userService = mockInstance(UserService);
const globalConfig = Container.get(GlobalConfig);
@@ -70,6 +77,85 @@ describe('WorkflowStatisticsService', () => {
};
describe('workflowExecutionCompleted', () => {
const rootCountRegex = /"?rootCount"?\s*=\s*(?:"?\w+"?\.)?"?rootCount"?\s*\+\s*1/;
test.each<WorkflowExecuteMode>(['cli', 'error', 'retry', 'trigger', 'webhook', 'evaluation'])(
'should upsert with root executions for execution mode %s',
async (mode) => {
// Call the function with a production success result, ensure metrics hook gets called
const runData: IRun = {
finished: true,
status: 'success',
data: { resultData: { runData: {} } },
mode,
startedAt: new Date(),
};
await workflowStatisticsService.workflowExecutionCompleted(fakeWorkflow, runData);
expect(entityManager.query).toHaveBeenCalledWith(
expect.stringMatching(rootCountRegex),
undefined,
);
},
);
test.each<WorkflowExecuteMode>(['manual', 'integrated', 'internal'])(
'should upsert without root executions for execution mode %s',
async (mode) => {
const runData: IRun = {
finished: true,
status: 'success',
data: { resultData: { runData: {} } },
mode,
startedAt: new Date(),
};
await workflowStatisticsService.workflowExecutionCompleted(fakeWorkflow, runData);
expect(entityManager.query).toHaveBeenCalledWith(
expect.not.stringMatching(rootCountRegex),
undefined,
);
},
);
test.each<ExecutionStatus>(['success', 'crashed', 'error'])(
'should upsert with root executions for execution status %s',
async (status) => {
const runData: IRun = {
finished: true,
status,
data: { resultData: { runData: {} } },
mode: 'trigger',
startedAt: new Date(),
};
await workflowStatisticsService.workflowExecutionCompleted(fakeWorkflow, runData);
expect(entityManager.query).toHaveBeenCalledWith(
expect.stringMatching(rootCountRegex),
undefined,
);
},
);
test.each<ExecutionStatus>(['canceled', 'new', 'running', 'unknown', 'waiting'])(
'should upsert without root executions for execution status %s',
async (status) => {
const runData: IRun = {
finished: true,
status,
data: { resultData: { runData: {} } },
mode: 'trigger',
startedAt: new Date(),
};
await workflowStatisticsService.workflowExecutionCompleted(fakeWorkflow, runData);
expect(entityManager.query).toHaveBeenCalledWith(
expect.not.stringMatching(rootCountRegex),
undefined,
);
},
);
test('should create metrics for production successes', async () => {
// Call the function with a production success result, ensure metrics hook gets called
const workflow = {

View File

@@ -1,6 +1,12 @@
import { Service } from '@n8n/di';
import { Logger } from 'n8n-core';
import type { INode, IRun, IWorkflowBase } from 'n8n-workflow';
import type {
ExecutionStatus,
INode,
IRun,
IWorkflowBase,
WorkflowExecuteMode,
} from 'n8n-workflow';
import { StatisticsNames } from '@/databases/entities/workflow-statistics';
import { WorkflowStatisticsRepository } from '@/databases/repositories/workflow-statistics.repository';
@@ -10,6 +16,35 @@ import { TypedEmitter } from '@/typed-emitter';
import { OwnershipService } from './ownership.service';
const isStatusRootExecution = {
success: true,
crashed: true,
error: true,
canceled: false,
new: false,
running: false,
unknown: false,
waiting: false,
} satisfies Record<ExecutionStatus, boolean>;
const isModeRootExecution = {
cli: true,
error: true,
retry: true,
trigger: true,
webhook: true,
evaluation: true,
// sub workflows
integrated: false,
// error workflows
internal: false,
manual: false,
} satisfies Record<WorkflowExecuteMode, boolean>;
type WorkflowStatisticsEvents = {
nodeFetchedData: { workflowId: string; node: INode };
workflowExecutionCompleted: { workflowData: IWorkflowBase; fullRunData: IRun };
@@ -52,11 +87,13 @@ export class WorkflowStatisticsService extends TypedEmitter<WorkflowStatisticsEv
async workflowExecutionCompleted(workflowData: IWorkflowBase, runData: IRun): Promise<void> {
// Determine the name of the statistic
const finished = runData.finished ? runData.finished : false;
const isSuccess = runData.status === 'success';
const manual = runData.mode === 'manual';
let name: StatisticsNames;
const isRootExecution =
isModeRootExecution[runData.mode] && isStatusRootExecution[runData.status];
if (finished) {
if (isSuccess) {
if (manual) name = StatisticsNames.manualSuccess;
else name = StatisticsNames.productionSuccess;
} else {
@@ -69,7 +106,11 @@ export class WorkflowStatisticsService extends TypedEmitter<WorkflowStatisticsEv
if (!workflowId) return;
try {
const upsertResult = await this.repository.upsertWorkflowStatistics(name, workflowId);
const upsertResult = await this.repository.upsertWorkflowStatistics(
name,
workflowId,
isRootExecution,
);
if (name === StatisticsNames.productionSuccess && upsertResult === 'insert') {
const project = await this.ownershipService.getWorkflowProjectCached(workflowId);

View File

@@ -60,6 +60,11 @@ describe('LicenseMetricsRepository', () => {
StatisticsNames.manualError,
secondWorkflow.id,
),
workflowStatisticsRepository.upsertWorkflowStatistics(
StatisticsNames.productionSuccess,
secondWorkflow.id,
true,
),
]);
const metrics = await licenseMetricsRepository.getLicenseRenewalMetrics();
@@ -70,7 +75,8 @@ describe('LicenseMetricsRepository', () => {
totalCredentials: 2,
totalWorkflows: 5,
activeWorkflows: 3,
productionExecutions: 2,
productionExecutions: 3,
productionRootExecutions: 3,
manualExecutions: 2,
});
});
@@ -87,6 +93,7 @@ describe('LicenseMetricsRepository', () => {
totalWorkflows: 3,
activeWorkflows: 3,
productionExecutions: 0, // not NaN
productionRootExecutions: 0, // not NaN
manualExecutions: 0, // not NaN
});
});