refactor(core): Refactor WorkflowStatistics code (no-changelog) (#6617)

refactor(core): Refactor WorkflowStatistics code
This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2023-07-18 11:28:24 +02:00
committed by GitHub
parent e7091d6726
commit f4a18ba87d
20 changed files with 432 additions and 571 deletions

View File

@@ -655,24 +655,10 @@ export interface IWorkflowExecuteProcess {
workflowExecute: WorkflowExecute; workflowExecute: WorkflowExecute;
} }
export interface IWorkflowStatisticsCounts {
productionSuccess: number;
productionError: number;
manualSuccess: number;
manualError: number;
}
export interface IWorkflowStatisticsDataLoaded { export interface IWorkflowStatisticsDataLoaded {
dataLoaded: boolean; dataLoaded: boolean;
} }
export interface IWorkflowStatisticsTimestamps {
productionSuccess: Date | null;
productionError: Date | null;
manualSuccess: Date | null;
manualError: Date | null;
}
export type WhereClause = Record<string, { [key: string]: string | FindOperator<unknown> }>; export type WhereClause = Record<string, { [key: string]: string | FindOperator<unknown> }>;
// ---------------------------------- // ----------------------------------

View File

@@ -27,6 +27,7 @@ import { Telemetry } from '@/telemetry';
import type { AuthProviderType } from '@db/entities/AuthIdentity'; import type { AuthProviderType } from '@db/entities/AuthIdentity';
import { RoleService } from './role/role.service'; import { RoleService } from './role/role.service';
import { eventBus } from './eventbus'; import { eventBus } from './eventbus';
import { EventsService } from '@/services/events.service';
import type { User } from '@db/entities/User'; import type { User } from '@db/entities/User';
import { N8N_VERSION } from '@/constants'; import { N8N_VERSION } from '@/constants';
import { NodeTypes } from './NodeTypes'; import { NodeTypes } from './NodeTypes';
@@ -58,7 +59,15 @@ export class InternalHooks implements IInternalHooksClass {
private nodeTypes: NodeTypes, private nodeTypes: NodeTypes,
private roleService: RoleService, private roleService: RoleService,
private executionRepository: ExecutionRepository, private executionRepository: ExecutionRepository,
) {} eventsService: EventsService,
) {
eventsService.on('telemetry.onFirstProductionWorkflowSuccess', async (metrics) =>
this.onFirstProductionWorkflowSuccess(metrics),
);
eventsService.on('telemetry.onFirstWorkflowDataLoad', async (metrics) =>
this.onFirstWorkflowDataLoad(metrics),
);
}
async init(instanceId: string) { async init(instanceId: string) {
this.instanceId = instanceId; this.instanceId = instanceId;

View File

@@ -97,10 +97,10 @@ import {
TagsController, TagsController,
TranslationController, TranslationController,
UsersController, UsersController,
WorkflowStatisticsController,
} from '@/controllers'; } from '@/controllers';
import { executionsController } from '@/executions/executions.controller'; import { executionsController } from '@/executions/executions.controller';
import { workflowStatsController } from '@/api/workflowStats.api';
import { isApiEnabled, loadPublicApiVersions } from '@/PublicApi'; import { isApiEnabled, loadPublicApiVersions } from '@/PublicApi';
import { import {
getInstanceBaseUrl, getInstanceBaseUrl,
@@ -136,7 +136,6 @@ import { isLogStreamingEnabled } from '@/eventbus/MessageEventBus/MessageEventBu
import { licenseController } from './license/license.controller'; import { licenseController } from './license/license.controller';
import { Push, setupPushServer, setupPushHandler } from '@/push'; import { Push, setupPushServer, setupPushHandler } from '@/push';
import { setupAuthMiddlewares } from './middlewares'; import { setupAuthMiddlewares } from './middlewares';
import { initEvents } from './events';
import { import {
getLdapLoginLabel, getLdapLoginLabel,
handleLdapInit, handleLdapInit,
@@ -383,9 +382,6 @@ export class Server extends AbstractServer {
saml_enabled: isSamlCurrentAuthenticationMethod(), saml_enabled: isSamlCurrentAuthenticationMethod(),
}; };
// Set up event handling
initEvents();
if (inDevelopment && process.env.N8N_DEV_RELOAD === 'true') { if (inDevelopment && process.env.N8N_DEV_RELOAD === 'true') {
const { reloadNodesAndCredentials } = await import('@/ReloadNodesAndCredentials'); const { reloadNodesAndCredentials } = await import('@/ReloadNodesAndCredentials');
await reloadNodesAndCredentials(this.loadNodesAndCredentials, this.nodeTypes, this.push); await reloadNodesAndCredentials(this.loadNodesAndCredentials, this.nodeTypes, this.push);
@@ -496,6 +492,7 @@ export class Server extends AbstractServer {
}), }),
Container.get(SamlController), Container.get(SamlController),
Container.get(SourceControlController), Container.get(SourceControlController),
Container.get(WorkflowStatisticsController),
]; ];
if (isLdapEnabled()) { if (isLdapEnabled()) {
@@ -604,11 +601,6 @@ export class Server extends AbstractServer {
// ---------------------------------------- // ----------------------------------------
this.app.use(`/${this.restEndpoint}/license`, licenseController); this.app.use(`/${this.restEndpoint}/license`, licenseController);
// ----------------------------------------
// Workflow Statistics
// ----------------------------------------
this.app.use(`/${this.restEndpoint}/workflow-stats`, workflowStatsController);
// ---------------------------------------- // ----------------------------------------
// SAML // SAML
// ---------------------------------------- // ----------------------------------------

View File

@@ -17,8 +17,9 @@ import type express from 'express';
import get from 'lodash/get'; import get from 'lodash/get';
import stream from 'stream'; import stream from 'stream';
import { promisify } from 'util'; import { promisify } from 'util';
import { Container } from 'typedi';
import { BinaryDataManager, NodeExecuteFunctions, eventEmitter } from 'n8n-core'; import { BinaryDataManager, NodeExecuteFunctions } from 'n8n-core';
import type { import type {
IBinaryData, IBinaryData,
@@ -60,7 +61,7 @@ import { ActiveExecutions } from '@/ActiveExecutions';
import type { User } from '@db/entities/User'; import type { User } from '@db/entities/User';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity'; import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper'; import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { Container } from 'typedi'; import { EventsService } from '@/services/events.service';
const pipeline = promisify(stream.pipeline); const pipeline = promisify(stream.pipeline);
@@ -243,7 +244,7 @@ export async function executeWebhook(
NodeExecuteFunctions, NodeExecuteFunctions,
executionMode, executionMode,
); );
eventEmitter.emit(eventEmitter.types.nodeFetchedData, workflow.id, workflowStartNode); Container.get(EventsService).emit('nodeFetchedData', workflow.id, workflowStartNode);
} catch (err) { } catch (err) {
// Send error response to webhook caller // Send error response to webhook caller
const errorMessage = 'Workflow Webhook Error: Workflow could not be started!'; const errorMessage = 'Workflow Webhook Error: Workflow could not be started!';

View File

@@ -15,7 +15,7 @@
/* eslint-disable @typescript-eslint/no-unused-vars */ /* eslint-disable @typescript-eslint/no-unused-vars */
/* eslint-disable func-names */ /* eslint-disable func-names */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { BinaryDataManager, eventEmitter, UserSettings, WorkflowExecute } from 'n8n-core'; import { BinaryDataManager, UserSettings, WorkflowExecute } from 'n8n-core';
import type { import type {
IDataObject, IDataObject,
@@ -43,6 +43,7 @@ import {
} from 'n8n-workflow'; } from 'n8n-workflow';
import pick from 'lodash/pick'; import pick from 'lodash/pick';
import { Container } from 'typedi';
import type { FindOptionsWhere } from 'typeorm'; import type { FindOptionsWhere } from 'typeorm';
import { LessThanOrEqual, In } from 'typeorm'; import { LessThanOrEqual, In } from 'typeorm';
import { DateUtils } from 'typeorm/util/DateUtils'; import { DateUtils } from 'typeorm/util/DateUtils';
@@ -67,10 +68,10 @@ import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { findSubworkflowStart, isWorkflowIdValid } from '@/utils'; import { findSubworkflowStart, isWorkflowIdValid } from '@/utils';
import { PermissionChecker } from './UserManagement/PermissionChecker'; import { PermissionChecker } from './UserManagement/PermissionChecker';
import { WorkflowsService } from './workflows/workflows.services'; import { WorkflowsService } from './workflows/workflows.services';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks'; import { InternalHooks } from '@/InternalHooks';
import type { ExecutionMetadata } from '@db/entities/ExecutionMetadata'; import type { ExecutionMetadata } from '@db/entities/ExecutionMetadata';
import { ExecutionRepository } from '@db/repositories'; import { ExecutionRepository } from '@db/repositories';
import { EventsService } from '@/services/events.service';
const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType'); const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType');
@@ -273,6 +274,7 @@ export async function saveExecutionMetadata(
* *
*/ */
function hookFunctionsPush(): IWorkflowExecuteHooks { function hookFunctionsPush(): IWorkflowExecuteHooks {
const pushInstance = Container.get(Push);
return { return {
nodeExecuteBefore: [ nodeExecuteBefore: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> { async function (this: WorkflowHooks, nodeName: string): Promise<void> {
@@ -289,7 +291,6 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
workflowId: this.workflowData.id, workflowId: this.workflowData.id,
}); });
const pushInstance = Container.get(Push);
pushInstance.send('nodeExecuteBefore', { executionId, nodeName }, sessionId); pushInstance.send('nodeExecuteBefore', { executionId, nodeName }, sessionId);
}, },
], ],
@@ -307,7 +308,6 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
workflowId: this.workflowData.id, workflowId: this.workflowData.id,
}); });
const pushInstance = Container.get(Push);
pushInstance.send('nodeExecuteAfter', { executionId, nodeName, data }, sessionId); pushInstance.send('nodeExecuteAfter', { executionId, nodeName, data }, sessionId);
}, },
], ],
@@ -324,7 +324,6 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
if (sessionId === undefined) { if (sessionId === undefined) {
return; return;
} }
const pushInstance = Container.get(Push);
pushInstance.send( pushInstance.send(
'executionStarted', 'executionStarted',
{ {
@@ -390,7 +389,6 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
retryOf, retryOf,
}; };
const pushInstance = Container.get(Push);
pushInstance.send('executionFinished', sendData, sessionId); pushInstance.send('executionFinished', sendData, sessionId);
}, },
], ],
@@ -399,7 +397,6 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowExecuteHooks { export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowExecuteHooks {
const externalHooks = Container.get(ExternalHooks); const externalHooks = Container.get(ExternalHooks);
return { return {
workflowExecuteBefore: [ workflowExecuteBefore: [
async function (this: WorkflowHooks, workflow: Workflow): Promise<void> { async function (this: WorkflowHooks, workflow: Workflow): Promise<void> {
@@ -514,23 +511,17 @@ export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowEx
* *
*/ */
function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
const internalHooks = Container.get(InternalHooks);
const eventsService = Container.get(EventsService);
return { return {
nodeExecuteBefore: [ nodeExecuteBefore: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> { async function (this: WorkflowHooks, nodeName: string): Promise<void> {
void Container.get(InternalHooks).onNodeBeforeExecute( void internalHooks.onNodeBeforeExecute(this.executionId, this.workflowData, nodeName);
this.executionId,
this.workflowData,
nodeName,
);
}, },
], ],
nodeExecuteAfter: [ nodeExecuteAfter: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> { async function (this: WorkflowHooks, nodeName: string): Promise<void> {
void Container.get(InternalHooks).onNodePostExecute( void internalHooks.onNodePostExecute(this.executionId, this.workflowData, nodeName);
this.executionId,
this.workflowData,
nodeName,
);
}, },
], ],
workflowExecuteBefore: [], workflowExecuteBefore: [],
@@ -711,17 +702,13 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
); );
} }
} finally { } finally {
eventEmitter.emit( eventsService.emit('workflowExecutionCompleted', this.workflowData, fullRunData);
eventEmitter.types.workflowExecutionCompleted,
this.workflowData,
fullRunData,
);
} }
}, },
], ],
nodeFetchedData: [ nodeFetchedData: [
async (workflowId: string, node: INode) => { async (workflowId: string, node: INode) => {
eventEmitter.emit(eventEmitter.types.nodeFetchedData, workflowId, node); eventsService.emit('nodeFetchedData', workflowId, node);
}, },
], ],
}; };
@@ -734,6 +721,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
* *
*/ */
function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
const eventsService = Container.get(EventsService);
return { return {
nodeExecuteBefore: [], nodeExecuteBefore: [],
nodeExecuteAfter: [], nodeExecuteAfter: [],
@@ -834,17 +822,13 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
this.retryOf, this.retryOf,
); );
} finally { } finally {
eventEmitter.emit( eventsService.emit('workflowExecutionCompleted', this.workflowData, fullRunData);
eventEmitter.types.workflowExecutionCompleted,
this.workflowData,
fullRunData,
);
} }
}, },
], ],
nodeFetchedData: [ nodeFetchedData: [
async (workflowId: string, node: INode) => { async (workflowId: string, node: INode) => {
eventEmitter.emit(eventEmitter.types.nodeFetchedData, workflowId, node); eventsService.emit('nodeFetchedData', workflowId, node);
}, },
], ],
}; };
@@ -951,10 +935,12 @@ async function executeWorkflow(
parentWorkflowSettings?: IWorkflowSettings; parentWorkflowSettings?: IWorkflowSettings;
}, },
): Promise<Array<INodeExecutionData[] | null> | IWorkflowExecuteProcess> { ): Promise<Array<INodeExecutionData[] | null> | IWorkflowExecuteProcess> {
const internalHooks = Container.get(InternalHooks);
const externalHooks = Container.get(ExternalHooks); const externalHooks = Container.get(ExternalHooks);
await externalHooks.init(); await externalHooks.init();
const nodeTypes = Container.get(NodeTypes); const nodeTypes = Container.get(NodeTypes);
const activeExecutions = Container.get(ActiveExecutions);
const workflowData = const workflowData =
options.loadedWorkflowData ?? options.loadedWorkflowData ??
@@ -984,10 +970,10 @@ async function executeWorkflow(
executionId = executionId =
options.parentExecutionId !== undefined options.parentExecutionId !== undefined
? options.parentExecutionId ? options.parentExecutionId
: await Container.get(ActiveExecutions).add(runData); : await activeExecutions.add(runData);
} }
void Container.get(InternalHooks).onWorkflowBeforeExecute(executionId || '', runData); void internalHooks.onWorkflowBeforeExecute(executionId || '', runData);
let data; let data;
try { try {
@@ -1077,7 +1063,7 @@ async function executeWorkflow(
} }
// remove execution from active executions // remove execution from active executions
Container.get(ActiveExecutions).remove(executionId, fullRunData); activeExecutions.remove(executionId, fullRunData);
await Container.get(ExecutionRepository).updateExistingExecution( await Container.get(ExecutionRepository).updateExistingExecution(
executionId, executionId,
@@ -1092,21 +1078,16 @@ async function executeWorkflow(
await externalHooks.run('workflow.postExecute', [data, workflowData, executionId]); await externalHooks.run('workflow.postExecute', [data, workflowData, executionId]);
void Container.get(InternalHooks).onWorkflowPostExecute( void internalHooks.onWorkflowPostExecute(executionId, workflowData, data, additionalData.userId);
executionId,
workflowData,
data,
additionalData.userId,
);
if (data.finished === true) { if (data.finished === true) {
// Workflow did finish successfully // Workflow did finish successfully
Container.get(ActiveExecutions).remove(executionId, data); activeExecutions.remove(executionId, data);
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data); const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data);
return returnData!.data!.main; return returnData!.data!.main;
} }
Container.get(ActiveExecutions).remove(executionId, data); activeExecutions.remove(executionId, data);
// Workflow did fail // Workflow did fail
const { error } = data.data.resultData; const { error } = data.data.resultData;
// eslint-disable-next-line @typescript-eslint/no-throw-literal // eslint-disable-next-line @typescript-eslint/no-throw-literal

View File

@@ -1,187 +0,0 @@
import type { User } from '@db/entities/User';
import { whereClause } from '@/UserManagement/UserManagementHelper';
import express from 'express';
import { LoggerProxy } from 'n8n-workflow';
import * as Db from '@/Db';
import * as ResponseHelper from '@/ResponseHelper';
import type {
IWorkflowStatisticsCounts,
IWorkflowStatisticsDataLoaded,
IWorkflowStatisticsTimestamps,
} from '@/Interfaces';
import { StatisticsNames } from '@db/entities/WorkflowStatistics';
import { getLogger } from '../Logger';
import type { ExecutionRequest } from '../requests';
export const workflowStatsController = express.Router();
// Helper function that validates the ID, return a flag stating whether the request is allowed
async function checkWorkflowId(workflowId: string, user: User): Promise<boolean> {
// Check permissions
const shared = await Db.collections.SharedWorkflow.findOne({
relations: ['workflow'],
where: whereClause({
user,
entityType: 'workflow',
entityId: workflowId,
}),
});
if (!shared) {
LoggerProxy.verbose('User attempted to read a workflow without permissions', {
workflowId,
userId: user.id,
});
return false;
}
return true;
}
/**
* Initialize Logger if needed
*/
workflowStatsController.use((req, res, next) => {
try {
LoggerProxy.getInstance();
} catch (error) {
LoggerProxy.init(getLogger());
}
next();
});
/**
* Check that the workflow ID is valid and allowed to be read by the user
*/
workflowStatsController.use(async (req: ExecutionRequest.Get, res, next) => {
const allowed = await checkWorkflowId(req.params.id, req.user);
if (allowed) {
next();
} else {
// Otherwise, make and return an error
const response = new ResponseHelper.NotFoundError(`Workflow ${req.params.id} does not exist.`);
next(response);
}
});
/**
* GET /workflow-stats/:id/counts/
*/
workflowStatsController.get(
'/:id/counts/',
ResponseHelper.send(async (req: ExecutionRequest.Get): Promise<IWorkflowStatisticsCounts> => {
// Get counts from DB
const workflowId = req.params.id;
// Find the stats for this workflow
const stats = await Db.collections.WorkflowStatistics.find({
select: ['count', 'name'],
where: {
workflowId,
},
});
const data: IWorkflowStatisticsCounts = {
productionSuccess: 0,
productionError: 0,
manualSuccess: 0,
manualError: 0,
};
// There will be a maximum of 4 stats (currently)
stats.forEach(({ count, name }) => {
switch (name) {
case StatisticsNames.manualError:
data.manualError = count;
break;
case StatisticsNames.manualSuccess:
data.manualSuccess = count;
break;
case StatisticsNames.productionError:
data.productionError = count;
break;
case StatisticsNames.productionSuccess:
data.productionSuccess = count;
}
});
return data;
}),
);
/**
* GET /workflow-stats/:id/times/
*/
workflowStatsController.get(
'/:id/times/',
ResponseHelper.send(async (req: ExecutionRequest.Get): Promise<IWorkflowStatisticsTimestamps> => {
// Get times from DB
const workflowId = req.params.id;
// Find the stats for this workflow
const stats = await Db.collections.WorkflowStatistics.find({
select: ['latestEvent', 'name'],
where: {
workflowId,
},
});
const data: IWorkflowStatisticsTimestamps = {
productionSuccess: null,
productionError: null,
manualSuccess: null,
manualError: null,
};
// There will be a maximum of 4 stats (currently)
stats.forEach(({ latestEvent, name }) => {
switch (name) {
case StatisticsNames.manualError:
data.manualError = latestEvent;
break;
case StatisticsNames.manualSuccess:
data.manualSuccess = latestEvent;
break;
case StatisticsNames.productionError:
data.productionError = latestEvent;
break;
case StatisticsNames.productionSuccess:
data.productionSuccess = latestEvent;
}
});
return data;
}),
);
/**
* GET /workflow-stats/:id/data-loaded/
*/
workflowStatsController.get(
'/:id/data-loaded/',
ResponseHelper.send(async (req: ExecutionRequest.Get): Promise<IWorkflowStatisticsDataLoaded> => {
// Get flag
const workflowId = req.params.id;
// Get the flag
const stats = await Db.collections.WorkflowStatistics.findOne({
select: ['latestEvent'],
where: {
workflowId,
name: StatisticsNames.dataLoaded,
},
});
const data: IWorkflowStatisticsDataLoaded = {
dataLoaded: stats ? true : false,
};
return data;
}),
);

View File

@@ -10,7 +10,6 @@ import { WorkflowRunner } from '@/WorkflowRunner';
import type { IWorkflowExecutionDataProcess } from '@/Interfaces'; import type { IWorkflowExecutionDataProcess } from '@/Interfaces';
import { getInstanceOwner } from '@/UserManagement/UserManagementHelper'; import { getInstanceOwner } from '@/UserManagement/UserManagementHelper';
import { findCliWorkflowStart, isWorkflowIdValid } from '@/utils'; import { findCliWorkflowStart, isWorkflowIdValid } from '@/utils';
import { initEvents } from '@/events';
import { BaseCommand } from './BaseCommand'; import { BaseCommand } from './BaseCommand';
import { Container } from 'typedi'; import { Container } from 'typedi';
@@ -36,9 +35,6 @@ export class Execute extends BaseCommand {
await super.init(); await super.init();
await this.initBinaryManager(); await this.initBinaryManager();
await this.initExternalHooks(); await this.initExternalHooks();
// Add event handlers
initEvents();
} }
async run() { async run() {

View File

@@ -15,7 +15,6 @@ import type { IWorkflowDb, IWorkflowExecutionDataProcess } from '@/Interfaces';
import type { User } from '@db/entities/User'; import type { User } from '@db/entities/User';
import { getInstanceOwner } from '@/UserManagement/UserManagementHelper'; import { getInstanceOwner } from '@/UserManagement/UserManagementHelper';
import { findCliWorkflowStart } from '@/utils'; import { findCliWorkflowStart } from '@/utils';
import { initEvents } from '@/events';
import { BaseCommand } from './BaseCommand'; import { BaseCommand } from './BaseCommand';
import { Container } from 'typedi'; import { Container } from 'typedi';
import type { import type {
@@ -183,9 +182,6 @@ export class ExecuteBatch extends BaseCommand {
await super.init(); await super.init();
await this.initBinaryManager(); await this.initBinaryManager();
await this.initExternalHooks(); await this.initExternalHooks();
// Add event handlers
initEvents();
} }
async run() { async run() {

View File

@@ -8,3 +8,4 @@ export { PasswordResetController } from './passwordReset.controller';
export { TagsController } from './tags.controller'; export { TagsController } from './tags.controller';
export { TranslationController } from './translation.controller'; export { TranslationController } from './translation.controller';
export { UsersController } from './users.controller'; export { UsersController } from './users.controller';
export { WorkflowStatisticsController } from './workflowStatistics.controller';

View File

@@ -0,0 +1,124 @@
import { Service } from 'typedi';
import { Response, NextFunction } from 'express';
import { ILogger } from 'n8n-workflow';
import { Get, Middleware, RestController } from '@/decorators';
import type { WorkflowStatistics } from '@db/entities/WorkflowStatistics';
import { StatisticsNames } from '@db/entities/WorkflowStatistics';
import { SharedWorkflowRepository, WorkflowStatisticsRepository } from '@db/repositories';
import { ExecutionRequest } from '@/requests';
import { whereClause } from '@/UserManagement/UserManagementHelper';
import { NotFoundError } from '@/ResponseHelper';
import type { IWorkflowStatisticsDataLoaded } from '@/Interfaces';
interface WorkflowStatisticsData<T> {
productionSuccess: T;
productionError: T;
manualSuccess: T;
manualError: T;
}
@Service()
@RestController('/workflow-stats')
export class WorkflowStatisticsController {
constructor(
private sharedWorkflowRepository: SharedWorkflowRepository,
private workflowStatisticsRepository: WorkflowStatisticsRepository,
private readonly logger: ILogger,
) {}
/**
* Check that the workflow ID is valid and allowed to be read by the user
*/
// TODO: move this into a new decorator `@ValidateWorkflowPermission`
@Middleware()
async hasWorkflowAccess(req: ExecutionRequest.Get, res: Response, next: NextFunction) {
const { user } = req;
const workflowId = req.params.id;
const allowed = await this.sharedWorkflowRepository.exist({
relations: ['workflow'],
where: whereClause({
user,
entityType: 'workflow',
entityId: workflowId,
}),
});
if (allowed) {
next();
} else {
this.logger.verbose('User attempted to read a workflow without permissions', {
workflowId,
userId: user.id,
});
// Otherwise, make and return an error
throw new NotFoundError(`Workflow ${workflowId} does not exist.`);
}
}
@Get('/:id/counts/')
async getCounts(req: ExecutionRequest.Get): Promise<WorkflowStatisticsData<number>> {
return this.getData(req.params.id, 'count', 0);
}
@Get('/:id/times/')
async getTimes(req: ExecutionRequest.Get): Promise<WorkflowStatisticsData<Date | null>> {
return this.getData(req.params.id, 'latestEvent', null);
}
@Get('/:id/data-loaded/')
async getDataLoaded(req: ExecutionRequest.Get): Promise<IWorkflowStatisticsDataLoaded> {
// Get flag
const workflowId = req.params.id;
// Get the flag
const stats = await this.workflowStatisticsRepository.findOne({
select: ['latestEvent'],
where: {
workflowId,
name: StatisticsNames.dataLoaded,
},
});
return {
dataLoaded: stats ? true : false,
};
}
private async getData<
C extends 'count' | 'latestEvent',
D = WorkflowStatistics[C] extends number ? 0 : null,
>(workflowId: string, columnName: C, defaultValue: WorkflowStatistics[C] | D) {
const stats = await this.workflowStatisticsRepository.find({
select: [columnName, 'name'],
where: { workflowId },
});
const data: WorkflowStatisticsData<WorkflowStatistics[C] | D> = {
productionSuccess: defaultValue,
productionError: defaultValue,
manualSuccess: defaultValue,
manualError: defaultValue,
};
stats.forEach(({ name, [columnName]: value }) => {
switch (name) {
case StatisticsNames.manualError:
data.manualError = value;
break;
case StatisticsNames.manualSuccess:
data.manualSuccess = value;
break;
case StatisticsNames.productionError:
data.productionError = value;
break;
case StatisticsNames.productionSuccess:
data.productionSuccess = value;
}
});
return data;
}
}

View File

@@ -1,10 +1,94 @@
import { Service } from 'typedi'; import { Service } from 'typedi';
import { DataSource, Repository } from 'typeorm'; import { DataSource, QueryFailedError, Repository } from 'typeorm';
import config from '@/config';
import type { StatisticsNames } from '../entities/WorkflowStatistics';
import { WorkflowStatistics } from '../entities/WorkflowStatistics'; import { WorkflowStatistics } from '../entities/WorkflowStatistics';
type StatisticsInsertResult = 'insert' | 'failed';
type StatisticsUpsertResult = StatisticsInsertResult | 'update';
@Service() @Service()
export class WorkflowStatisticsRepository extends Repository<WorkflowStatistics> { export class WorkflowStatisticsRepository extends Repository<WorkflowStatistics> {
private readonly dbType = config.getEnv('database.type');
constructor(dataSource: DataSource) { constructor(dataSource: DataSource) {
super(WorkflowStatistics, dataSource.manager); super(WorkflowStatistics, dataSource.manager);
} }
async insertWorkflowStatistics(
eventName: StatisticsNames,
workflowId: string,
): Promise<StatisticsInsertResult> {
// Try to insert the data loaded statistic
try {
await this.insert({
workflowId,
name: eventName,
count: 1,
latestEvent: new Date(),
});
return 'insert';
} catch (error) {
// if it's a duplicate key error then that's fine, otherwise throw the error
if (!(error instanceof QueryFailedError)) {
throw error;
}
// If it is a query failed error, we return
return 'failed';
}
}
async upsertWorkflowStatistics(
eventName: StatisticsNames,
workflowId: string,
): Promise<StatisticsUpsertResult> {
const { tableName } = this.metadata;
try {
if (this.dbType === 'sqlite') {
await this.query(
`INSERT INTO "${tableName}" ("count", "name", "workflowId", "latestEvent")
VALUES (1, "${eventName}", "${workflowId}", CURRENT_TIMESTAMP)
ON CONFLICT (workflowId, name)
DO UPDATE SET count = count + 1, latestEvent = CURRENT_TIMESTAMP`,
);
// SQLite does not offer a reliable way to know whether or not an insert or update happened.
// We'll use a naive approach in this case. Query again after and it might cause us to miss the
// first production execution sometimes due to concurrency, but it's the only way.
const counter = await this.findOne({
select: ['count'],
where: {
name: eventName,
workflowId,
},
});
return counter?.count === 1 ? 'insert' : 'failed';
} else if (this.dbType === 'postgresdb') {
const queryResult = (await this.query(
`INSERT INTO "${tableName}" ("count", "name", "workflowId", "latestEvent")
VALUES (1, '${eventName}', '${workflowId}', CURRENT_TIMESTAMP)
ON CONFLICT ("name", "workflowId")
DO UPDATE SET "count" = "${tableName}"."count" + 1, "latestEvent" = CURRENT_TIMESTAMP
RETURNING *;`,
)) as Array<{
count: number;
}>;
return queryResult[0].count === 1 ? 'insert' : 'update';
} else {
const queryResult = (await this.query(
`INSERT INTO \`${tableName}\` (count, name, workflowId, latestEvent)
VALUES (1, "${eventName}", "${workflowId}", NOW())
ON DUPLICATE KEY
UPDATE count = count + 1, latestEvent = NOW();`,
)) as {
affectedRows: number;
};
// MySQL returns 2 affected rows on update
return queryResult.affectedRows === 1 ? 'insert' : 'update';
}
} catch (error) {
if (error instanceof QueryFailedError) return 'failed';
throw error;
}
}
} }

View File

@@ -3,9 +3,6 @@ import { NodeOperationError, WorkflowOperationError } from 'n8n-workflow';
import type { EventMessageTypes, EventNamesTypes } from '../EventMessageClasses'; import type { EventMessageTypes, EventNamesTypes } from '../EventMessageClasses';
import type { DateTime } from 'luxon'; import type { DateTime } from 'luxon';
import { Push } from '@/push'; import { Push } from '@/push';
import type { IPushDataExecutionRecovered } from '../../Interfaces';
import { workflowExecutionCompleted } from '../../events/WorkflowStatistics';
import { eventBus } from './MessageEventBus';
import { Container } from 'typedi'; import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks'; import { InternalHooks } from '@/InternalHooks';
import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData'; import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData';
@@ -193,16 +190,12 @@ export async function recoverExecutionDataFromEventLogMessages(
// execute workflowExecuteAfter hook to trigger error workflow // execute workflowExecuteAfter hook to trigger error workflow
await workflowHooks.executeHookFunctions('workflowExecuteAfter', [iRunData]); await workflowHooks.executeHookFunctions('workflowExecuteAfter', [iRunData]);
// calling workflowExecutionCompleted directly because the eventEmitter is not up yet at this point const push = Container.get(Push);
await workflowExecutionCompleted(executionEntry.workflowData, iRunData);
// wait for UI to be back up and send the execution data // wait for UI to be back up and send the execution data
eventBus.once('editorUiConnected', function handleUiBackUp() { push.once('editorUiConnected', function handleUiBackUp() {
// add a small timeout to make sure the UI is back up // add a small timeout to make sure the UI is back up
setTimeout(() => { setTimeout(() => {
Container.get(Push).send('executionRecovered', { push.send('executionRecovered', { executionId });
executionId,
} as IPushDataExecutionRecovered);
}, 1000); }, 1000);
}); });
} }

View File

@@ -1,187 +0,0 @@
import type { INode, IRun, IWorkflowBase } from 'n8n-workflow';
import { LoggerProxy } from 'n8n-workflow';
import * as Db from '@/Db';
import { StatisticsNames } from '@db/entities/WorkflowStatistics';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { QueryFailedError } from 'typeorm';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
import config from '@/config';
import { UserService } from '@/user/user.service';
const enum StatisticsUpsertResult {
insert = 'insert',
update = 'update',
failed = 'failed',
}
async function upsertWorkflowStatistics(
eventName: StatisticsNames,
workflowId: string,
): Promise<StatisticsUpsertResult> {
const dbType = config.getEnv('database.type');
const { tableName } = Db.collections.WorkflowStatistics.metadata;
try {
if (dbType === 'sqlite') {
await Db.collections.WorkflowStatistics.query(
`INSERT INTO "${tableName}" ("count", "name", "workflowId", "latestEvent")
VALUES (1, "${eventName}", "${workflowId}", CURRENT_TIMESTAMP)
ON CONFLICT (workflowId, name)
DO UPDATE SET count = count + 1, latestEvent = CURRENT_TIMESTAMP`,
);
// SQLite does not offer a reliable way to know whether or not an insert or update happened.
// We'll use a naive approach in this case. Query again after and it might cause us to miss the
// first production execution sometimes due to concurrency, but it's the only way.
const counter = await Db.collections.WorkflowStatistics.findOne({
select: ['count'],
where: {
name: eventName,
workflowId,
},
});
if (counter?.count === 1) {
return StatisticsUpsertResult.insert;
}
return StatisticsUpsertResult.update;
} else if (dbType === 'postgresdb') {
const queryResult = (await Db.collections.WorkflowStatistics.query(
`INSERT INTO "${tableName}" ("count", "name", "workflowId", "latestEvent")
VALUES (1, '${eventName}', '${workflowId}', CURRENT_TIMESTAMP)
ON CONFLICT ("name", "workflowId")
DO UPDATE SET "count" = "${tableName}"."count" + 1, "latestEvent" = CURRENT_TIMESTAMP
RETURNING *;`,
)) as Array<{
count: number;
}>;
if (queryResult[0].count === 1) {
return StatisticsUpsertResult.insert;
}
return StatisticsUpsertResult.update;
} else {
const queryResult = (await Db.collections.WorkflowStatistics.query(
`INSERT INTO \`${tableName}\` (count, name, workflowId, latestEvent)
VALUES (1, "${eventName}", "${workflowId}", NOW())
ON DUPLICATE KEY
UPDATE count = count + 1, latestEvent = NOW();`,
)) as {
affectedRows: number;
};
if (queryResult.affectedRows === 1) {
return StatisticsUpsertResult.insert;
}
// MySQL returns 2 affected rows on update
return StatisticsUpsertResult.update;
}
} catch (error) {
if (error instanceof QueryFailedError) return StatisticsUpsertResult.failed;
throw error;
}
}
export async function workflowExecutionCompleted(
workflowData: IWorkflowBase,
runData: IRun,
): Promise<void> {
// Determine the name of the statistic
const finished = runData.finished ? runData.finished : false;
const manual = runData.mode === 'manual';
let name: StatisticsNames;
if (finished) {
if (manual) name = StatisticsNames.manualSuccess;
else name = StatisticsNames.productionSuccess;
} else {
if (manual) name = StatisticsNames.manualError;
else name = StatisticsNames.productionError;
}
// Get the workflow id
const workflowId = workflowData.id;
if (!workflowId) return;
try {
const upsertResult = await upsertWorkflowStatistics(name, workflowId);
if (
name === StatisticsNames.productionSuccess &&
upsertResult === StatisticsUpsertResult.insert
) {
const owner = await getWorkflowOwner(workflowId);
const metrics = {
user_id: owner.id,
workflow_id: workflowId,
};
if (!owner.settings?.userActivated) {
await UserService.updateUserSettings(owner.id, {
firstSuccessfulWorkflowId: workflowId,
userActivated: true,
});
}
// Send the metrics
await Container.get(InternalHooks).onFirstProductionWorkflowSuccess(metrics);
}
} catch (error) {
LoggerProxy.verbose('Unable to fire first workflow success telemetry event');
}
}
export async function nodeFetchedData(
workflowId: string | undefined | null,
node: INode,
): Promise<void> {
if (!workflowId) return;
const hasLoadedDataPreviously = await Db.collections.WorkflowStatistics.findOne({
select: ['count'],
where: {
workflowId,
name: StatisticsNames.dataLoaded,
},
});
if (hasLoadedDataPreviously) {
return;
}
// Try to insert the data loaded statistic
try {
await Db.collections.WorkflowStatistics.createQueryBuilder('workflowStatistics')
.insert()
.values({
workflowId,
name: StatisticsNames.dataLoaded,
count: 1,
latestEvent: new Date(),
})
.orIgnore()
.execute();
} catch (error) {
LoggerProxy.warn('Failed saving loaded data statistics');
}
// Compile the metrics since this was a new data loaded event
const owner = await getWorkflowOwner(workflowId);
let metrics = {
user_id: owner.id,
workflow_id: workflowId,
node_type: node.type,
node_id: node.id,
};
// This is probably naive but I can't see a way for a node to have multiple credentials attached so..
if (node.credentials) {
Object.entries(node.credentials).forEach(([credName, credDetails]) => {
metrics = Object.assign(metrics, {
credential_type: credName,
credential_id: credDetails.id,
});
});
}
// Send metrics to posthog
await Container.get(InternalHooks).onFirstWorkflowDataLoad(metrics);
}

View File

@@ -1,14 +0,0 @@
import { eventEmitter } from 'n8n-core';
import { nodeFetchedData, workflowExecutionCompleted } from './WorkflowStatistics';
export function initEvents() {
if ('SKIP_STATISTICS_EVENTS' in process.env) return;
// Check for undefined as during testing these functions end up undefined for some reason
if (nodeFetchedData) {
eventEmitter.on(eventEmitter.types.nodeFetchedData, nodeFetchedData);
}
if (workflowExecutionCompleted) {
eventEmitter.on(eventEmitter.types.workflowExecutionCompleted, workflowExecutionCompleted);
}
}

View File

@@ -1,6 +1,5 @@
import { jsonStringify, LoggerProxy as Logger } from 'n8n-workflow'; import { jsonStringify, LoggerProxy as Logger } from 'n8n-workflow';
import type { IPushDataType } from '@/Interfaces'; import type { IPushDataType } from '@/Interfaces';
import { eventBus } from '../eventbus';
export abstract class AbstractPush<T> { export abstract class AbstractPush<T> {
protected connections: Record<string, T> = {}; protected connections: Record<string, T> = {};
@@ -11,7 +10,6 @@ export abstract class AbstractPush<T> {
protected add(sessionId: string, connection: T): void { protected add(sessionId: string, connection: T): void {
const { connections } = this; const { connections } = this;
Logger.debug('Add editor-UI session', { sessionId }); Logger.debug('Add editor-UI session', { sessionId });
eventBus.emit('editorUiConnected', sessionId);
const existingConnection = connections[sessionId]; const existingConnection = connections[sessionId];
if (existingConnection) { if (existingConnection) {

View File

@@ -1,3 +1,4 @@
import { EventEmitter } from 'events';
import { ServerResponse } from 'http'; import { ServerResponse } from 'http';
import type { Server } from 'http'; import type { Server } from 'http';
import type { Socket } from 'net'; import type { Socket } from 'net';
@@ -16,7 +17,7 @@ import type { IPushDataType } from '@/Interfaces';
const useWebSockets = config.getEnv('push.backend') === 'websocket'; const useWebSockets = config.getEnv('push.backend') === 'websocket';
@Service() @Service()
export class Push { export class Push extends EventEmitter {
private backend = useWebSockets ? new WebSocketPush() : new SSEPush(); private backend = useWebSockets ? new WebSocketPush() : new SSEPush();
handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) { handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) {
@@ -27,6 +28,7 @@ export class Push {
} else { } else {
res.status(401).send('Unauthorized'); res.status(401).send('Unauthorized');
} }
this.emit('editorUiConnected', req.query.sessionId);
} }
send<D>(type: IPushDataType, data: D, sessionId: string | undefined = undefined) { send<D>(type: IPushDataType, data: D, sessionId: string | undefined = undefined) {

View File

@@ -0,0 +1,123 @@
import { EventEmitter } from 'events';
import { Service } from 'typedi';
import type { INode, IRun, IWorkflowBase } from 'n8n-workflow';
import { LoggerProxy } from 'n8n-workflow';
import { StatisticsNames } from '@db/entities/WorkflowStatistics';
import { WorkflowStatisticsRepository } from '@db/repositories';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { UserService } from '@/user/user.service';
@Service()
export class EventsService extends EventEmitter {
constructor(private repository: WorkflowStatisticsRepository) {
super({ captureRejections: true });
if ('SKIP_STATISTICS_EVENTS' in process.env) return;
this.on('nodeFetchedData', async (workflowId, node) => this.nodeFetchedData(workflowId, node));
this.on('workflowExecutionCompleted', async (workflowData, runData) =>
this.workflowExecutionCompleted(workflowData, runData),
);
}
async workflowExecutionCompleted(workflowData: IWorkflowBase, runData: IRun): Promise<void> {
// Determine the name of the statistic
const finished = runData.finished ? runData.finished : false;
const manual = runData.mode === 'manual';
let name: StatisticsNames;
if (finished) {
if (manual) name = StatisticsNames.manualSuccess;
else name = StatisticsNames.productionSuccess;
} else {
if (manual) name = StatisticsNames.manualError;
else name = StatisticsNames.productionError;
}
// Get the workflow id
const workflowId = workflowData.id;
if (!workflowId) return;
try {
const upsertResult = await this.repository.upsertWorkflowStatistics(name, workflowId);
if (name === 'production_success' && upsertResult === 'insert') {
const owner = await getWorkflowOwner(workflowId);
const metrics = {
user_id: owner.id,
workflow_id: workflowId,
};
if (!owner.settings?.userActivated) {
await UserService.updateUserSettings(owner.id, {
firstSuccessfulWorkflowId: workflowId,
userActivated: true,
});
}
// Send the metrics
this.emit('telemetry.onFirstProductionWorkflowSuccess', metrics);
}
} catch (error) {
LoggerProxy.verbose('Unable to fire first workflow success telemetry event');
}
}
async nodeFetchedData(workflowId: string | undefined | null, node: INode): Promise<void> {
if (!workflowId) return;
const insertResult = await this.repository.insertWorkflowStatistics(
StatisticsNames.dataLoaded,
workflowId,
);
if (insertResult === 'failed') return;
// Compile the metrics since this was a new data loaded event
const owner = await getWorkflowOwner(workflowId);
let metrics = {
user_id: owner.id,
workflow_id: workflowId,
node_type: node.type,
node_id: node.id,
};
// This is probably naive but I can't see a way for a node to have multiple credentials attached so..
if (node.credentials) {
Object.entries(node.credentials).forEach(([credName, credDetails]) => {
metrics = Object.assign(metrics, {
credential_type: credName,
credential_id: credDetails.id,
});
});
}
// Send metrics to posthog
this.emit('telemetry.onFirstWorkflowDataLoad', metrics);
}
}
export declare interface EventsService {
on(
event: 'nodeFetchedData',
listener: (workflowId: string | undefined | null, node: INode) => void,
): this;
on(
event: 'workflowExecutionCompleted',
listener: (workflowData: IWorkflowBase, runData: IRun) => void,
): this;
on(
event: 'telemetry.onFirstProductionWorkflowSuccess',
listener: (metrics: { user_id: string; workflow_id: string }) => void,
): this;
on(
event: 'telemetry.onFirstWorkflowDataLoad',
listener: (metrics: {
user_id: string;
workflow_id: string;
node_type: string;
node_id: string;
credential_type?: string;
credential_id?: string;
}) => void,
): this;
}

View File

@@ -1,75 +1,61 @@
import type { IRun, WorkflowExecuteMode } from 'n8n-workflow'; import type { IRun, WorkflowExecuteMode, ILogger } from 'n8n-workflow';
import { LoggerProxy } from 'n8n-workflow'; import { LoggerProxy } from 'n8n-workflow';
import {
QueryFailedError,
type DataSource,
type EntityManager,
type EntityMetadata,
} from 'typeorm';
import { mocked } from 'jest-mock';
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import config from '@/config'; import config from '@/config';
import * as Db from '@/Db'; import type { User } from '@db/entities/User';
import { User } from '@db/entities/User';
import { StatisticsNames } from '@db/entities/WorkflowStatistics';
import type { WorkflowStatistics } from '@db/entities/WorkflowStatistics'; import type { WorkflowStatistics } from '@db/entities/WorkflowStatistics';
import type { WorkflowStatisticsRepository } from '@db/repositories'; import { WorkflowStatisticsRepository } from '@db/repositories';
import { nodeFetchedData, workflowExecutionCompleted } from '@/events/WorkflowStatistics'; import { EventsService } from '@/services/events.service';
import * as UserManagementHelper from '@/UserManagement/UserManagementHelper';
import { getLogger } from '@/Logger';
import { InternalHooks } from '@/InternalHooks';
import { mockInstance } from '../integration/shared/utils/';
import { UserService } from '@/user/user.service'; import { UserService } from '@/user/user.service';
import { WorkflowEntity } from '@db/entities/WorkflowEntity'; import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
jest.mock('@/Db', () => { jest.mock('@/UserManagement/UserManagementHelper', () => ({ getWorkflowOwner: jest.fn() }));
return {
collections: {
WorkflowStatistics: mock<WorkflowStatisticsRepository>({
metadata: { tableName: 'workflow_statistics' },
}),
},
};
});
jest.spyOn(UserService, 'updateUserSettings').mockImplementation(); describe('EventsService', () => {
describe('Events', () => {
const dbType = config.getEnv('database.type'); const dbType = config.getEnv('database.type');
const fakeUser = Object.assign(new User(), { id: 'abcde-fghij' }); const fakeUser = mock<User>({ id: 'abcde-fghij' });
const internalHooks = mockInstance(InternalHooks);
jest.spyOn(UserManagementHelper, 'getWorkflowOwner').mockResolvedValue(fakeUser); const entityManager = mock<EntityManager>();
const dataSource = mock<DataSource>({
manager: entityManager,
getMetadata: () =>
mock<EntityMetadata>({
tableName: 'workflow_statistics',
}),
});
Object.assign(entityManager, { connection: dataSource });
const workflowStatisticsRepository = Db.collections.WorkflowStatistics as ReturnType< LoggerProxy.init(mock<ILogger>());
typeof mock<WorkflowStatisticsRepository>
>;
beforeAll(() => {
config.set('diagnostics.enabled', true); config.set('diagnostics.enabled', true);
config.set('deployment.type', 'n8n-testing'); config.set('deployment.type', 'n8n-testing');
LoggerProxy.init(getLogger()); mocked(getWorkflowOwner).mockResolvedValue(fakeUser);
}); const updateUserSettingsMock = jest.spyOn(UserService, 'updateUserSettings').mockImplementation();
afterAll(() => { const eventsService = new EventsService(new WorkflowStatisticsRepository(dataSource));
jest.clearAllTimers();
jest.useRealTimers(); const onFirstProductionWorkflowSuccess = jest.fn();
}); const onFirstWorkflowDataLoad = jest.fn();
eventsService.on('telemetry.onFirstProductionWorkflowSuccess', onFirstProductionWorkflowSuccess);
eventsService.on('telemetry.onFirstWorkflowDataLoad', onFirstWorkflowDataLoad);
beforeEach(() => { beforeEach(() => {
if (dbType === 'sqlite') { jest.clearAllMocks();
workflowStatisticsRepository.findOne.mockClear();
} else {
workflowStatisticsRepository.query.mockClear();
}
internalHooks.onFirstProductionWorkflowSuccess.mockClear();
internalHooks.onFirstWorkflowDataLoad.mockClear();
}); });
const mockDBCall = (count = 1) => { const mockDBCall = (count = 1) => {
if (dbType === 'sqlite') { if (dbType === 'sqlite') {
workflowStatisticsRepository.findOne.mockResolvedValueOnce( entityManager.findOne.mockResolvedValueOnce(mock<WorkflowStatistics>({ count }));
mock<WorkflowStatistics>({ count }),
);
} else { } else {
const result = dbType === 'postgresdb' ? [{ count }] : { affectedRows: count }; const result = dbType === 'postgresdb' ? [{ count }] : { affectedRows: count };
workflowStatisticsRepository.query.mockImplementationOnce(async (query) => entityManager.query.mockImplementationOnce(async (query) =>
query.startsWith('INSERT INTO') ? result : null, query.startsWith('INSERT INTO') ? result : null,
); );
} }
@@ -96,9 +82,10 @@ describe('Events', () => {
}; };
mockDBCall(); mockDBCall();
await workflowExecutionCompleted(workflow, runData); await eventsService.workflowExecutionCompleted(workflow, runData);
expect(internalHooks.onFirstProductionWorkflowSuccess).toBeCalledTimes(1); expect(updateUserSettingsMock).toHaveBeenCalledTimes(1);
expect(internalHooks.onFirstProductionWorkflowSuccess).toHaveBeenNthCalledWith(1, { expect(onFirstProductionWorkflowSuccess).toBeCalledTimes(1);
expect(onFirstProductionWorkflowSuccess).toHaveBeenNthCalledWith(1, {
user_id: fakeUser.id, user_id: fakeUser.id,
workflow_id: workflow.id, workflow_id: workflow.id,
}); });
@@ -122,8 +109,8 @@ describe('Events', () => {
mode: 'internal' as WorkflowExecuteMode, mode: 'internal' as WorkflowExecuteMode,
startedAt: new Date(), startedAt: new Date(),
}; };
await workflowExecutionCompleted(workflow, runData); await eventsService.workflowExecutionCompleted(workflow, runData);
expect(internalHooks.onFirstProductionWorkflowSuccess).toBeCalledTimes(0); expect(onFirstProductionWorkflowSuccess).toBeCalledTimes(0);
}); });
test('should not send metrics for updated entries', async () => { test('should not send metrics for updated entries', async () => {
@@ -145,8 +132,8 @@ describe('Events', () => {
startedAt: new Date(), startedAt: new Date(),
}; };
mockDBCall(2); mockDBCall(2);
await workflowExecutionCompleted(workflow, runData); await eventsService.workflowExecutionCompleted(workflow, runData);
expect(internalHooks.onFirstProductionWorkflowSuccess).toBeCalledTimes(0); expect(onFirstProductionWorkflowSuccess).toBeCalledTimes(0);
}); });
}); });
@@ -162,9 +149,9 @@ describe('Events', () => {
position: [0, 0] as [number, number], position: [0, 0] as [number, number],
parameters: {}, parameters: {},
}; };
await nodeFetchedData(workflowId, node); await eventsService.nodeFetchedData(workflowId, node);
expect(internalHooks.onFirstWorkflowDataLoad).toBeCalledTimes(1); expect(onFirstWorkflowDataLoad).toBeCalledTimes(1);
expect(internalHooks.onFirstWorkflowDataLoad).toHaveBeenNthCalledWith(1, { expect(onFirstWorkflowDataLoad).toHaveBeenNthCalledWith(1, {
user_id: fakeUser.id, user_id: fakeUser.id,
workflow_id: workflowId, workflow_id: workflowId,
node_type: node.type, node_type: node.type,
@@ -189,9 +176,9 @@ describe('Events', () => {
}, },
}, },
}; };
await nodeFetchedData(workflowId, node); await eventsService.nodeFetchedData(workflowId, node);
expect(internalHooks.onFirstWorkflowDataLoad).toBeCalledTimes(1); expect(onFirstWorkflowDataLoad).toBeCalledTimes(1);
expect(internalHooks.onFirstWorkflowDataLoad).toHaveBeenNthCalledWith(1, { expect(onFirstWorkflowDataLoad).toHaveBeenNthCalledWith(1, {
user_id: fakeUser.id, user_id: fakeUser.id,
workflow_id: workflowId, workflow_id: workflowId,
node_type: node.type, node_type: node.type,
@@ -203,15 +190,7 @@ describe('Events', () => {
test('should not send metrics for entries that already have the flag set', async () => { test('should not send metrics for entries that already have the flag set', async () => {
// Fetch data for workflow 2 which is set up to not be altered in the mocks // Fetch data for workflow 2 which is set up to not be altered in the mocks
workflowStatisticsRepository.findOne.mockImplementationOnce(async () => { entityManager.insert.mockRejectedValueOnce(new QueryFailedError('', undefined, ''));
return {
count: 1,
name: StatisticsNames.dataLoaded,
latestEvent: new Date(),
workflowId: '2',
workflow: new WorkflowEntity(),
};
});
const workflowId = '1'; const workflowId = '1';
const node = { const node = {
id: 'abcde', id: 'abcde',
@@ -221,8 +200,8 @@ describe('Events', () => {
position: [0, 0] as [number, number], position: [0, 0] as [number, number],
parameters: {}, parameters: {},
}; };
await nodeFetchedData(workflowId, node); await eventsService.nodeFetchedData(workflowId, node);
expect(internalHooks.onFirstWorkflowDataLoad).toBeCalledTimes(0); expect(onFirstWorkflowDataLoad).toBeCalledTimes(0);
}); });
}); });
}); });

View File

@@ -1,15 +0,0 @@
import EventEmitter from 'events';
interface EventTypes {
nodeFetchedData: string;
workflowExecutionCompleted: string;
}
class N8NEventEmitter extends EventEmitter {
types: EventTypes = {
nodeFetchedData: 'nodeFetchedData',
workflowExecutionCompleted: 'workflowExecutionCompleted',
};
}
export const eventEmitter = new N8NEventEmitter();

View File

@@ -1,4 +1,3 @@
import { eventEmitter } from './EventEmitter';
import * as NodeExecuteFunctions from './NodeExecuteFunctions'; import * as NodeExecuteFunctions from './NodeExecuteFunctions';
import * as UserSettings from './UserSettings'; import * as UserSettings from './UserSettings';
@@ -14,5 +13,5 @@ export * from './LoadNodeParameterOptions';
export * from './LoadNodeListSearch'; export * from './LoadNodeListSearch';
export * from './NodeExecuteFunctions'; export * from './NodeExecuteFunctions';
export * from './WorkflowExecute'; export * from './WorkflowExecute';
export { eventEmitter, NodeExecuteFunctions, UserSettings }; export { NodeExecuteFunctions, UserSettings };
export * from './errors'; export * from './errors';