diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index c57ee5df27..8aa6856663 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -655,24 +655,10 @@ export interface IWorkflowExecuteProcess { workflowExecute: WorkflowExecute; } -export interface IWorkflowStatisticsCounts { - productionSuccess: number; - productionError: number; - manualSuccess: number; - manualError: number; -} - export interface IWorkflowStatisticsDataLoaded { dataLoaded: boolean; } -export interface IWorkflowStatisticsTimestamps { - productionSuccess: Date | null; - productionError: Date | null; - manualSuccess: Date | null; - manualError: Date | null; -} - export type WhereClause = Record }>; // ---------------------------------- diff --git a/packages/cli/src/InternalHooks.ts b/packages/cli/src/InternalHooks.ts index c148e6c4d1..913bad4b0b 100644 --- a/packages/cli/src/InternalHooks.ts +++ b/packages/cli/src/InternalHooks.ts @@ -27,6 +27,7 @@ import { Telemetry } from '@/telemetry'; import type { AuthProviderType } from '@db/entities/AuthIdentity'; import { RoleService } from './role/role.service'; import { eventBus } from './eventbus'; +import { EventsService } from '@/services/events.service'; import type { User } from '@db/entities/User'; import { N8N_VERSION } from '@/constants'; import { NodeTypes } from './NodeTypes'; @@ -58,7 +59,15 @@ export class InternalHooks implements IInternalHooksClass { private nodeTypes: NodeTypes, private roleService: RoleService, private executionRepository: ExecutionRepository, - ) {} + eventsService: EventsService, + ) { + eventsService.on('telemetry.onFirstProductionWorkflowSuccess', async (metrics) => + this.onFirstProductionWorkflowSuccess(metrics), + ); + eventsService.on('telemetry.onFirstWorkflowDataLoad', async (metrics) => + this.onFirstWorkflowDataLoad(metrics), + ); + } async init(instanceId: string) { this.instanceId = instanceId; diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 32083baef2..ac82739c05 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -97,10 +97,10 @@ import { TagsController, TranslationController, UsersController, + WorkflowStatisticsController, } from '@/controllers'; import { executionsController } from '@/executions/executions.controller'; -import { workflowStatsController } from '@/api/workflowStats.api'; import { isApiEnabled, loadPublicApiVersions } from '@/PublicApi'; import { getInstanceBaseUrl, @@ -136,7 +136,6 @@ import { isLogStreamingEnabled } from '@/eventbus/MessageEventBus/MessageEventBu import { licenseController } from './license/license.controller'; import { Push, setupPushServer, setupPushHandler } from '@/push'; import { setupAuthMiddlewares } from './middlewares'; -import { initEvents } from './events'; import { getLdapLoginLabel, handleLdapInit, @@ -383,9 +382,6 @@ export class Server extends AbstractServer { saml_enabled: isSamlCurrentAuthenticationMethod(), }; - // Set up event handling - initEvents(); - if (inDevelopment && process.env.N8N_DEV_RELOAD === 'true') { const { reloadNodesAndCredentials } = await import('@/ReloadNodesAndCredentials'); await reloadNodesAndCredentials(this.loadNodesAndCredentials, this.nodeTypes, this.push); @@ -496,6 +492,7 @@ export class Server extends AbstractServer { }), Container.get(SamlController), Container.get(SourceControlController), + Container.get(WorkflowStatisticsController), ]; if (isLdapEnabled()) { @@ -604,11 +601,6 @@ export class Server extends AbstractServer { // 
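The InternalHooks change above wires telemetry listeners in the constructor, so no separate `initEvents()` call is needed once the container builds the service. A minimal sketch of that pattern, assuming typedi constructor injection (reflect-metadata enabled); `StatsEvents` and `TelemetrySink` are illustrative names, not part of this patch.

```ts
import { EventEmitter } from 'events';
import { Service } from 'typedi';

// Illustrative event source; in the patch this role is played by EventsService.
@Service()
class StatsEvents extends EventEmitter {}

// Illustrative consumer; in the patch this role is played by InternalHooks.
@Service()
class TelemetrySink {
	constructor(statsEvents: StatsEvents) {
		// Subscribing in the constructor means the wiring happens as soon as the
		// container instantiates the consumer, with no separate init step.
		statsEvents.on('telemetry.onFirstProductionWorkflowSuccess', async (metrics) =>
			this.record('first production success', metrics),
		);
	}

	private record(name: string, payload: unknown) {
		console.log(name, payload);
	}
}
```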
---------------------------------------- this.app.use(`/${this.restEndpoint}/license`, licenseController); - // ---------------------------------------- - // Workflow Statistics - // ---------------------------------------- - this.app.use(`/${this.restEndpoint}/workflow-stats`, workflowStatsController); - // ---------------------------------------- // SAML // ---------------------------------------- diff --git a/packages/cli/src/WebhookHelpers.ts b/packages/cli/src/WebhookHelpers.ts index 3d007cf574..c286d12a9e 100644 --- a/packages/cli/src/WebhookHelpers.ts +++ b/packages/cli/src/WebhookHelpers.ts @@ -17,8 +17,9 @@ import type express from 'express'; import get from 'lodash/get'; import stream from 'stream'; import { promisify } from 'util'; +import { Container } from 'typedi'; -import { BinaryDataManager, NodeExecuteFunctions, eventEmitter } from 'n8n-core'; +import { BinaryDataManager, NodeExecuteFunctions } from 'n8n-core'; import type { IBinaryData, @@ -60,7 +61,7 @@ import { ActiveExecutions } from '@/ActiveExecutions'; import type { User } from '@db/entities/User'; import type { WorkflowEntity } from '@db/entities/WorkflowEntity'; import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper'; -import { Container } from 'typedi'; +import { EventsService } from '@/services/events.service'; const pipeline = promisify(stream.pipeline); @@ -243,7 +244,7 @@ export async function executeWebhook( NodeExecuteFunctions, executionMode, ); - eventEmitter.emit(eventEmitter.types.nodeFetchedData, workflow.id, workflowStartNode); + Container.get(EventsService).emit('nodeFetchedData', workflow.id, workflowStartNode); } catch (err) { // Send error response to webhook caller const errorMessage = 'Workflow Webhook Error: Workflow could not be started!'; diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index 2ad83109fc..448a8dc3fa 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -15,7 +15,7 @@ /* eslint-disable @typescript-eslint/no-unused-vars */ /* eslint-disable func-names */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ -import { BinaryDataManager, eventEmitter, UserSettings, WorkflowExecute } from 'n8n-core'; +import { BinaryDataManager, UserSettings, WorkflowExecute } from 'n8n-core'; import type { IDataObject, @@ -43,6 +43,7 @@ import { } from 'n8n-workflow'; import pick from 'lodash/pick'; +import { Container } from 'typedi'; import type { FindOptionsWhere } from 'typeorm'; import { LessThanOrEqual, In } from 'typeorm'; import { DateUtils } from 'typeorm/util/DateUtils'; @@ -67,10 +68,10 @@ import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper'; import { findSubworkflowStart, isWorkflowIdValid } from '@/utils'; import { PermissionChecker } from './UserManagement/PermissionChecker'; import { WorkflowsService } from './workflows/workflows.services'; -import { Container } from 'typedi'; import { InternalHooks } from '@/InternalHooks'; import type { ExecutionMetadata } from '@db/entities/ExecutionMetadata'; import { ExecutionRepository } from '@db/repositories'; +import { EventsService } from '@/services/events.service'; const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType'); @@ -273,6 +274,7 @@ export async function saveExecutionMetadata( * */ function hookFunctionsPush(): IWorkflowExecuteHooks { + const pushInstance = Container.get(Push); return { nodeExecuteBefore: [ async function (this: WorkflowHooks, 
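The `hookFunctionsPush` hunks here hoist `Container.get(Push)` to the top of the factory so every closure reuses one resolved instance instead of hitting the container per call. A rough sketch of that shape, assuming typedi; `PushLike` and the hook layout are placeholders.

```ts
import { Container, Service } from 'typedi';

@Service()
class PushLike {
	send(type: string, data: unknown, sessionId?: string) {
		console.log(type, data, sessionId);
	}
}

function hookFactory() {
	// Resolved once when the hooks object is built...
	const push = Container.get(PushLike);

	return {
		nodeExecuteBefore: [
			async (nodeName: string) => {
				// ...and reused by every closure, instead of Container.get() per invocation.
				push.send('nodeExecuteBefore', { nodeName });
			},
		],
	};
}
```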
nodeName: string): Promise { @@ -289,7 +291,6 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { workflowId: this.workflowData.id, }); - const pushInstance = Container.get(Push); pushInstance.send('nodeExecuteBefore', { executionId, nodeName }, sessionId); }, ], @@ -307,7 +308,6 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { workflowId: this.workflowData.id, }); - const pushInstance = Container.get(Push); pushInstance.send('nodeExecuteAfter', { executionId, nodeName, data }, sessionId); }, ], @@ -324,7 +324,6 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { if (sessionId === undefined) { return; } - const pushInstance = Container.get(Push); pushInstance.send( 'executionStarted', { @@ -390,7 +389,6 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { retryOf, }; - const pushInstance = Container.get(Push); pushInstance.send('executionFinished', sendData, sessionId); }, ], @@ -399,7 +397,6 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowExecuteHooks { const externalHooks = Container.get(ExternalHooks); - return { workflowExecuteBefore: [ async function (this: WorkflowHooks, workflow: Workflow): Promise { @@ -514,23 +511,17 @@ export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowEx * */ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { + const internalHooks = Container.get(InternalHooks); + const eventsService = Container.get(EventsService); return { nodeExecuteBefore: [ async function (this: WorkflowHooks, nodeName: string): Promise { - void Container.get(InternalHooks).onNodeBeforeExecute( - this.executionId, - this.workflowData, - nodeName, - ); + void internalHooks.onNodeBeforeExecute(this.executionId, this.workflowData, nodeName); }, ], nodeExecuteAfter: [ async function (this: WorkflowHooks, nodeName: string): Promise { - void Container.get(InternalHooks).onNodePostExecute( - this.executionId, - this.workflowData, - nodeName, - ); + void internalHooks.onNodePostExecute(this.executionId, this.workflowData, nodeName); }, ], workflowExecuteBefore: [], @@ -711,17 +702,13 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { ); } } finally { - eventEmitter.emit( - eventEmitter.types.workflowExecutionCompleted, - this.workflowData, - fullRunData, - ); + eventsService.emit('workflowExecutionCompleted', this.workflowData, fullRunData); } }, ], nodeFetchedData: [ async (workflowId: string, node: INode) => { - eventEmitter.emit(eventEmitter.types.nodeFetchedData, workflowId, node); + eventsService.emit('nodeFetchedData', workflowId, node); }, ], }; @@ -734,6 +721,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { * */ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { + const eventsService = Container.get(EventsService); return { nodeExecuteBefore: [], nodeExecuteAfter: [], @@ -834,17 +822,13 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { this.retryOf, ); } finally { - eventEmitter.emit( - eventEmitter.types.workflowExecutionCompleted, - this.workflowData, - fullRunData, - ); + eventsService.emit('workflowExecutionCompleted', this.workflowData, fullRunData); } }, ], nodeFetchedData: [ async (workflowId: string, node: INode) => { - eventEmitter.emit(eventEmitter.types.nodeFetchedData, workflowId, node); + eventsService.emit('nodeFetchedData', workflowId, node); }, ], }; @@ -951,10 +935,12 @@ async function executeWorkflow( 
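The save hooks above emit `workflowExecutionCompleted` from a `finally` block, so statistics are updated even when persisting the run throws. A standalone sketch of that flow, with a plain EventEmitter standing in for EventsService and an elided `saveRun` body.

```ts
import { EventEmitter } from 'events';

// Stand-in for EventsService; saveRun is illustrative.
const events = new EventEmitter();

async function saveRun(run: unknown) {
	/* persistence elided; may throw */
}

async function workflowExecuteAfter(workflowData: { id?: string }, fullRunData: unknown) {
	try {
		await saveRun(fullRunData);
	} finally {
		// Emitted in `finally`, so the statistics listener still fires on save failures.
		events.emit('workflowExecutionCompleted', workflowData, fullRunData);
	}
}
```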
parentWorkflowSettings?: IWorkflowSettings; }, ): Promise | IWorkflowExecuteProcess> { + const internalHooks = Container.get(InternalHooks); const externalHooks = Container.get(ExternalHooks); await externalHooks.init(); const nodeTypes = Container.get(NodeTypes); + const activeExecutions = Container.get(ActiveExecutions); const workflowData = options.loadedWorkflowData ?? @@ -984,10 +970,10 @@ async function executeWorkflow( executionId = options.parentExecutionId !== undefined ? options.parentExecutionId - : await Container.get(ActiveExecutions).add(runData); + : await activeExecutions.add(runData); } - void Container.get(InternalHooks).onWorkflowBeforeExecute(executionId || '', runData); + void internalHooks.onWorkflowBeforeExecute(executionId || '', runData); let data; try { @@ -1077,7 +1063,7 @@ async function executeWorkflow( } // remove execution from active executions - Container.get(ActiveExecutions).remove(executionId, fullRunData); + activeExecutions.remove(executionId, fullRunData); await Container.get(ExecutionRepository).updateExistingExecution( executionId, @@ -1092,21 +1078,16 @@ async function executeWorkflow( await externalHooks.run('workflow.postExecute', [data, workflowData, executionId]); - void Container.get(InternalHooks).onWorkflowPostExecute( - executionId, - workflowData, - data, - additionalData.userId, - ); + void internalHooks.onWorkflowPostExecute(executionId, workflowData, data, additionalData.userId); if (data.finished === true) { // Workflow did finish successfully - Container.get(ActiveExecutions).remove(executionId, data); + activeExecutions.remove(executionId, data); const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data); return returnData!.data!.main; } - Container.get(ActiveExecutions).remove(executionId, data); + activeExecutions.remove(executionId, data); // Workflow did fail const { error } = data.data.resultData; // eslint-disable-next-line @typescript-eslint/no-throw-literal diff --git a/packages/cli/src/api/workflowStats.api.ts b/packages/cli/src/api/workflowStats.api.ts deleted file mode 100644 index ed16888efe..0000000000 --- a/packages/cli/src/api/workflowStats.api.ts +++ /dev/null @@ -1,187 +0,0 @@ -import type { User } from '@db/entities/User'; -import { whereClause } from '@/UserManagement/UserManagementHelper'; -import express from 'express'; -import { LoggerProxy } from 'n8n-workflow'; -import * as Db from '@/Db'; -import * as ResponseHelper from '@/ResponseHelper'; -import type { - IWorkflowStatisticsCounts, - IWorkflowStatisticsDataLoaded, - IWorkflowStatisticsTimestamps, -} from '@/Interfaces'; -import { StatisticsNames } from '@db/entities/WorkflowStatistics'; -import { getLogger } from '../Logger'; -import type { ExecutionRequest } from '../requests'; - -export const workflowStatsController = express.Router(); - -// Helper function that validates the ID, return a flag stating whether the request is allowed -async function checkWorkflowId(workflowId: string, user: User): Promise { - // Check permissions - const shared = await Db.collections.SharedWorkflow.findOne({ - relations: ['workflow'], - where: whereClause({ - user, - entityType: 'workflow', - entityId: workflowId, - }), - }); - - if (!shared) { - LoggerProxy.verbose('User attempted to read a workflow without permissions', { - workflowId, - userId: user.id, - }); - return false; - } - return true; -} - -/** - * Initialize Logger if needed - */ -workflowStatsController.use((req, res, next) => { - try { - LoggerProxy.getInstance(); - } catch (error) { - 
LoggerProxy.init(getLogger()); - } - - next(); -}); - -/** - * Check that the workflow ID is valid and allowed to be read by the user - */ -workflowStatsController.use(async (req: ExecutionRequest.Get, res, next) => { - const allowed = await checkWorkflowId(req.params.id, req.user); - if (allowed) { - next(); - } else { - // Otherwise, make and return an error - const response = new ResponseHelper.NotFoundError(`Workflow ${req.params.id} does not exist.`); - next(response); - } -}); - -/** - * GET /workflow-stats/:id/counts/ - */ -workflowStatsController.get( - '/:id/counts/', - ResponseHelper.send(async (req: ExecutionRequest.Get): Promise => { - // Get counts from DB - const workflowId = req.params.id; - - // Find the stats for this workflow - const stats = await Db.collections.WorkflowStatistics.find({ - select: ['count', 'name'], - where: { - workflowId, - }, - }); - - const data: IWorkflowStatisticsCounts = { - productionSuccess: 0, - productionError: 0, - manualSuccess: 0, - manualError: 0, - }; - - // There will be a maximum of 4 stats (currently) - stats.forEach(({ count, name }) => { - switch (name) { - case StatisticsNames.manualError: - data.manualError = count; - break; - - case StatisticsNames.manualSuccess: - data.manualSuccess = count; - break; - - case StatisticsNames.productionError: - data.productionError = count; - break; - - case StatisticsNames.productionSuccess: - data.productionSuccess = count; - } - }); - - return data; - }), -); - -/** - * GET /workflow-stats/:id/times/ - */ -workflowStatsController.get( - '/:id/times/', - ResponseHelper.send(async (req: ExecutionRequest.Get): Promise => { - // Get times from DB - const workflowId = req.params.id; - - // Find the stats for this workflow - const stats = await Db.collections.WorkflowStatistics.find({ - select: ['latestEvent', 'name'], - where: { - workflowId, - }, - }); - - const data: IWorkflowStatisticsTimestamps = { - productionSuccess: null, - productionError: null, - manualSuccess: null, - manualError: null, - }; - - // There will be a maximum of 4 stats (currently) - stats.forEach(({ latestEvent, name }) => { - switch (name) { - case StatisticsNames.manualError: - data.manualError = latestEvent; - break; - - case StatisticsNames.manualSuccess: - data.manualSuccess = latestEvent; - break; - - case StatisticsNames.productionError: - data.productionError = latestEvent; - break; - - case StatisticsNames.productionSuccess: - data.productionSuccess = latestEvent; - } - }); - - return data; - }), -); - -/** - * GET /workflow-stats/:id/data-loaded/ - */ -workflowStatsController.get( - '/:id/data-loaded/', - ResponseHelper.send(async (req: ExecutionRequest.Get): Promise => { - // Get flag - const workflowId = req.params.id; - - // Get the flag - const stats = await Db.collections.WorkflowStatistics.findOne({ - select: ['latestEvent'], - where: { - workflowId, - name: StatisticsNames.dataLoaded, - }, - }); - - const data: IWorkflowStatisticsDataLoaded = { - dataLoaded: stats ? 
true : false, - }; - - return data; - }), -); diff --git a/packages/cli/src/commands/execute.ts b/packages/cli/src/commands/execute.ts index 16631a962e..fbccee762c 100644 --- a/packages/cli/src/commands/execute.ts +++ b/packages/cli/src/commands/execute.ts @@ -10,7 +10,6 @@ import { WorkflowRunner } from '@/WorkflowRunner'; import type { IWorkflowExecutionDataProcess } from '@/Interfaces'; import { getInstanceOwner } from '@/UserManagement/UserManagementHelper'; import { findCliWorkflowStart, isWorkflowIdValid } from '@/utils'; -import { initEvents } from '@/events'; import { BaseCommand } from './BaseCommand'; import { Container } from 'typedi'; @@ -36,9 +35,6 @@ export class Execute extends BaseCommand { await super.init(); await this.initBinaryManager(); await this.initExternalHooks(); - - // Add event handlers - initEvents(); } async run() { diff --git a/packages/cli/src/commands/executeBatch.ts b/packages/cli/src/commands/executeBatch.ts index df34d32392..b1586b895a 100644 --- a/packages/cli/src/commands/executeBatch.ts +++ b/packages/cli/src/commands/executeBatch.ts @@ -15,7 +15,6 @@ import type { IWorkflowDb, IWorkflowExecutionDataProcess } from '@/Interfaces'; import type { User } from '@db/entities/User'; import { getInstanceOwner } from '@/UserManagement/UserManagementHelper'; import { findCliWorkflowStart } from '@/utils'; -import { initEvents } from '@/events'; import { BaseCommand } from './BaseCommand'; import { Container } from 'typedi'; import type { @@ -183,9 +182,6 @@ export class ExecuteBatch extends BaseCommand { await super.init(); await this.initBinaryManager(); await this.initExternalHooks(); - - // Add event handlers - initEvents(); } async run() { diff --git a/packages/cli/src/controllers/index.ts b/packages/cli/src/controllers/index.ts index 04b46af5f1..c3e742924e 100644 --- a/packages/cli/src/controllers/index.ts +++ b/packages/cli/src/controllers/index.ts @@ -8,3 +8,4 @@ export { PasswordResetController } from './passwordReset.controller'; export { TagsController } from './tags.controller'; export { TranslationController } from './translation.controller'; export { UsersController } from './users.controller'; +export { WorkflowStatisticsController } from './workflowStatistics.controller'; diff --git a/packages/cli/src/controllers/workflowStatistics.controller.ts b/packages/cli/src/controllers/workflowStatistics.controller.ts new file mode 100644 index 0000000000..d4760c605d --- /dev/null +++ b/packages/cli/src/controllers/workflowStatistics.controller.ts @@ -0,0 +1,124 @@ +import { Service } from 'typedi'; +import { Response, NextFunction } from 'express'; +import { ILogger } from 'n8n-workflow'; +import { Get, Middleware, RestController } from '@/decorators'; +import type { WorkflowStatistics } from '@db/entities/WorkflowStatistics'; +import { StatisticsNames } from '@db/entities/WorkflowStatistics'; +import { SharedWorkflowRepository, WorkflowStatisticsRepository } from '@db/repositories'; +import { ExecutionRequest } from '@/requests'; +import { whereClause } from '@/UserManagement/UserManagementHelper'; +import { NotFoundError } from '@/ResponseHelper'; +import type { IWorkflowStatisticsDataLoaded } from '@/Interfaces'; + +interface WorkflowStatisticsData { + productionSuccess: T; + productionError: T; + manualSuccess: T; + manualError: T; +} + +@Service() +@RestController('/workflow-stats') +export class WorkflowStatisticsController { + constructor( + private sharedWorkflowRepository: SharedWorkflowRepository, + private workflowStatisticsRepository: 
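The new `WorkflowStatisticsController` (continuing below) replaces the express router with n8n's decorator-based controllers. A minimal sketch of how such a controller is declared and registered, assuming `@/decorators` exposes `Get` and `RestController` as used in this patch; `ExampleController` and its route are illustrative.

```ts
import { Container, Service } from 'typedi';
// Assumed available as used in this patch: n8n's routing decorators.
import { Get, RestController } from '@/decorators';

@Service()
@RestController('/example-stats')
export class ExampleController {
	@Get('/:id/ping')
	async ping(req: { params: { id: string } }) {
		// The returned object becomes the JSON response body.
		return { id: req.params.id, ok: true };
	}
}

// In Server.ts the instance is simply appended to the controllers array:
// const controllers = [..., Container.get(ExampleController)];
```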
WorkflowStatisticsRepository, + private readonly logger: ILogger, + ) {} + + /** + * Check that the workflow ID is valid and allowed to be read by the user + */ + // TODO: move this into a new decorator `@ValidateWorkflowPermission` + @Middleware() + async hasWorkflowAccess(req: ExecutionRequest.Get, res: Response, next: NextFunction) { + const { user } = req; + const workflowId = req.params.id; + const allowed = await this.sharedWorkflowRepository.exist({ + relations: ['workflow'], + where: whereClause({ + user, + entityType: 'workflow', + entityId: workflowId, + }), + }); + + if (allowed) { + next(); + } else { + this.logger.verbose('User attempted to read a workflow without permissions', { + workflowId, + userId: user.id, + }); + // Otherwise, make and return an error + throw new NotFoundError(`Workflow ${workflowId} does not exist.`); + } + } + + @Get('/:id/counts/') + async getCounts(req: ExecutionRequest.Get): Promise> { + return this.getData(req.params.id, 'count', 0); + } + + @Get('/:id/times/') + async getTimes(req: ExecutionRequest.Get): Promise> { + return this.getData(req.params.id, 'latestEvent', null); + } + + @Get('/:id/data-loaded/') + async getDataLoaded(req: ExecutionRequest.Get): Promise { + // Get flag + const workflowId = req.params.id; + + // Get the flag + const stats = await this.workflowStatisticsRepository.findOne({ + select: ['latestEvent'], + where: { + workflowId, + name: StatisticsNames.dataLoaded, + }, + }); + + return { + dataLoaded: stats ? true : false, + }; + } + + private async getData< + C extends 'count' | 'latestEvent', + D = WorkflowStatistics[C] extends number ? 0 : null, + >(workflowId: string, columnName: C, defaultValue: WorkflowStatistics[C] | D) { + const stats = await this.workflowStatisticsRepository.find({ + select: [columnName, 'name'], + where: { workflowId }, + }); + + const data: WorkflowStatisticsData = { + productionSuccess: defaultValue, + productionError: defaultValue, + manualSuccess: defaultValue, + manualError: defaultValue, + }; + + stats.forEach(({ name, [columnName]: value }) => { + switch (name) { + case StatisticsNames.manualError: + data.manualError = value; + break; + + case StatisticsNames.manualSuccess: + data.manualSuccess = value; + break; + + case StatisticsNames.productionError: + data.productionError = value; + break; + + case StatisticsNames.productionSuccess: + data.productionSuccess = value; + } + }); + + return data; + } +} diff --git a/packages/cli/src/databases/repositories/workflowStatistics.repository.ts b/packages/cli/src/databases/repositories/workflowStatistics.repository.ts index 90ebd510d0..3ce4ea6d83 100644 --- a/packages/cli/src/databases/repositories/workflowStatistics.repository.ts +++ b/packages/cli/src/databases/repositories/workflowStatistics.repository.ts @@ -1,10 +1,94 @@ import { Service } from 'typedi'; -import { DataSource, Repository } from 'typeorm'; +import { DataSource, QueryFailedError, Repository } from 'typeorm'; +import config from '@/config'; +import type { StatisticsNames } from '../entities/WorkflowStatistics'; import { WorkflowStatistics } from '../entities/WorkflowStatistics'; +type StatisticsInsertResult = 'insert' | 'failed'; +type StatisticsUpsertResult = StatisticsInsertResult | 'update'; + @Service() export class WorkflowStatisticsRepository extends Repository { + private readonly dbType = config.getEnv('database.type'); + constructor(dataSource: DataSource) { super(WorkflowStatistics, dataSource.manager); } + + async insertWorkflowStatistics( + eventName: StatisticsNames, + 
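The generic `getData` above folds at most four statistics rows into a fixed-shape response, selecting either `count` or `latestEvent`. A self-contained sketch of the same fold; the statistic name strings other than `production_success` (which appears later in this patch) are assumed from the enum's apparent convention.

```ts
type StatName = 'production_success' | 'production_error' | 'manual_success' | 'manual_error';

interface StatRow {
	name: StatName;
	count: number;
	latestEvent: Date | null;
}

interface StatisticsData<T> {
	productionSuccess: T;
	productionError: T;
	manualSuccess: T;
	manualError: T;
}

// Maps whichever column was selected ('count' or 'latestEvent') into the response shape.
function foldStats<C extends 'count' | 'latestEvent'>(
	rows: StatRow[],
	column: C,
	defaultValue: StatRow[C],
): StatisticsData<StatRow[C]> {
	const data: StatisticsData<StatRow[C]> = {
		productionSuccess: defaultValue,
		productionError: defaultValue,
		manualSuccess: defaultValue,
		manualError: defaultValue,
	};
	for (const row of rows) {
		if (row.name === 'manual_error') data.manualError = row[column];
		else if (row.name === 'manual_success') data.manualSuccess = row[column];
		else if (row.name === 'production_error') data.productionError = row[column];
		else data.productionSuccess = row[column];
	}
	return data;
}

// e.g. foldStats(rows, 'count', 0) or foldStats(rows, 'latestEvent', null)
```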
workflowId: string, + ): Promise { + // Try to insert the data loaded statistic + try { + await this.insert({ + workflowId, + name: eventName, + count: 1, + latestEvent: new Date(), + }); + return 'insert'; + } catch (error) { + // if it's a duplicate key error then that's fine, otherwise throw the error + if (!(error instanceof QueryFailedError)) { + throw error; + } + // If it is a query failed error, we return + return 'failed'; + } + } + + async upsertWorkflowStatistics( + eventName: StatisticsNames, + workflowId: string, + ): Promise { + const { tableName } = this.metadata; + try { + if (this.dbType === 'sqlite') { + await this.query( + `INSERT INTO "${tableName}" ("count", "name", "workflowId", "latestEvent") + VALUES (1, "${eventName}", "${workflowId}", CURRENT_TIMESTAMP) + ON CONFLICT (workflowId, name) + DO UPDATE SET count = count + 1, latestEvent = CURRENT_TIMESTAMP`, + ); + // SQLite does not offer a reliable way to know whether or not an insert or update happened. + // We'll use a naive approach in this case. Query again after and it might cause us to miss the + // first production execution sometimes due to concurrency, but it's the only way. + const counter = await this.findOne({ + select: ['count'], + where: { + name: eventName, + workflowId, + }, + }); + + return counter?.count === 1 ? 'insert' : 'failed'; + } else if (this.dbType === 'postgresdb') { + const queryResult = (await this.query( + `INSERT INTO "${tableName}" ("count", "name", "workflowId", "latestEvent") + VALUES (1, '${eventName}', '${workflowId}', CURRENT_TIMESTAMP) + ON CONFLICT ("name", "workflowId") + DO UPDATE SET "count" = "${tableName}"."count" + 1, "latestEvent" = CURRENT_TIMESTAMP + RETURNING *;`, + )) as Array<{ + count: number; + }>; + return queryResult[0].count === 1 ? 'insert' : 'update'; + } else { + const queryResult = (await this.query( + `INSERT INTO \`${tableName}\` (count, name, workflowId, latestEvent) + VALUES (1, "${eventName}", "${workflowId}", NOW()) + ON DUPLICATE KEY + UPDATE count = count + 1, latestEvent = NOW();`, + )) as { + affectedRows: number; + }; + // MySQL returns 2 affected rows on update + return queryResult.affectedRows === 1 ? 
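A usage sketch of how the `'insert' | 'update' | 'failed'` result of the upsert above is meant to be consumed; the caller here is illustrative, the real consumer is `EventsService.workflowExecutionCompleted` further down in this patch.

```ts
type UpsertResult = 'insert' | 'update' | 'failed';

async function recordProductionSuccess(
	upsert: (name: string, workflowId: string) => Promise<UpsertResult>,
	workflowId: string,
) {
	const result = await upsert('production_success', workflowId);

	// Only the very first successful production run creates the row,
	// so only 'insert' should trigger the one-off telemetry event.
	if (result === 'insert') {
		console.log('first production success for workflow', workflowId);
	}
	// 'update' means it was not the first run; 'failed' is a swallowed query error.
}
```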
'insert' : 'update'; + } + } catch (error) { + if (error instanceof QueryFailedError) return 'failed'; + throw error; + } + } } diff --git a/packages/cli/src/eventbus/MessageEventBus/recoverEvents.ts b/packages/cli/src/eventbus/MessageEventBus/recoverEvents.ts index 9e155d29f1..2cb718b8f8 100644 --- a/packages/cli/src/eventbus/MessageEventBus/recoverEvents.ts +++ b/packages/cli/src/eventbus/MessageEventBus/recoverEvents.ts @@ -3,9 +3,6 @@ import { NodeOperationError, WorkflowOperationError } from 'n8n-workflow'; import type { EventMessageTypes, EventNamesTypes } from '../EventMessageClasses'; import type { DateTime } from 'luxon'; import { Push } from '@/push'; -import type { IPushDataExecutionRecovered } from '../../Interfaces'; -import { workflowExecutionCompleted } from '../../events/WorkflowStatistics'; -import { eventBus } from './MessageEventBus'; import { Container } from 'typedi'; import { InternalHooks } from '@/InternalHooks'; import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData'; @@ -193,16 +190,12 @@ export async function recoverExecutionDataFromEventLogMessages( // execute workflowExecuteAfter hook to trigger error workflow await workflowHooks.executeHookFunctions('workflowExecuteAfter', [iRunData]); - // calling workflowExecutionCompleted directly because the eventEmitter is not up yet at this point - await workflowExecutionCompleted(executionEntry.workflowData, iRunData); - + const push = Container.get(Push); // wait for UI to be back up and send the execution data - eventBus.once('editorUiConnected', function handleUiBackUp() { + push.once('editorUiConnected', function handleUiBackUp() { // add a small timeout to make sure the UI is back up setTimeout(() => { - Container.get(Push).send('executionRecovered', { - executionId, - } as IPushDataExecutionRecovered); + push.send('executionRecovered', { executionId }); }, 1000); }); } diff --git a/packages/cli/src/events/WorkflowStatistics.ts b/packages/cli/src/events/WorkflowStatistics.ts deleted file mode 100644 index b2a15b4919..0000000000 --- a/packages/cli/src/events/WorkflowStatistics.ts +++ /dev/null @@ -1,187 +0,0 @@ -import type { INode, IRun, IWorkflowBase } from 'n8n-workflow'; -import { LoggerProxy } from 'n8n-workflow'; -import * as Db from '@/Db'; -import { StatisticsNames } from '@db/entities/WorkflowStatistics'; -import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper'; -import { QueryFailedError } from 'typeorm'; -import { Container } from 'typedi'; -import { InternalHooks } from '@/InternalHooks'; -import config from '@/config'; -import { UserService } from '@/user/user.service'; - -const enum StatisticsUpsertResult { - insert = 'insert', - update = 'update', - failed = 'failed', -} - -async function upsertWorkflowStatistics( - eventName: StatisticsNames, - workflowId: string, -): Promise { - const dbType = config.getEnv('database.type'); - const { tableName } = Db.collections.WorkflowStatistics.metadata; - try { - if (dbType === 'sqlite') { - await Db.collections.WorkflowStatistics.query( - `INSERT INTO "${tableName}" ("count", "name", "workflowId", "latestEvent") - VALUES (1, "${eventName}", "${workflowId}", CURRENT_TIMESTAMP) - ON CONFLICT (workflowId, name) - DO UPDATE SET count = count + 1, latestEvent = CURRENT_TIMESTAMP`, - ); - // SQLite does not offer a reliable way to know whether or not an insert or update happened. - // We'll use a naive approach in this case. 
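The recovery change above waits for the editor UI to reconnect by listening on the Push service itself, then defers the `executionRecovered` notice briefly. A reduced sketch of that pattern; `PushLike` stands in for the real Push service, which now extends EventEmitter.

```ts
import { EventEmitter } from 'events';

// Stand-in for the Push service (see push/index.ts later in this diff).
class PushLike extends EventEmitter {
	send(type: string, data: unknown) {
		console.log(type, data);
	}
}

const push = new PushLike();

function notifyWhenEditorReconnects(executionId: string) {
	// `once` so the recovered notice goes out a single time, to the first reconnect.
	push.once('editorUiConnected', () => {
		// Small delay to let the editor finish wiring up its own push handlers.
		setTimeout(() => push.send('executionRecovered', { executionId }), 1000);
	});
}
```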
Query again after and it might cause us to miss the - // first production execution sometimes due to concurrency, but it's the only way. - - const counter = await Db.collections.WorkflowStatistics.findOne({ - select: ['count'], - where: { - name: eventName, - workflowId, - }, - }); - - if (counter?.count === 1) { - return StatisticsUpsertResult.insert; - } - return StatisticsUpsertResult.update; - } else if (dbType === 'postgresdb') { - const queryResult = (await Db.collections.WorkflowStatistics.query( - `INSERT INTO "${tableName}" ("count", "name", "workflowId", "latestEvent") - VALUES (1, '${eventName}', '${workflowId}', CURRENT_TIMESTAMP) - ON CONFLICT ("name", "workflowId") - DO UPDATE SET "count" = "${tableName}"."count" + 1, "latestEvent" = CURRENT_TIMESTAMP - RETURNING *;`, - )) as Array<{ - count: number; - }>; - if (queryResult[0].count === 1) { - return StatisticsUpsertResult.insert; - } - return StatisticsUpsertResult.update; - } else { - const queryResult = (await Db.collections.WorkflowStatistics.query( - `INSERT INTO \`${tableName}\` (count, name, workflowId, latestEvent) - VALUES (1, "${eventName}", "${workflowId}", NOW()) - ON DUPLICATE KEY - UPDATE count = count + 1, latestEvent = NOW();`, - )) as { - affectedRows: number; - }; - if (queryResult.affectedRows === 1) { - return StatisticsUpsertResult.insert; - } - // MySQL returns 2 affected rows on update - return StatisticsUpsertResult.update; - } - } catch (error) { - if (error instanceof QueryFailedError) return StatisticsUpsertResult.failed; - throw error; - } -} - -export async function workflowExecutionCompleted( - workflowData: IWorkflowBase, - runData: IRun, -): Promise { - // Determine the name of the statistic - const finished = runData.finished ? runData.finished : false; - const manual = runData.mode === 'manual'; - let name: StatisticsNames; - - if (finished) { - if (manual) name = StatisticsNames.manualSuccess; - else name = StatisticsNames.productionSuccess; - } else { - if (manual) name = StatisticsNames.manualError; - else name = StatisticsNames.productionError; - } - - // Get the workflow id - const workflowId = workflowData.id; - if (!workflowId) return; - - try { - const upsertResult = await upsertWorkflowStatistics(name, workflowId); - - if ( - name === StatisticsNames.productionSuccess && - upsertResult === StatisticsUpsertResult.insert - ) { - const owner = await getWorkflowOwner(workflowId); - const metrics = { - user_id: owner.id, - workflow_id: workflowId, - }; - - if (!owner.settings?.userActivated) { - await UserService.updateUserSettings(owner.id, { - firstSuccessfulWorkflowId: workflowId, - userActivated: true, - }); - } - - // Send the metrics - await Container.get(InternalHooks).onFirstProductionWorkflowSuccess(metrics); - } - } catch (error) { - LoggerProxy.verbose('Unable to fire first workflow success telemetry event'); - } -} - -export async function nodeFetchedData( - workflowId: string | undefined | null, - node: INode, -): Promise { - if (!workflowId) return; - - const hasLoadedDataPreviously = await Db.collections.WorkflowStatistics.findOne({ - select: ['count'], - where: { - workflowId, - name: StatisticsNames.dataLoaded, - }, - }); - - if (hasLoadedDataPreviously) { - return; - } - - // Try to insert the data loaded statistic - try { - await Db.collections.WorkflowStatistics.createQueryBuilder('workflowStatistics') - .insert() - .values({ - workflowId, - name: StatisticsNames.dataLoaded, - count: 1, - latestEvent: new Date(), - }) - .orIgnore() - .execute(); - } catch (error) { - 
LoggerProxy.warn('Failed saving loaded data statistics'); - } - - // Compile the metrics since this was a new data loaded event - const owner = await getWorkflowOwner(workflowId); - let metrics = { - user_id: owner.id, - workflow_id: workflowId, - node_type: node.type, - node_id: node.id, - }; - - // This is probably naive but I can't see a way for a node to have multiple credentials attached so.. - if (node.credentials) { - Object.entries(node.credentials).forEach(([credName, credDetails]) => { - metrics = Object.assign(metrics, { - credential_type: credName, - credential_id: credDetails.id, - }); - }); - } - - // Send metrics to posthog - await Container.get(InternalHooks).onFirstWorkflowDataLoad(metrics); -} diff --git a/packages/cli/src/events/index.ts b/packages/cli/src/events/index.ts deleted file mode 100644 index a73a848f75..0000000000 --- a/packages/cli/src/events/index.ts +++ /dev/null @@ -1,14 +0,0 @@ -import { eventEmitter } from 'n8n-core'; -import { nodeFetchedData, workflowExecutionCompleted } from './WorkflowStatistics'; - -export function initEvents() { - if ('SKIP_STATISTICS_EVENTS' in process.env) return; - - // Check for undefined as during testing these functions end up undefined for some reason - if (nodeFetchedData) { - eventEmitter.on(eventEmitter.types.nodeFetchedData, nodeFetchedData); - } - if (workflowExecutionCompleted) { - eventEmitter.on(eventEmitter.types.workflowExecutionCompleted, workflowExecutionCompleted); - } -} diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts index a9022bdc5c..8216e0c3b2 100644 --- a/packages/cli/src/push/abstract.push.ts +++ b/packages/cli/src/push/abstract.push.ts @@ -1,6 +1,5 @@ import { jsonStringify, LoggerProxy as Logger } from 'n8n-workflow'; import type { IPushDataType } from '@/Interfaces'; -import { eventBus } from '../eventbus'; export abstract class AbstractPush { protected connections: Record = {}; @@ -11,7 +10,6 @@ export abstract class AbstractPush { protected add(sessionId: string, connection: T): void { const { connections } = this; Logger.debug('Add editor-UI session', { sessionId }); - eventBus.emit('editorUiConnected', sessionId); const existingConnection = connections[sessionId]; if (existingConnection) { diff --git a/packages/cli/src/push/index.ts b/packages/cli/src/push/index.ts index 72289e79e3..09af08799d 100644 --- a/packages/cli/src/push/index.ts +++ b/packages/cli/src/push/index.ts @@ -1,3 +1,4 @@ +import { EventEmitter } from 'events'; import { ServerResponse } from 'http'; import type { Server } from 'http'; import type { Socket } from 'net'; @@ -16,7 +17,7 @@ import type { IPushDataType } from '@/Interfaces'; const useWebSockets = config.getEnv('push.backend') === 'websocket'; @Service() -export class Push { +export class Push extends EventEmitter { private backend = useWebSockets ? 
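Just below, `Push` is redeclared to extend Node's EventEmitter so it can announce `editorUiConnected` itself, replacing the removed `eventBus` hop out of AbstractPush. A reduced sketch of that shape; the SSE/WebSocket handling is elided and `PushSketch` is an illustrative name.

```ts
import { EventEmitter } from 'events';
import { Service } from 'typedi';

@Service()
class PushSketch extends EventEmitter {
	handleRequest(sessionId: string) {
		// ...accept the SSE/WebSocket connection (details elided)...
		// ...then let any interested service know the editor UI is connected.
		this.emit('editorUiConnected', sessionId);
	}
}

// Consumers subscribe directly on the service instead of a shared eventBus:
// Container.get(PushSketch).once('editorUiConnected', (sessionId) => { /* ... */ });
```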
new WebSocketPush() : new SSEPush(); handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) { @@ -27,6 +28,7 @@ export class Push { } else { res.status(401).send('Unauthorized'); } + this.emit('editorUiConnected', req.query.sessionId); } send(type: IPushDataType, data: D, sessionId: string | undefined = undefined) { diff --git a/packages/cli/src/services/events.service.ts b/packages/cli/src/services/events.service.ts new file mode 100644 index 0000000000..ec288fa23b --- /dev/null +++ b/packages/cli/src/services/events.service.ts @@ -0,0 +1,123 @@ +import { EventEmitter } from 'events'; +import { Service } from 'typedi'; +import type { INode, IRun, IWorkflowBase } from 'n8n-workflow'; +import { LoggerProxy } from 'n8n-workflow'; +import { StatisticsNames } from '@db/entities/WorkflowStatistics'; +import { WorkflowStatisticsRepository } from '@db/repositories'; +import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper'; +import { UserService } from '@/user/user.service'; + +@Service() +export class EventsService extends EventEmitter { + constructor(private repository: WorkflowStatisticsRepository) { + super({ captureRejections: true }); + if ('SKIP_STATISTICS_EVENTS' in process.env) return; + + this.on('nodeFetchedData', async (workflowId, node) => this.nodeFetchedData(workflowId, node)); + this.on('workflowExecutionCompleted', async (workflowData, runData) => + this.workflowExecutionCompleted(workflowData, runData), + ); + } + + async workflowExecutionCompleted(workflowData: IWorkflowBase, runData: IRun): Promise { + // Determine the name of the statistic + const finished = runData.finished ? runData.finished : false; + const manual = runData.mode === 'manual'; + let name: StatisticsNames; + + if (finished) { + if (manual) name = StatisticsNames.manualSuccess; + else name = StatisticsNames.productionSuccess; + } else { + if (manual) name = StatisticsNames.manualError; + else name = StatisticsNames.productionError; + } + + // Get the workflow id + const workflowId = workflowData.id; + if (!workflowId) return; + + try { + const upsertResult = await this.repository.upsertWorkflowStatistics(name, workflowId); + + if (name === 'production_success' && upsertResult === 'insert') { + const owner = await getWorkflowOwner(workflowId); + const metrics = { + user_id: owner.id, + workflow_id: workflowId, + }; + + if (!owner.settings?.userActivated) { + await UserService.updateUserSettings(owner.id, { + firstSuccessfulWorkflowId: workflowId, + userActivated: true, + }); + } + + // Send the metrics + this.emit('telemetry.onFirstProductionWorkflowSuccess', metrics); + } + } catch (error) { + LoggerProxy.verbose('Unable to fire first workflow success telemetry event'); + } + } + + async nodeFetchedData(workflowId: string | undefined | null, node: INode): Promise { + if (!workflowId) return; + + const insertResult = await this.repository.insertWorkflowStatistics( + StatisticsNames.dataLoaded, + workflowId, + ); + if (insertResult === 'failed') return; + + // Compile the metrics since this was a new data loaded event + const owner = await getWorkflowOwner(workflowId); + + let metrics = { + user_id: owner.id, + workflow_id: workflowId, + node_type: node.type, + node_id: node.id, + }; + + // This is probably naive but I can't see a way for a node to have multiple credentials attached so.. 
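Note the `super({ captureRejections: true })` in the EventsService constructor above: with async listeners, a rejected handler promise is routed to the emitter's `error` event instead of surfacing as an unhandled rejection. A tiny self-contained example of that behaviour, independent of n8n.

```ts
import { EventEmitter } from 'events';

const emitter = new EventEmitter({ captureRejections: true });

emitter.on('error', (err) => {
	// Rejections from async listeners land here rather than crashing the process.
	console.error('listener failed:', err);
});

emitter.on('workflowExecutionCompleted', async () => {
	throw new Error('statistics update failed');
});

emitter.emit('workflowExecutionCompleted');
```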
+ if (node.credentials) { + Object.entries(node.credentials).forEach(([credName, credDetails]) => { + metrics = Object.assign(metrics, { + credential_type: credName, + credential_id: credDetails.id, + }); + }); + } + + // Send metrics to posthog + this.emit('telemetry.onFirstWorkflowDataLoad', metrics); + } +} + +export declare interface EventsService { + on( + event: 'nodeFetchedData', + listener: (workflowId: string | undefined | null, node: INode) => void, + ): this; + on( + event: 'workflowExecutionCompleted', + listener: (workflowData: IWorkflowBase, runData: IRun) => void, + ): this; + on( + event: 'telemetry.onFirstProductionWorkflowSuccess', + listener: (metrics: { user_id: string; workflow_id: string }) => void, + ): this; + on( + event: 'telemetry.onFirstWorkflowDataLoad', + listener: (metrics: { + user_id: string; + workflow_id: string; + node_type: string; + node_id: string; + credential_type?: string; + credential_id?: string; + }) => void, + ): this; +} diff --git a/packages/cli/test/unit/Events.test.ts b/packages/cli/test/unit/services/events.service.test.ts similarity index 54% rename from packages/cli/test/unit/Events.test.ts rename to packages/cli/test/unit/services/events.service.test.ts index 452014a739..027e180ceb 100644 --- a/packages/cli/test/unit/Events.test.ts +++ b/packages/cli/test/unit/services/events.service.test.ts @@ -1,75 +1,61 @@ -import type { IRun, WorkflowExecuteMode } from 'n8n-workflow'; +import type { IRun, WorkflowExecuteMode, ILogger } from 'n8n-workflow'; import { LoggerProxy } from 'n8n-workflow'; +import { + QueryFailedError, + type DataSource, + type EntityManager, + type EntityMetadata, +} from 'typeorm'; +import { mocked } from 'jest-mock'; import { mock } from 'jest-mock-extended'; import config from '@/config'; -import * as Db from '@/Db'; -import { User } from '@db/entities/User'; -import { StatisticsNames } from '@db/entities/WorkflowStatistics'; +import type { User } from '@db/entities/User'; import type { WorkflowStatistics } from '@db/entities/WorkflowStatistics'; -import type { WorkflowStatisticsRepository } from '@db/repositories'; -import { nodeFetchedData, workflowExecutionCompleted } from '@/events/WorkflowStatistics'; -import * as UserManagementHelper from '@/UserManagement/UserManagementHelper'; -import { getLogger } from '@/Logger'; -import { InternalHooks } from '@/InternalHooks'; - -import { mockInstance } from '../integration/shared/utils/'; +import { WorkflowStatisticsRepository } from '@db/repositories'; +import { EventsService } from '@/services/events.service'; import { UserService } from '@/user/user.service'; -import { WorkflowEntity } from '@db/entities/WorkflowEntity'; +import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper'; -jest.mock('@/Db', () => { - return { - collections: { - WorkflowStatistics: mock({ - metadata: { tableName: 'workflow_statistics' }, - }), - }, - }; -}); +jest.mock('@/UserManagement/UserManagementHelper', () => ({ getWorkflowOwner: jest.fn() })); -jest.spyOn(UserService, 'updateUserSettings').mockImplementation(); - -describe('Events', () => { +describe('EventsService', () => { const dbType = config.getEnv('database.type'); - const fakeUser = Object.assign(new User(), { id: 'abcde-fghij' }); - const internalHooks = mockInstance(InternalHooks); + const fakeUser = mock({ id: 'abcde-fghij' }); - jest.spyOn(UserManagementHelper, 'getWorkflowOwner').mockResolvedValue(fakeUser); - - const workflowStatisticsRepository = Db.collections.WorkflowStatistics as ReturnType< - typeof mock - 
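The `export declare interface EventsService` block above relies on declaration merging to give `on()` typed overloads per event name without changing runtime behaviour. A minimal standalone example of the same trick; the event names and payloads here are illustrative.

```ts
import { EventEmitter } from 'events';

class TypedEvents extends EventEmitter {}

// Declaration merging: the interface overloads narrow `on` for callers,
// while the runtime implementation stays EventEmitter's generic one.
declare interface TypedEvents {
	on(event: 'nodeFetchedData', listener: (workflowId: string, nodeType: string) => void): this;
	on(event: 'workflowExecutionCompleted', listener: (workflowId: string) => void): this;
}

const events = new TypedEvents();
events.on('nodeFetchedData', (workflowId, nodeType) => {
	// workflowId and nodeType are inferred as string here.
	console.log(workflowId, nodeType);
});
```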
>; - - beforeAll(() => { - config.set('diagnostics.enabled', true); - config.set('deployment.type', 'n8n-testing'); - LoggerProxy.init(getLogger()); + const entityManager = mock(); + const dataSource = mock({ + manager: entityManager, + getMetadata: () => + mock({ + tableName: 'workflow_statistics', + }), }); + Object.assign(entityManager, { connection: dataSource }); - afterAll(() => { - jest.clearAllTimers(); - jest.useRealTimers(); - }); + LoggerProxy.init(mock()); + config.set('diagnostics.enabled', true); + config.set('deployment.type', 'n8n-testing'); + mocked(getWorkflowOwner).mockResolvedValue(fakeUser); + const updateUserSettingsMock = jest.spyOn(UserService, 'updateUserSettings').mockImplementation(); + + const eventsService = new EventsService(new WorkflowStatisticsRepository(dataSource)); + + const onFirstProductionWorkflowSuccess = jest.fn(); + const onFirstWorkflowDataLoad = jest.fn(); + eventsService.on('telemetry.onFirstProductionWorkflowSuccess', onFirstProductionWorkflowSuccess); + eventsService.on('telemetry.onFirstWorkflowDataLoad', onFirstWorkflowDataLoad); beforeEach(() => { - if (dbType === 'sqlite') { - workflowStatisticsRepository.findOne.mockClear(); - } else { - workflowStatisticsRepository.query.mockClear(); - } - - internalHooks.onFirstProductionWorkflowSuccess.mockClear(); - internalHooks.onFirstWorkflowDataLoad.mockClear(); + jest.clearAllMocks(); }); const mockDBCall = (count = 1) => { if (dbType === 'sqlite') { - workflowStatisticsRepository.findOne.mockResolvedValueOnce( - mock({ count }), - ); + entityManager.findOne.mockResolvedValueOnce(mock({ count })); } else { const result = dbType === 'postgresdb' ? [{ count }] : { affectedRows: count }; - workflowStatisticsRepository.query.mockImplementationOnce(async (query) => + entityManager.query.mockImplementationOnce(async (query) => query.startsWith('INSERT INTO') ? 
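The rewritten test above builds a real `WorkflowStatisticsRepository` on top of a mocked TypeORM `DataSource`/`EntityManager`, so the SQL helpers run without a database. A condensed sketch of that wiring, assuming jest and jest-mock-extended; the trailing repository lines are commented because they depend on the n8n class.

```ts
import { mock } from 'jest-mock-extended';
import type { DataSource, EntityManager, EntityMetadata } from 'typeorm';

// Mock the manager first, then point a mocked DataSource at it.
const entityManager = mock<EntityManager>();
const dataSource = mock<DataSource>({
	manager: entityManager,
	getMetadata: () => mock<EntityMetadata>({ tableName: 'workflow_statistics' }),
});
// Repository internals reach back through manager.connection, so close the loop:
Object.assign(entityManager, { connection: dataSource });

// A repository built from this DataSource now routes query/findOne/insert to the mocks:
// const repository = new WorkflowStatisticsRepository(dataSource);
// entityManager.findOne.mockResolvedValueOnce({ count: 1 } as never);
```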
result : null, ); } @@ -96,9 +82,10 @@ describe('Events', () => { }; mockDBCall(); - await workflowExecutionCompleted(workflow, runData); - expect(internalHooks.onFirstProductionWorkflowSuccess).toBeCalledTimes(1); - expect(internalHooks.onFirstProductionWorkflowSuccess).toHaveBeenNthCalledWith(1, { + await eventsService.workflowExecutionCompleted(workflow, runData); + expect(updateUserSettingsMock).toHaveBeenCalledTimes(1); + expect(onFirstProductionWorkflowSuccess).toBeCalledTimes(1); + expect(onFirstProductionWorkflowSuccess).toHaveBeenNthCalledWith(1, { user_id: fakeUser.id, workflow_id: workflow.id, }); @@ -122,8 +109,8 @@ describe('Events', () => { mode: 'internal' as WorkflowExecuteMode, startedAt: new Date(), }; - await workflowExecutionCompleted(workflow, runData); - expect(internalHooks.onFirstProductionWorkflowSuccess).toBeCalledTimes(0); + await eventsService.workflowExecutionCompleted(workflow, runData); + expect(onFirstProductionWorkflowSuccess).toBeCalledTimes(0); }); test('should not send metrics for updated entries', async () => { @@ -145,8 +132,8 @@ describe('Events', () => { startedAt: new Date(), }; mockDBCall(2); - await workflowExecutionCompleted(workflow, runData); - expect(internalHooks.onFirstProductionWorkflowSuccess).toBeCalledTimes(0); + await eventsService.workflowExecutionCompleted(workflow, runData); + expect(onFirstProductionWorkflowSuccess).toBeCalledTimes(0); }); }); @@ -162,9 +149,9 @@ describe('Events', () => { position: [0, 0] as [number, number], parameters: {}, }; - await nodeFetchedData(workflowId, node); - expect(internalHooks.onFirstWorkflowDataLoad).toBeCalledTimes(1); - expect(internalHooks.onFirstWorkflowDataLoad).toHaveBeenNthCalledWith(1, { + await eventsService.nodeFetchedData(workflowId, node); + expect(onFirstWorkflowDataLoad).toBeCalledTimes(1); + expect(onFirstWorkflowDataLoad).toHaveBeenNthCalledWith(1, { user_id: fakeUser.id, workflow_id: workflowId, node_type: node.type, @@ -189,9 +176,9 @@ describe('Events', () => { }, }, }; - await nodeFetchedData(workflowId, node); - expect(internalHooks.onFirstWorkflowDataLoad).toBeCalledTimes(1); - expect(internalHooks.onFirstWorkflowDataLoad).toHaveBeenNthCalledWith(1, { + await eventsService.nodeFetchedData(workflowId, node); + expect(onFirstWorkflowDataLoad).toBeCalledTimes(1); + expect(onFirstWorkflowDataLoad).toHaveBeenNthCalledWith(1, { user_id: fakeUser.id, workflow_id: workflowId, node_type: node.type, @@ -203,15 +190,7 @@ describe('Events', () => { test('should not send metrics for entries that already have the flag set', async () => { // Fetch data for workflow 2 which is set up to not be altered in the mocks - workflowStatisticsRepository.findOne.mockImplementationOnce(async () => { - return { - count: 1, - name: StatisticsNames.dataLoaded, - latestEvent: new Date(), - workflowId: '2', - workflow: new WorkflowEntity(), - }; - }); + entityManager.insert.mockRejectedValueOnce(new QueryFailedError('', undefined, '')); const workflowId = '1'; const node = { id: 'abcde', @@ -221,8 +200,8 @@ describe('Events', () => { position: [0, 0] as [number, number], parameters: {}, }; - await nodeFetchedData(workflowId, node); - expect(internalHooks.onFirstWorkflowDataLoad).toBeCalledTimes(0); + await eventsService.nodeFetchedData(workflowId, node); + expect(onFirstWorkflowDataLoad).toBeCalledTimes(0); }); }); }); diff --git a/packages/core/src/EventEmitter.ts b/packages/core/src/EventEmitter.ts deleted file mode 100644 index a5e7163356..0000000000 --- a/packages/core/src/EventEmitter.ts +++ /dev/null 
@@ -1,15 +0,0 @@
-import EventEmitter from 'events';
-
-interface EventTypes {
-	nodeFetchedData: string;
-	workflowExecutionCompleted: string;
-}
-
-class N8NEventEmitter extends EventEmitter {
-	types: EventTypes = {
-		nodeFetchedData: 'nodeFetchedData',
-		workflowExecutionCompleted: 'workflowExecutionCompleted',
-	};
-}
-
-export const eventEmitter = new N8NEventEmitter();
diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts
index 516c30282e..2b441605ee 100644
--- a/packages/core/src/index.ts
+++ b/packages/core/src/index.ts
@@ -1,4 +1,3 @@
-import { eventEmitter } from './EventEmitter';
 import * as NodeExecuteFunctions from './NodeExecuteFunctions';
 import * as UserSettings from './UserSettings';
 
@@ -14,5 +13,5 @@ export * from './LoadNodeParameterOptions';
 export * from './LoadNodeListSearch';
 export * from './NodeExecuteFunctions';
 export * from './WorkflowExecute';
-export { eventEmitter, NodeExecuteFunctions, UserSettings };
+export { NodeExecuteFunctions, UserSettings };
 export * from './errors';