diff --git a/.vscode/launch.json b/.vscode/launch.json index cada9486ee..6e81451787 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -39,7 +39,7 @@ "name": "Launch n8n CLI dev with debug", "runtimeExecutable": "pnpm", "cwd": "${workspaceFolder}/packages/cli", - "runtimeArgs": ["run", "dev", "--", "--inspect-brk"], + "runtimeArgs": ["run", "buildAndDev", "--", "--inspect-brk"], "console": "integratedTerminal", "restart": true, "autoAttachChildProcesses": true, diff --git a/packages/cli/package.json b/packages/cli/package.json index 6b1f9227b3..6c7f275e80 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -22,7 +22,9 @@ "clean": "rimraf dist .turbo", "typecheck": "tsc", "build": "tsc -p tsconfig.build.json && tsc-alias -p tsconfig.build.json && node scripts/build.mjs", + "buildAndDev": "pnpm run build && pnpm run dev", "dev": "concurrently -k -n \"TypeScript,Node\" -c \"yellow.bold,cyan.bold\" \"npm run watch\" \"nodemon\"", + "dev:worker": "concurrently -k -n \"TypeScript,Node\" -c \"yellow.bold,cyan.bold\" \"npm run watch\" \"nodemon worker\"", "format": "prettier --write . --ignore-path ../../.prettierignore", "lint": "eslint --quiet .", "lintfix": "eslint . --fix", diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index 681d5c1f21..23102daf4e 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -5,8 +5,13 @@ /* eslint-disable @typescript-eslint/no-unsafe-call */ /* eslint-disable @typescript-eslint/no-non-null-assertion */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ -import type { IDeferredPromise, IExecuteResponsePromiseData, IRun } from 'n8n-workflow'; -import { createDeferredPromise } from 'n8n-workflow'; +import type { + IDeferredPromise, + IExecuteResponsePromiseData, + IRun, + ExecutionStatus, +} from 'n8n-workflow'; +import { createDeferredPromise, LoggerProxy } from 'n8n-workflow'; import type { ChildProcess } from 'child_process'; import { stringify } from 'flatted'; @@ -36,6 +41,7 @@ export class ActiveExecutions { process?: ChildProcess, executionId?: string, ): Promise { + let executionStatus: ExecutionStatus = executionId ? 'running' : 'new'; if (executionId === undefined) { // Is a new execution so save in DB @@ -45,6 +51,7 @@ export class ActiveExecutions { finished: false, startedAt: new Date(), workflowData: executionData.workflowData, + status: executionStatus, }; if (executionData.retryOf !== undefined) { @@ -59,32 +66,37 @@ export class ActiveExecutions { const execution = ResponseHelper.flattenExecutionData(fullExecutionData); const executionResult = await Db.collections.Execution.save(execution as IExecutionFlattedDb); + // TODO: what is going on here? executionId = typeof executionResult.id === 'object' ? // @ts-ignore executionResult.id!.toString() : executionResult.id + ''; + if (executionId === undefined) { + throw new Error('There was an issue assigning an execution id to the execution'); + } + executionStatus = 'running'; } else { // Is an existing execution we want to finish so update in DB - const execution = { + const execution: Pick = { id: executionId, data: stringify(executionData.executionData!), waitTill: null, + status: executionStatus, }; await Db.collections.Execution.update(executionId, execution); } - // @ts-ignore this.activeExecutions[executionId] = { executionData, process, startedAt: new Date(), postExecutePromises: [], + status: executionStatus, }; - // @ts-ignore return executionId; } @@ -215,11 +227,31 @@ export class ActiveExecutions { startedAt: data.startedAt, mode: data.executionData.executionMode, workflowId: data.executionData.workflowData.id! as string, + status: data.status, }); } return returnData; } + + async setStatus(executionId: string, status: ExecutionStatus): Promise { + if (this.activeExecutions[executionId] === undefined) { + LoggerProxy.debug( + `There is no active execution with id "${executionId}", can't update status to ${status}.`, + ); + return; + } + + this.activeExecutions[executionId].status = status; + } + + getStatus(executionId: string): ExecutionStatus { + if (this.activeExecutions[executionId] === undefined) { + return 'unknown'; + } + + return this.activeExecutions[executionId].status; + } } let activeExecutionsInstance: ActiveExecutions | undefined; diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index 4eddb668d2..cc803ee606 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -760,6 +760,7 @@ export class ActiveWorkflowRunner { mode, startedAt: new Date(), stoppedAt: new Date(), + status: 'running', }; WorkflowExecuteAdditionalData.executeErrorWorkflow(workflowData, fullRunData, mode); diff --git a/packages/cli/src/GenericHelpers.ts b/packages/cli/src/GenericHelpers.ts index e3940a620b..7d8011abde 100644 --- a/packages/cli/src/GenericHelpers.ts +++ b/packages/cli/src/GenericHelpers.ts @@ -190,6 +190,7 @@ export async function createErrorExecution( workflowData, workflowId: workflow.id, stoppedAt: new Date(), + status: 'new', }; const execution = ResponseHelper.flattenExecutionData(fullExecutionData); diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index 2605c80929..8ec9b0ccbd 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -20,6 +20,8 @@ import type { Workflow, WorkflowActivateMode, WorkflowExecuteMode, + ExecutionStatus, + IExecutionsSummary, } from 'n8n-workflow'; import type { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; @@ -153,6 +155,7 @@ export interface IExecutionBase { finished: boolean; retryOf?: string; // If it is a retry, the id of the execution it is a retry of. retrySuccessId?: string; // If it failed and a retry did succeed. The id of the successful retry. + status: ExecutionStatus; } // Data in regular format with references @@ -188,6 +191,7 @@ export interface IExecutionFlattedDb extends IExecutionBase { data: string; waitTill?: Date | null; workflowData: Omit; + status: ExecutionStatus; } export interface IExecutionFlattedResponse extends IExecutionFlatted { @@ -222,25 +226,13 @@ export interface IExecutionsStopData { stoppedAt?: Date; } -export interface IExecutionsSummary { - id: string; - finished?: boolean; - mode: WorkflowExecuteMode; - retryOf?: string; - retrySuccessId?: string; - waitTill?: Date; - startedAt: Date; - stoppedAt?: Date; - workflowId: string; - workflowName?: string; -} - export interface IExecutionsCurrentSummary { id: string; retryOf?: string; startedAt: Date; mode: WorkflowExecuteMode; workflowId: string; + status?: ExecutionStatus; } export interface IExecutionDeleteFilter { @@ -256,6 +248,7 @@ export interface IExecutingWorkflowData { postExecutePromises: Array>; responsePromise?: IDeferredPromise; workflowExecution?: PCancelable; + status: ExecutionStatus; } export interface IExternalHooks { @@ -568,7 +561,13 @@ export type IPushData = | PushDataReloadNodeType | PushDataRemoveNodeType | PushDataTestWebhook - | PushDataNodeDescriptionUpdated; + | PushDataNodeDescriptionUpdated + | PushDataExecutionRecovered; + +type PushDataExecutionRecovered = { + data: IPushDataExecutionRecovered; + type: 'executionRecovered'; +}; type PushDataExecutionFinished = { data: IPushDataExecutionFinished; @@ -615,6 +614,10 @@ type PushDataNodeDescriptionUpdated = { type: 'nodeDescriptionUpdated'; }; +export interface IPushDataExecutionRecovered { + executionId: string; +} + export interface IPushDataExecutionFinished { data: IRun; executionId: string; diff --git a/packages/cli/src/InternalHooks.ts b/packages/cli/src/InternalHooks.ts index d97a5c98a3..84102e62e3 100644 --- a/packages/cli/src/InternalHooks.ts +++ b/packages/cli/src/InternalHooks.ts @@ -4,11 +4,13 @@ import { snakeCase } from 'change-case'; import { BinaryDataManager } from 'n8n-core'; import type { + ExecutionStatus, INodesGraphResult, INodeTypes, IRun, ITelemetryTrackProperties, IWorkflowBase, + WorkflowExecuteMode, } from 'n8n-workflow'; import { TelemetryHelpers } from 'n8n-workflow'; import { get as pslGet } from 'psl'; @@ -26,6 +28,7 @@ import { RoleService } from './role/role.service'; import { eventBus } from './eventbus'; import type { User } from '@db/entities/User'; import { N8N_VERSION } from '@/constants'; +import * as Db from '@/Db'; function userToPayload(user: User): { userId: string; @@ -220,6 +223,7 @@ export class InternalHooksClass implements IInternalHooksClass { data: IWorkflowExecutionDataProcess, ): Promise { void Promise.all([ + Db.collections.Execution.update(executionId, { status: 'running' }), eventBus.sendWorkflowEvent({ eventName: 'n8n.workflow.started', payload: { @@ -233,6 +237,24 @@ export class InternalHooksClass implements IInternalHooksClass { ]); } + async onWorkflowCrashed( + executionId: string, + executionMode: WorkflowExecuteMode, + workflowData?: IWorkflowBase, + ): Promise { + void Promise.all([ + eventBus.sendWorkflowEvent({ + eventName: 'n8n.workflow.crashed', + payload: { + executionId, + isManual: executionMode === 'manual', + workflowId: workflowData?.id?.toString(), + workflowName: workflowData?.name, + }, + }), + ]); + } + async onWorkflowPostExecute( executionId: string, workflow: IWorkflowBase, @@ -315,6 +337,7 @@ export class InternalHooksClass implements IInternalHooksClass { user_id: userId, workflow_id: workflow.id, status: properties.success ? 'success' : 'failed', + executionStatus: runData?.status ?? 'unknown', error_message: properties.error_message as string, error_node_type: properties.error_node_type, node_graph_string: properties.node_graph_string as string, @@ -363,6 +386,21 @@ export class InternalHooksClass implements IInternalHooksClass { } } + let executionStatus: ExecutionStatus; + if (runData?.status === 'crashed') { + executionStatus = 'crashed'; + } else if (runData?.status === 'waiting' || runData?.data?.waitTill) { + executionStatus = 'waiting'; + } else { + executionStatus = properties.success ? 'success' : 'failed'; + } + + promises.push( + Db.collections.Execution.update(executionId, { + status: executionStatus, + }) as unknown as Promise, + ); + promises.push( properties.success ? eventBus.sendWorkflowEvent({ diff --git a/packages/cli/src/PublicApi/types.d.ts b/packages/cli/src/PublicApi/types.d.ts index 481888d00d..b908f07b85 100644 --- a/packages/cli/src/PublicApi/types.d.ts +++ b/packages/cli/src/PublicApi/types.d.ts @@ -1,5 +1,5 @@ import type express from 'express'; -import type { IDataObject } from 'n8n-workflow'; +import type { IDataObject, ExecutionStatus } from 'n8n-workflow'; import type { User } from '@db/entities/User'; @@ -11,8 +11,6 @@ import type * as UserManagementMailer from '@/UserManagement/email/UserManagemen import type { Risk } from '@/audit/types'; -export type ExecutionStatus = 'error' | 'running' | 'success' | 'waiting' | null; - export type AuthlessRequest< RouteParams = {}, ResponseBody = {}, diff --git a/packages/cli/src/PublicApi/v1/handlers/executions/executions.service.ts b/packages/cli/src/PublicApi/v1/handlers/executions/executions.service.ts index fa0d5b0c61..4ae709dea6 100644 --- a/packages/cli/src/PublicApi/v1/handlers/executions/executions.service.ts +++ b/packages/cli/src/PublicApi/v1/handlers/executions/executions.service.ts @@ -4,7 +4,7 @@ import { In, Not, Raw, LessThan, IsNull } from 'typeorm'; import * as Db from '@/Db'; import type { IExecutionFlattedDb, IExecutionResponseApi } from '@/Interfaces'; -import type { ExecutionStatus } from '@/PublicApi/types'; +import type { ExecutionStatus } from 'n8n-workflow'; function prepareExecutionData( execution: IExecutionFlattedDb | null, diff --git a/packages/cli/src/ResponseHelper.ts b/packages/cli/src/ResponseHelper.ts index 7cf88b385c..b7b22336d5 100644 --- a/packages/cli/src/ResponseHelper.ts +++ b/packages/cli/src/ResponseHelper.ts @@ -228,6 +228,7 @@ export function flattenExecutionData(fullExecutionData: IExecutionDb): IExecutio workflowId: fullExecutionData.workflowId, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion workflowData: fullExecutionData.workflowData!, + status: fullExecutionData.status, }; if (fullExecutionData.id !== undefined) { @@ -261,6 +262,7 @@ export function unflattenExecutionData(fullExecutionData: IExecutionFlattedDb): stoppedAt: fullExecutionData.stoppedAt, finished: fullExecutionData.finished ? fullExecutionData.finished : false, workflowId: fullExecutionData.workflowId, + status: fullExecutionData.status, }; return returnData; diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 870a7e124c..87e9b7463e 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -21,7 +21,7 @@ import cookieParser from 'cookie-parser'; import express from 'express'; import type { ServeStaticOptions } from 'serve-static'; import type { FindManyOptions } from 'typeorm'; -import { In } from 'typeorm'; +import { Not, In } from 'typeorm'; import type { AxiosRequestConfig } from 'axios'; import axios from 'axios'; import type { RequestOptions } from 'oauth-1.0a'; @@ -45,6 +45,8 @@ import type { ITelemetrySettings, WorkflowExecuteMode, ICredentialTypes, + ExecutionStatus, + IExecutionsSummary, } from 'n8n-workflow'; import { LoggerProxy, jsonParse } from 'n8n-workflow'; @@ -109,7 +111,6 @@ import type { IDiagnosticInfo, IExecutionFlattedDb, IExecutionsStopData, - IExecutionsSummary, IN8nUISettings, } from '@/Interfaces'; import * as ActiveExecutions from '@/ActiveExecutions'; @@ -130,7 +131,6 @@ import { WaitTracker } from '@/WaitTracker'; import * as WebhookHelpers from '@/WebhookHelpers'; import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; import { toHttpNodeParameters } from '@/CurlConverterHelper'; -import { eventBus } from '@/eventbus'; import { eventBusRouter } from '@/eventbus/eventBusRoutes'; import { isLogStreamingEnabled } from '@/eventbus/MessageEventBus/MessageEventBusHelper'; import { getLicense } from '@/License'; @@ -145,6 +145,7 @@ import { AbstractServer } from './AbstractServer'; import { configureMetrics } from './metrics'; import { setupBasicAuth } from './middlewares/basicAuth'; import { setupExternalJWTAuth } from './middlewares/externalJWTAuth'; +import { eventBus } from './eventbus'; import { isSamlEnabled } from './Saml/helpers'; const exec = promisify(callbackExec); @@ -980,10 +981,11 @@ class Server extends AbstractServer { if (!currentlyRunningExecutionIds.length) return []; const findOptions: FindManyOptions = { - select: ['id', 'workflowId', 'mode', 'retryOf', 'startedAt'], + select: ['id', 'workflowId', 'mode', 'retryOf', 'startedAt', 'stoppedAt', 'status'], order: { id: 'DESC' }, where: { id: In(currentlyRunningExecutionIds), + status: Not(In(['finished', 'stopped', 'failed', 'crashed'] as ExecutionStatus[])), }, }; @@ -992,10 +994,16 @@ class Server extends AbstractServer { if (!sharedWorkflowIds.length) return []; if (req.query.filter) { - const { workflowId } = jsonParse(req.query.filter); + const { workflowId, status, finished } = jsonParse(req.query.filter); if (workflowId && sharedWorkflowIds.includes(workflowId)) { Object.assign(findOptions.where!, { workflowId }); } + if (status) { + Object.assign(findOptions.where!, { status: In(status) }); + } + if (finished) { + Object.assign(findOptions.where!, { finished }); + } } else { Object.assign(findOptions.where!, { workflowId: In(sharedWorkflowIds) }); } @@ -1011,6 +1019,8 @@ class Server extends AbstractServer { mode: execution.mode, retryOf: execution.retryOf !== null ? execution.retryOf : undefined, startedAt: new Date(execution.startedAt), + status: execution.status ?? null, + stoppedAt: execution.stoppedAt ?? null, } as IExecutionsSummary; }); } diff --git a/packages/cli/src/WaitTracker.ts b/packages/cli/src/WaitTracker.ts index 98b292df73..06ca1c988d 100644 --- a/packages/cli/src/WaitTracker.ts +++ b/packages/cli/src/WaitTracker.ts @@ -11,7 +11,7 @@ import { WorkflowOperationError, } from 'n8n-workflow'; import type { FindManyOptions, ObjectLiteral } from 'typeorm'; -import { LessThanOrEqual } from 'typeorm'; +import { Not, LessThanOrEqual } from 'typeorm'; import { DateUtils } from 'typeorm/util/DateUtils'; import config from '@/config'; @@ -57,6 +57,7 @@ export class WaitTrackerClass { select: ['id', 'waitTill'], where: { waitTill: LessThanOrEqual(new Date(Date.now() + 70000)), + status: Not('crashed'), }, order: { waitTill: 'ASC', @@ -127,10 +128,13 @@ export class WaitTrackerClass { fullExecutionData.stoppedAt = new Date(); fullExecutionData.waitTill = undefined; + fullExecutionData.status = 'canceled'; await Db.collections.Execution.update( executionId, - ResponseHelper.flattenExecutionData(fullExecutionData), + ResponseHelper.flattenExecutionData({ + ...fullExecutionData, + }), ); return { diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index 9f220dd164..e24a6b586e 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -33,6 +33,7 @@ import type { IWorkflowHooksOptionalParameters, IWorkflowSettings, WorkflowExecuteMode, + ExecutionStatus, } from 'n8n-workflow'; import { ErrorReporterProxy as ErrorReporter, @@ -335,16 +336,22 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { // Clone the object except the runData. That one is not supposed // to be send. Because that data got send piece by piece after // each node which finished executing - const pushRunData = { - ...fullRunData, - data: { - ...fullRunData.data, - resultData: { - ...fullRunData.data.resultData, - runData: {}, + // Edit: we now DO send the runData to the UI if mode=manual so that it shows the point of crashes + let pushRunData; + if (fullRunData.mode === 'manual') { + pushRunData = fullRunData; + } else { + pushRunData = { + ...fullRunData, + data: { + ...fullRunData.data, + resultData: { + ...fullRunData.data.resultData, + runData: {}, + }, }, - }, - }; + }; + } // Push data to editor-ui once workflow finished Logger.debug(`Save execution progress to database for execution ID ${executionId} `, { @@ -445,6 +452,8 @@ export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowEx // Set last executed node so that it may resume on failure fullExecutionData.data.resultData.lastNodeExecuted = nodeName; + fullExecutionData.status = 'running'; + const flattenedExecutionData = ResponseHelper.flattenExecutionData(fullExecutionData); await Db.collections.Execution.update( @@ -600,6 +609,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { stoppedAt: fullRunData.stoppedAt, workflowData: pristineWorkflowData, waitTill: fullRunData.waitTill, + status: fullRunData.status, }; if (this.retryOf !== undefined) { @@ -711,7 +721,11 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { } } - const workflowDidSucceed = !fullRunData.data.resultData.error; + const workflowHasCrashed = fullRunData.status === 'crashed'; + const workflowDidSucceed = !fullRunData.data.resultData.error && !workflowHasCrashed; + let workflowStatusFinal: ExecutionStatus = workflowDidSucceed ? 'success' : 'failed'; + if (workflowHasCrashed) workflowStatusFinal = 'crashed'; + if (!workflowDidSucceed) { executeErrorWorkflow( this.workflowData, @@ -730,6 +744,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { stoppedAt: fullRunData.stoppedAt, workflowData: this.workflowData, waitTill: fullRunData.data.waitTill, + status: workflowStatusFinal, }; if (this.retryOf !== undefined) { @@ -749,6 +764,11 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { executionData as IExecutionFlattedDb, ); + // For reasons(tm) the execution status is not updated correctly in the first update, so has to be written again (tbd) + await Db.collections.Execution.update(this.executionId, { + status: executionData.status, + }); + if (fullRunData.finished === true && this.retryOf !== undefined) { // If the retry was successful save the reference it on the original execution await Db.collections.Execution.update(this.retryOf, { @@ -995,6 +1015,7 @@ async function executeWorkflow( mode: 'integrated', startedAt: new Date(), stoppedAt: new Date(), + status: 'error', }; // When failing, we might not have finished the execution // Therefore, database might not contain finished errors. @@ -1006,6 +1027,7 @@ async function executeWorkflow( finished: fullRunData.finished ? fullRunData.finished : false, startedAt: fullRunData.startedAt, stoppedAt: fullRunData.stoppedAt, + status: fullRunData.status, workflowData, }; if (workflowData.id) { @@ -1048,6 +1070,19 @@ async function executeWorkflow( }; } +export function setExecutionStatus(status: ExecutionStatus) { + if (this.executionId === undefined) { + Logger.debug(`Setting execution status "${status}" failed because executionId is undefined`); + return; + } + Logger.debug(`Setting execution status for ${this.executionId} to "${status}"`); + ActiveExecutions.getInstance() + .setStatus(this.executionId, status) + .catch((error) => { + Logger.debug(`Setting execution status "${status}" failed: ${error.message}`); + }); +} + // eslint-disable-next-line @typescript-eslint/no-explicit-any export function sendMessageToUI(source: string, messages: any[]) { const { sessionId } = this; @@ -1101,6 +1136,7 @@ export async function getBase( currentNodeParameters, executionTimeoutTimestamp, userId, + setExecutionStatus, }; } diff --git a/packages/cli/src/WorkflowHelpers.ts b/packages/cli/src/WorkflowHelpers.ts index c4e19b89a5..958030b51a 100644 --- a/packages/cli/src/WorkflowHelpers.ts +++ b/packages/cli/src/WorkflowHelpers.ts @@ -484,6 +484,7 @@ export function generateFailedExecutionFromError( mode, startedAt: new Date(), stoppedAt: new Date(), + status: 'failed', }; } diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index 0e0f90bbd4..8d61c13723 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -55,6 +55,8 @@ import { initErrorHandling } from '@/ErrorReporting'; import { PermissionChecker } from '@/UserManagement/PermissionChecker'; import type { Push } from '@/push'; import { getPushInstance } from '@/push'; +import { eventBus } from './eventbus'; +import { recoverExecutionDataFromEventLogMessages } from './eventbus/MessageEventBus/recoverEvents'; export class WorkflowRunner { activeExecutions: ActiveExecutions.ActiveExecutions; @@ -103,8 +105,40 @@ export class WorkflowRunner { mode: executionMode, startedAt, stoppedAt: new Date(), + status: 'error', }; + // The following will attempt to recover runData from event logs + // Note that this will only work as long as the event logs actually contain the events from this workflow execution + // Since processError is run almost immediately after the workflow execution has failed, it is likely that the event logs + // does contain those messages. + try { + // Search for messages for this executionId in event logs + const eventLogMessages = await eventBus.getEventsByExecutionId(executionId); + // Attempt to recover more better runData from these messages (but don't update the execution db entry yet) + if (eventLogMessages.length > 0) { + const eventLogExecutionData = await recoverExecutionDataFromEventLogMessages( + executionId, + eventLogMessages, + false, + ); + if (eventLogExecutionData) { + fullRunData.data.resultData.runData = eventLogExecutionData.resultData.runData; + fullRunData.status = 'crashed'; + } + } + + const executionFlattedData = await Db.collections.Execution.findOneBy({ id: executionId }); + + void InternalHooksManager.getInstance().onWorkflowCrashed( + executionId, + executionMode, + executionFlattedData?.workflowData, + ); + } catch { + // Ignore errors + } + // Remove from active execution with empty data. That will // set the execution to failed. this.activeExecutions.remove(executionId, fullRunData); @@ -287,6 +321,10 @@ export class WorkflowRunner { }, ]; + additionalData.setExecutionStatus = WorkflowExecuteAdditionalData.setExecutionStatus.bind({ + executionId, + }); + additionalData.sendMessageToUI = WorkflowExecuteAdditionalData.sendMessageToUI.bind({ sessionId: data.sessionId, }); @@ -354,6 +392,7 @@ export class WorkflowRunner { if (workflowExecution.isCanceled) { fullRunData.finished = false; } + fullRunData.status = this.activeExecutions.getStatus(executionId); this.activeExecutions.remove(executionId, fullRunData); }) .catch((error) => { @@ -708,7 +747,7 @@ export class WorkflowRunner { } // eslint-disable-next-line @typescript-eslint/await-thenable - await this.activeExecutions.remove(message.data.executionId, message.data.result); + this.activeExecutions.remove(message.data.executionId, message.data.result); } }); @@ -733,7 +772,7 @@ export class WorkflowRunner { ); // Process did exit with error code, so something went wrong. const executionError = new WorkflowOperationError( - 'Workflow execution process did crash for an unknown reason!', + 'Workflow execution process crashed for an unknown reason!', ); await this.processError( @@ -752,7 +791,7 @@ export class WorkflowRunner { // Instead of pending forever as executing when it // actually isn't anymore. // eslint-disable-next-line @typescript-eslint/await-thenable, no-await-in-loop - await this.activeExecutions.remove(executionId); + this.activeExecutions.remove(executionId); } clearTimeout(executionTimeout); diff --git a/packages/cli/src/WorkflowRunnerProcess.ts b/packages/cli/src/WorkflowRunnerProcess.ts index 1b8b4246ed..b8fd32f773 100644 --- a/packages/cli/src/WorkflowRunnerProcess.ts +++ b/packages/cli/src/WorkflowRunnerProcess.ts @@ -181,6 +181,9 @@ class WorkflowRunnerProcess { additionalData.executionId = inputData.executionId; + additionalData.setExecutionStatus = WorkflowExecuteAdditionalData.setExecutionStatus.bind({ + executionId: inputData.executionId, + }); // eslint-disable-next-line @typescript-eslint/no-explicit-any additionalData.sendMessageToUI = async (source: string, message: any) => { if (workflowRunner.data!.executionMode !== 'manual') { @@ -487,6 +490,7 @@ process.on('message', async (message: IProcessMessage) => { : ('own' as WorkflowExecuteMode), startedAt: workflowRunner.startedAt, stoppedAt: new Date(), + status: 'canceled', }; // eslint-disable-next-line @typescript-eslint/no-floating-promises diff --git a/packages/cli/src/commands/Interfaces.d.ts b/packages/cli/src/commands/Interfaces.d.ts index 9f423b575b..e06bb83abd 100644 --- a/packages/cli/src/commands/Interfaces.d.ts +++ b/packages/cli/src/commands/Interfaces.d.ts @@ -46,8 +46,6 @@ interface INodeSpecialCase { keepOnlyProperties?: string[]; } -type ExecutionStatus = 'success' | 'error' | 'warning' | 'running'; - declare module 'json-diff' { interface IDiffOptions { keysOnly?: boolean; diff --git a/packages/cli/src/commands/executeBatch.ts b/packages/cli/src/commands/executeBatch.ts index d12c7fabd8..897f5211ae 100644 --- a/packages/cli/src/commands/executeBatch.ts +++ b/packages/cli/src/commands/executeBatch.ts @@ -618,10 +618,10 @@ export class ExecuteBatch extends BaseCommand { const resultError = data.data.resultData.error; if (resultError) { // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - executionResult.error = resultError.hasOwnProperty('description') - ? // @ts-ignore - resultError.description - : resultError.message; + executionResult.error = + resultError.hasOwnProperty('description') && resultError.description !== null + ? resultError.description + : resultError.message; if (data.data.resultData.lastNodeExecuted !== undefined) { executionResult.error += ` on node ${data.data.resultData.lastNodeExecuted}`; } diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 66073ed711..87d7b55bff 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -5,7 +5,7 @@ import type PCancelable from 'p-cancelable'; import { flags } from '@oclif/command'; import { WorkflowExecute } from 'n8n-core'; -import type { IExecuteResponsePromiseData, INodeTypes, IRun } from 'n8n-workflow'; +import type { ExecutionStatus, IExecuteResponsePromiseData, INodeTypes, IRun } from 'n8n-workflow'; import { Workflow, NodeOperationError, LoggerProxy, sleep } from 'n8n-workflow'; import * as Db from '@/Db'; @@ -189,6 +189,11 @@ export class Worker extends BaseCommand { additionalData.executionId = executionId; + additionalData.setExecutionStatus = (status: ExecutionStatus) => { + // Can't set the status directly in the queued worker, but it will happen in InternalHook.onWorkflowPostExecute + LoggerProxy.debug(`Queued worker execution status for ${executionId} is "${status}"`); + }; + let workflowExecute: WorkflowExecute; let workflowRun: PCancelable; if (currentExecutionDb.data !== undefined) { diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index 3fa9dc0cbb..9b91f411fb 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -1150,7 +1150,7 @@ export const schema = { maxFileSizeInKB: { doc: 'Maximum size of an event log file before a new one is started.', format: Number, - default: 102400, // 100MB + default: 10240, // 10MB env: 'N8N_EVENTBUS_LOGWRITER_MAXFILESIZEINKB', }, logBaseName: { diff --git a/packages/cli/src/databases/entities/ExecutionEntity.ts b/packages/cli/src/databases/entities/ExecutionEntity.ts index 41891f375c..2172f27003 100644 --- a/packages/cli/src/databases/entities/ExecutionEntity.ts +++ b/packages/cli/src/databases/entities/ExecutionEntity.ts @@ -1,4 +1,4 @@ -import { WorkflowExecuteMode } from 'n8n-workflow'; +import { ExecutionStatus, WorkflowExecuteMode } from 'n8n-workflow'; import { Column, Entity, Generated, Index, PrimaryColumn } from 'typeorm'; import { datetimeColumnType, jsonColumnType } from './AbstractEntity'; import { IWorkflowDb } from '@/Interfaces'; @@ -31,6 +31,9 @@ export class ExecutionEntity implements IExecutionFlattedDb { @Column({ nullable: true }) retrySuccessId: string; + @Column('varchar', { nullable: true }) + status: ExecutionStatus; + @Column(datetimeColumnType) startedAt: Date; diff --git a/packages/cli/src/databases/migrations/mysqldb/1674138566000-AddStatusToExecutions.ts b/packages/cli/src/databases/migrations/mysqldb/1674138566000-AddStatusToExecutions.ts new file mode 100644 index 0000000000..ef41dc9758 --- /dev/null +++ b/packages/cli/src/databases/migrations/mysqldb/1674138566000-AddStatusToExecutions.ts @@ -0,0 +1,24 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; +import { logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers'; +import config from '@/config'; + +export class AddStatusToExecutions1674138566000 implements MigrationInterface { + name = 'AddStatusToExecutions1674138566000'; + public async up(queryRunner: QueryRunner): Promise { + logMigrationStart(this.name); + const tablePrefix = config.getEnv('database.tablePrefix'); + + await queryRunner.query( + // 'ALTER TABLE `' + tablePrefix + 'execution_entity` ADD COLUMN `status` varchar', + `ALTER TABLE \`${tablePrefix}execution_entity\` ADD COLUMN \`status\` VARCHAR(255)`, + ); + + logMigrationEnd(this.name); + } + + public async down(queryRunner: QueryRunner): Promise { + const tablePrefix = config.getEnv('database.tablePrefix'); + + await queryRunner.query(`ALTER TABLE \`${tablePrefix}execution_entity\` DROP COLUMN \`status\``); + } +} diff --git a/packages/cli/src/databases/migrations/mysqldb/index.ts b/packages/cli/src/databases/migrations/mysqldb/index.ts index 2c25db44b1..af0240b4d2 100644 --- a/packages/cli/src/databases/migrations/mysqldb/index.ts +++ b/packages/cli/src/databases/migrations/mysqldb/index.ts @@ -31,6 +31,7 @@ import { MessageEventBusDestinations1671535397530 } from './1671535397530-Messag import { DeleteExecutionsWithWorkflows1673268682475 } from './1673268682475-DeleteExecutionsWithWorkflows'; import { CreateLdapEntities1674509946020 } from './1674509946020-CreateLdapEntities'; import { PurgeInvalidWorkflowConnections1675940580449 } from './1675940580449-PurgeInvalidWorkflowConnections'; +import { AddStatusToExecutions1674138566000 } from './1674138566000-AddStatusToExecutions'; export const mysqlMigrations = [ InitialMigration1588157391238, @@ -65,5 +66,6 @@ export const mysqlMigrations = [ MessageEventBusDestinations1671535397530, DeleteExecutionsWithWorkflows1673268682475, CreateLdapEntities1674509946020, - PurgeInvalidWorkflowConnections1675940580449 + PurgeInvalidWorkflowConnections1675940580449, + AddStatusToExecutions1674138566000, ]; diff --git a/packages/cli/src/databases/migrations/postgresdb/1674138566000-AddStatusToExecutions.ts b/packages/cli/src/databases/migrations/postgresdb/1674138566000-AddStatusToExecutions.ts new file mode 100644 index 0000000000..b8cc0e2b00 --- /dev/null +++ b/packages/cli/src/databases/migrations/postgresdb/1674138566000-AddStatusToExecutions.ts @@ -0,0 +1,21 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; +import { logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers'; +import config from '@/config'; + +export class AddStatusToExecutions1674138566000 implements MigrationInterface { + name = 'AddStatusToExecutions1674138566000'; + public async up(queryRunner: QueryRunner): Promise { + logMigrationStart(this.name); + const tablePrefix = config.getEnv('database.tablePrefix'); + + await queryRunner.query(`ALTER TABLE ${tablePrefix}execution_entity ADD COLUMN status varchar`); + + logMigrationEnd(this.name); + } + + public async down(queryRunner: QueryRunner): Promise { + const tablePrefix = config.getEnv('database.tablePrefix'); + + await queryRunner.query(`ALTER TABLE ${tablePrefix}execution_entity DROP COLUMN status`); + } +} diff --git a/packages/cli/src/databases/migrations/postgresdb/index.ts b/packages/cli/src/databases/migrations/postgresdb/index.ts index dde85c808b..1c19fa86bc 100644 --- a/packages/cli/src/databases/migrations/postgresdb/index.ts +++ b/packages/cli/src/databases/migrations/postgresdb/index.ts @@ -29,6 +29,7 @@ import { MessageEventBusDestinations1671535397530 } from './1671535397530-Messag import { DeleteExecutionsWithWorkflows1673268682475 } from './1673268682475-DeleteExecutionsWithWorkflows'; import { CreateLdapEntities1674509946020 } from './1674509946020-CreateLdapEntities'; import { PurgeInvalidWorkflowConnections1675940580449 } from './1675940580449-PurgeInvalidWorkflowConnections'; +import { AddStatusToExecutions1674138566000 } from './1674138566000-AddStatusToExecutions'; export const postgresMigrations = [ InitialMigration1587669153312, @@ -61,5 +62,6 @@ export const postgresMigrations = [ MessageEventBusDestinations1671535397530, DeleteExecutionsWithWorkflows1673268682475, CreateLdapEntities1674509946020, - PurgeInvalidWorkflowConnections1675940580449 + PurgeInvalidWorkflowConnections1675940580449, + AddStatusToExecutions1674138566000, ]; diff --git a/packages/cli/src/databases/migrations/sqlite/1674138566000-AddStatusToExecutions.ts b/packages/cli/src/databases/migrations/sqlite/1674138566000-AddStatusToExecutions.ts new file mode 100644 index 0000000000..b77a57f45e --- /dev/null +++ b/packages/cli/src/databases/migrations/sqlite/1674138566000-AddStatusToExecutions.ts @@ -0,0 +1,23 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; +import { logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers'; +import config from '@/config'; + +export class AddStatusToExecutions1674138566000 implements MigrationInterface { + name = 'AddStatusToExecutions1674138566000'; + public async up(queryRunner: QueryRunner): Promise { + logMigrationStart(this.name); + const tablePrefix = config.getEnv('database.tablePrefix'); + + await queryRunner.query( + `ALTER TABLE \`${tablePrefix}execution_entity\` ADD COLUMN "status" varchar`, + ); + + logMigrationEnd(this.name); + } + + public async down(queryRunner: QueryRunner): Promise { + const tablePrefix = config.getEnv('database.tablePrefix'); + + await queryRunner.query(`ALTER TABLE \`${tablePrefix}execution_entity\` DROP COLUMN "status"`); + } +} diff --git a/packages/cli/src/databases/migrations/sqlite/index.ts b/packages/cli/src/databases/migrations/sqlite/index.ts index 7325893d18..01cd22f226 100644 --- a/packages/cli/src/databases/migrations/sqlite/index.ts +++ b/packages/cli/src/databases/migrations/sqlite/index.ts @@ -28,6 +28,7 @@ import { MessageEventBusDestinations1671535397530 } from './1671535397530-Messag import { DeleteExecutionsWithWorkflows1673268682475 } from './1673268682475-DeleteExecutionsWithWorkflows'; import { CreateLdapEntities1674509946020 } from './1674509946020-CreateLdapEntities'; import { PurgeInvalidWorkflowConnections1675940580449 } from './1675940580449-PurgeInvalidWorkflowConnections'; +import { AddStatusToExecutions1674138566000 } from './1674138566000-AddStatusToExecutions'; const sqliteMigrations = [ InitialMigration1588102412422, @@ -60,6 +61,7 @@ const sqliteMigrations = [ DeleteExecutionsWithWorkflows1673268682475, CreateLdapEntities1674509946020, PurgeInvalidWorkflowConnections1675940580449, + AddStatusToExecutions1674138566000, ]; export { sqliteMigrations }; diff --git a/packages/cli/src/eventbus/EventMessageClasses/AbstractEventMessage.ts b/packages/cli/src/eventbus/EventMessageClasses/AbstractEventMessage.ts index 0b89bebc05..f71b03a413 100644 --- a/packages/cli/src/eventbus/EventMessageClasses/AbstractEventMessage.ts +++ b/packages/cli/src/eventbus/EventMessageClasses/AbstractEventMessage.ts @@ -4,6 +4,7 @@ import type { EventMessageTypeNames, JsonObject } from 'n8n-workflow'; import { v4 as uuid } from 'uuid'; import type { AbstractEventPayload } from './AbstractEventPayload'; import type { AbstractEventMessageOptions } from './AbstractEventMessageOptions'; +import type { EventNamesTypes } from '.'; function modifyUnderscoredKeys( input: { [key: string]: any }, @@ -85,7 +86,7 @@ export abstract class AbstractEventMessage { ts: DateTime; - eventName: string; + eventName: EventNamesTypes; message: string; diff --git a/packages/cli/src/eventbus/EventMessageClasses/AbstractEventMessageOptions.ts b/packages/cli/src/eventbus/EventMessageClasses/AbstractEventMessageOptions.ts index 71842e4e30..95f3ef6432 100644 --- a/packages/cli/src/eventbus/EventMessageClasses/AbstractEventMessageOptions.ts +++ b/packages/cli/src/eventbus/EventMessageClasses/AbstractEventMessageOptions.ts @@ -1,12 +1,13 @@ import type { DateTime } from 'luxon'; import type { EventMessageTypeNames } from 'n8n-workflow'; +import type { EventNamesTypes } from '.'; import type { AbstractEventPayload } from './AbstractEventPayload'; export interface AbstractEventMessageOptions { __type?: EventMessageTypeNames; id?: string; ts?: DateTime | string; - eventName: string; + eventName: EventNamesTypes; message?: string; payload?: AbstractEventPayload; anonymize?: boolean; diff --git a/packages/cli/src/eventbus/EventMessageClasses/EventMessageAudit.ts b/packages/cli/src/eventbus/EventMessageClasses/EventMessageAudit.ts index f349823382..737a1ebf0a 100644 --- a/packages/cli/src/eventbus/EventMessageClasses/EventMessageAudit.ts +++ b/packages/cli/src/eventbus/EventMessageClasses/EventMessageAudit.ts @@ -1,31 +1,9 @@ import { AbstractEventMessage, isEventMessageOptionsWithType } from './AbstractEventMessage'; -import type { JsonObject, JsonValue } from 'n8n-workflow'; import { EventMessageTypeNames } from 'n8n-workflow'; +import type { JsonObject, JsonValue } from 'n8n-workflow'; import type { AbstractEventPayload } from './AbstractEventPayload'; import type { AbstractEventMessageOptions } from './AbstractEventMessageOptions'; - -export const eventNamesAudit = [ - 'n8n.audit.user.signedup', - 'n8n.audit.user.updated', - 'n8n.audit.user.deleted', - 'n8n.audit.user.invited', - 'n8n.audit.user.invitation.accepted', - 'n8n.audit.user.reinvited', - 'n8n.audit.user.email.failed', - 'n8n.audit.user.reset.requested', - 'n8n.audit.user.reset', - 'n8n.audit.user.credentials.created', - 'n8n.audit.user.credentials.shared', - 'n8n.audit.user.api.created', - 'n8n.audit.user.api.deleted', - 'n8n.audit.package.installed', - 'n8n.audit.package.updated', - 'n8n.audit.package.deleted', - 'n8n.audit.workflow.created', - 'n8n.audit.workflow.deleted', - 'n8n.audit.workflow.updated', -] as const; -export type EventNamesAuditType = (typeof eventNamesAudit)[number]; +import type { EventNamesAuditType } from '.'; // -------------------------------------- // EventMessage class for Audit events diff --git a/packages/cli/src/eventbus/EventMessageClasses/EventMessageNode.ts b/packages/cli/src/eventbus/EventMessageClasses/EventMessageNode.ts index 072aafaa43..d9dda95e27 100644 --- a/packages/cli/src/eventbus/EventMessageClasses/EventMessageNode.ts +++ b/packages/cli/src/eventbus/EventMessageClasses/EventMessageNode.ts @@ -3,9 +3,7 @@ import type { JsonObject } from 'n8n-workflow'; import { EventMessageTypeNames } from 'n8n-workflow'; import type { AbstractEventMessageOptions } from './AbstractEventMessageOptions'; import type { AbstractEventPayload } from './AbstractEventPayload'; - -export const eventNamesNode = ['n8n.node.started', 'n8n.node.finished'] as const; -export type EventNamesNodeType = (typeof eventNamesNode)[number]; +import type { EventNamesNodeType } from '.'; // -------------------------------------- // EventMessage class for Node events diff --git a/packages/cli/src/eventbus/EventMessageClasses/EventMessageWorkflow.ts b/packages/cli/src/eventbus/EventMessageClasses/EventMessageWorkflow.ts index 71acfd85ab..41cb442e39 100644 --- a/packages/cli/src/eventbus/EventMessageClasses/EventMessageWorkflow.ts +++ b/packages/cli/src/eventbus/EventMessageClasses/EventMessageWorkflow.ts @@ -4,14 +4,7 @@ import { EventMessageTypeNames } from 'n8n-workflow'; import type { AbstractEventMessageOptions } from './AbstractEventMessageOptions'; import type { AbstractEventPayload } from './AbstractEventPayload'; import type { IExecutionBase } from '@/Interfaces'; - -export const eventNamesWorkflow = [ - 'n8n.workflow.started', - 'n8n.workflow.success', - 'n8n.workflow.failed', -] as const; - -export type EventNamesWorkflowType = (typeof eventNamesWorkflow)[number]; +import type { EventNamesWorkflowType } from '.'; // -------------------------------------- // EventMessage class for Workflow events diff --git a/packages/cli/src/eventbus/EventMessageClasses/index.ts b/packages/cli/src/eventbus/EventMessageClasses/index.ts index f49e1ae4f3..508b5031cf 100644 --- a/packages/cli/src/eventbus/EventMessageClasses/index.ts +++ b/packages/cli/src/eventbus/EventMessageClasses/index.ts @@ -1,12 +1,47 @@ -import type { EventMessageAudit, EventNamesAuditType } from './EventMessageAudit'; -import { eventNamesAudit } from './EventMessageAudit'; +import type { EventMessageAudit } from './EventMessageAudit'; import type { EventMessageGeneric } from './EventMessageGeneric'; -import type { EventMessageNode, EventNamesNodeType } from './EventMessageNode'; -import { eventNamesNode } from './EventMessageNode'; -import type { EventMessageWorkflow, EventNamesWorkflowType } from './EventMessageWorkflow'; -import { eventNamesWorkflow } from './EventMessageWorkflow'; +import type { EventMessageNode } from './EventMessageNode'; +import type { EventMessageWorkflow } from './EventMessageWorkflow'; + +export const eventNamesWorkflow = [ + 'n8n.workflow.started', + 'n8n.workflow.success', + 'n8n.workflow.failed', + 'n8n.workflow.crashed', +] as const; +export const eventNamesNode = ['n8n.node.started', 'n8n.node.finished'] as const; +export const eventNamesAudit = [ + 'n8n.audit.user.signedup', + 'n8n.audit.user.updated', + 'n8n.audit.user.deleted', + 'n8n.audit.user.invited', + 'n8n.audit.user.invitation.accepted', + 'n8n.audit.user.reinvited', + 'n8n.audit.user.email.failed', + 'n8n.audit.user.reset.requested', + 'n8n.audit.user.reset', + 'n8n.audit.user.credentials.created', + 'n8n.audit.user.credentials.shared', + 'n8n.audit.user.api.created', + 'n8n.audit.user.api.deleted', + 'n8n.audit.package.installed', + 'n8n.audit.package.updated', + 'n8n.audit.package.deleted', + 'n8n.audit.workflow.created', + 'n8n.audit.workflow.deleted', + 'n8n.audit.workflow.updated', +] as const; + +export type EventNamesWorkflowType = (typeof eventNamesWorkflow)[number]; +export type EventNamesAuditType = (typeof eventNamesAudit)[number]; +export type EventNamesNodeType = (typeof eventNamesNode)[number]; + +export type EventNamesTypes = + | EventNamesAuditType + | EventNamesWorkflowType + | EventNamesNodeType + | 'n8n.destination.test'; -export type EventNamesTypes = EventNamesAuditType | EventNamesWorkflowType | EventNamesNodeType; export const eventNamesAll = [...eventNamesAudit, ...eventNamesWorkflow, ...eventNamesNode]; export type EventMessageTypes = diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 3956c373af..439e4773e6 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -1,5 +1,5 @@ -import type { MessageEventBusDestinationOptions } from 'n8n-workflow'; import { LoggerProxy } from 'n8n-workflow'; +import type { MessageEventBusDestinationOptions } from 'n8n-workflow'; import type { DeleteResult } from 'typeorm'; import type { EventMessageTypes } from '../EventMessageClasses/'; import type { MessageEventBusDestination } from '../MessageEventBusDestination/MessageEventBusDestination.ee'; @@ -24,10 +24,16 @@ import { EventMessageGeneric, eventMessageGenericDestinationTestEvent, } from '../EventMessageClasses/EventMessageGeneric'; +import { recoverExecutionDataFromEventLogMessages } from './recoverEvents'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; -class MessageEventBus extends EventEmitter { +export interface MessageWithCallback { + msg: EventMessageTypes; + confirmCallback: (message: EventMessageTypes, src: EventMessageConfirmSource) => void; +} + +export class MessageEventBus extends EventEmitter { private static instance: MessageEventBus; isInitialized: boolean; @@ -71,12 +77,13 @@ class MessageEventBus extends EventEmitter { if (savedEventDestinations.length > 0) { for (const destinationData of savedEventDestinations) { try { - const destination = messageEventBusDestinationFromDb(destinationData); + const destination = messageEventBusDestinationFromDb(this, destinationData); if (destination) { await this.addDestination(destination); } } catch (error) { - console.log(error); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + if (error.message) LoggerProxy.debug(error.message as string); } } } @@ -96,9 +103,13 @@ class MessageEventBus extends EventEmitter { this.logWriter?.startLogging(); await this.send(unsentAndUnfinished.unsentMessages); - if (unsentAndUnfinished.unfinishedExecutions.size > 0) { - for (const executionId of unsentAndUnfinished.unfinishedExecutions) { - LoggerProxy.debug(`Found unfinished execution ${executionId} in event log(s)`); + if (Object.keys(unsentAndUnfinished.unfinishedExecutions).length > 0) { + for (const executionId of Object.keys(unsentAndUnfinished.unfinishedExecutions)) { + await recoverExecutionDataFromEventLogMessages( + executionId, + unsentAndUnfinished.unfinishedExecutions[executionId], + true, + ); } } @@ -181,12 +192,15 @@ class MessageEventBus extends EventEmitter { } async testDestination(destinationId: string): Promise { - const testMessage = new EventMessageGeneric({ + const msg = new EventMessageGeneric({ eventName: eventMessageGenericDestinationTestEvent, }); const destination = await this.findDestination(destinationId); if (destination.length > 0) { - const sendResult = await this.destinations[destinationId].receiveFromEventBus(testMessage); + const sendResult = await this.destinations[destinationId].receiveFromEventBus({ + msg, + confirmCallback: () => this.confirmSent(msg, { id: '0', name: 'eventBus' }), + }); return sendResult; } return false; @@ -212,17 +226,21 @@ class MessageEventBus extends EventEmitter { // generic emit for external modules to capture events // this is for internal use ONLY and not for use with custom destinations! - this.emit('message', msg); - - // LoggerProxy.debug(`Listeners: ${this.eventNames().join(',')}`); + this.emitMessageWithCallback('message', msg); if (this.shouldSendMsg(msg)) { for (const destinationName of Object.keys(this.destinations)) { - this.emit(this.destinations[destinationName].getId(), msg); + this.emitMessageWithCallback(this.destinations[destinationName].getId(), msg); } } } + private emitMessageWithCallback(eventName: string, msg: EventMessageTypes): boolean { + const confirmCallback = (message: EventMessageTypes, src: EventMessageConfirmSource) => + this.confirmSent(message, src); + return this.emit(eventName, msg, confirmCallback); + } + shouldSendMsg(msg: EventMessageTypes): boolean { return ( isLogStreamingEnabled() && @@ -249,14 +267,14 @@ class MessageEventBus extends EventEmitter { return filtered; } - async getUnfinishedExecutions(): Promise> { + async getUnfinishedExecutions(): Promise> { const queryResult = await this.logWriter?.getUnfinishedExecutions(); return queryResult; } async getUnsentAndUnfinishedExecutions(): Promise<{ unsentMessages: EventMessageTypes[]; - unfinishedExecutions: Set; + unfinishedExecutions: Record; }> { const queryResult = await this.logWriter?.getUnsentAndUnfinishedExecutions(); return queryResult; diff --git a/packages/cli/src/eventbus/MessageEventBus/recoverEvents.ts b/packages/cli/src/eventbus/MessageEventBus/recoverEvents.ts new file mode 100644 index 0000000000..3647c888ec --- /dev/null +++ b/packages/cli/src/eventbus/MessageEventBus/recoverEvents.ts @@ -0,0 +1,190 @@ +import { parse, stringify } from 'flatted'; +import type { IRun, IRunExecutionData, ITaskData } from 'n8n-workflow'; +import { NodeOperationError, WorkflowOperationError } from 'n8n-workflow'; +import * as Db from '@/Db'; +import type { EventMessageTypes, EventNamesTypes } from '../EventMessageClasses'; +import type { DateTime } from 'luxon'; +import { InternalHooksManager } from '../../InternalHooksManager'; +import { getPushInstance } from '@/push'; +import type { IPushDataExecutionRecovered } from '../../Interfaces'; +import { workflowExecutionCompleted } from '../../events/WorkflowStatistics'; +import { eventBus } from './MessageEventBus'; + +export async function recoverExecutionDataFromEventLogMessages( + executionId: string, + messages: EventMessageTypes[], + applyToDb = true, +): Promise { + const executionEntry = await Db.collections.Execution.findOne({ + where: { + id: executionId, + }, + }); + + if (executionEntry && messages) { + let executionData: IRunExecutionData | undefined; + let workflowError: WorkflowOperationError | undefined; + try { + executionData = parse(executionEntry.data) as IRunExecutionData; + } catch {} + if (!executionData) { + executionData = { resultData: { runData: {} } }; + } + let nodeNames: string[] = []; + if ( + executionData?.resultData?.runData && + Object.keys(executionData.resultData.runData).length > 0 + ) { + } else { + if (!executionData.resultData) { + executionData.resultData = { + runData: {}, + }; + } else { + if (!executionData.resultData.runData) { + executionData.resultData.runData = {}; + } + } + } + nodeNames = executionEntry.workflowData.nodes.map((n) => n.name); + + let lastNodeRunTimestamp: DateTime | undefined = undefined; + + for (const nodeName of nodeNames) { + const nodeByName = executionEntry?.workflowData.nodes.find((n) => n.name === nodeName); + + if (!nodeByName) continue; + + const nodeStartedMessage = messages.find( + (message) => + message.eventName === 'n8n.node.started' && message.payload.nodeName === nodeName, + ); + const nodeFinishedMessage = messages.find( + (message) => + message.eventName === 'n8n.node.finished' && message.payload.nodeName === nodeName, + ); + + const executionTime = + nodeStartedMessage && nodeFinishedMessage + ? nodeFinishedMessage.ts.diff(nodeStartedMessage.ts).toMillis() + : 0; + + let taskData: ITaskData; + if (executionData.resultData.runData[nodeName]?.length > 0) { + taskData = executionData.resultData.runData[nodeName][0]; + } else { + taskData = { + startTime: nodeStartedMessage ? nodeStartedMessage.ts.toUnixInteger() : 0, + executionTime, + source: [null], + executionStatus: 'unknown', + }; + } + + if (nodeStartedMessage && !nodeFinishedMessage) { + const nodeError = new NodeOperationError( + nodeByName, + 'Node crashed, possible out-of-memory issue', + { + message: 'Execution stopped at this node', + description: + "n8n may have run out of memory while executing it. More context and tips on how to avoid this in the docs", + }, + ); + workflowError = new WorkflowOperationError( + 'Workflow did not finish, possible out-of-memory issue', + ); + taskData.error = nodeError; + taskData.executionStatus = 'crashed'; + executionData.resultData.lastNodeExecuted = nodeName; + if (nodeStartedMessage) lastNodeRunTimestamp = nodeStartedMessage.ts; + } else if (nodeStartedMessage && nodeFinishedMessage) { + taskData.executionStatus = 'success'; + if (taskData.data === undefined) { + taskData.data = { + main: [ + [ + { + json: { + isArtificalRecoveredEventItem: true, + }, + pairedItem: undefined, + }, + ], + ], + }; + } + } + + if (!executionData.resultData.runData[nodeName]) { + executionData.resultData.runData[nodeName] = [taskData]; + } + } + + if (!executionData.resultData.error && workflowError) { + executionData.resultData.error = workflowError; + } + if (!lastNodeRunTimestamp) { + const workflowEndedMessage = messages.find((message) => + ( + [ + 'n8n.workflow.success', + 'n8n.workflow.crashed', + 'n8n.workflow.failed', + ] as EventNamesTypes[] + ).includes(message.eventName), + ); + if (workflowEndedMessage) { + lastNodeRunTimestamp = workflowEndedMessage.ts; + } else { + const workflowStartedMessage = messages.find( + (message) => message.eventName === 'n8n.workflow.started', + ); + if (workflowStartedMessage) { + lastNodeRunTimestamp = workflowStartedMessage.ts; + } + } + } + if (applyToDb) { + await Db.collections.Execution.update(executionId, { + data: stringify(executionData), + status: 'crashed', + stoppedAt: lastNodeRunTimestamp?.toJSDate(), + }); + const internalHooks = InternalHooksManager.getInstance(); + await internalHooks.onWorkflowPostExecute(executionId, executionEntry.workflowData, { + data: executionData, + finished: false, + mode: executionEntry.mode, + waitTill: executionEntry.waitTill ?? undefined, + startedAt: executionEntry.startedAt, + stoppedAt: lastNodeRunTimestamp?.toJSDate(), + status: 'crashed', + }); + const iRunData: IRun = { + data: executionData, + finished: false, + mode: executionEntry.mode, + waitTill: executionEntry.waitTill ?? undefined, + startedAt: executionEntry.startedAt, + stoppedAt: lastNodeRunTimestamp?.toJSDate(), + status: 'crashed', + }; + + // calling workflowExecutionCompleted directly because the eventEmitter is not up yet at this point + await workflowExecutionCompleted(executionEntry.workflowData, iRunData); + + // wait for UI to be back up and send the execution data + eventBus.once('editorUiConnected', function handleUiBackUp() { + // add a small timeout to make sure the UI is back up + setTimeout(() => { + getPushInstance().send('executionRecovered', { + executionId, + } as IPushDataExecutionRecovered); + }, 1000); + }); + } + return executionData; + } + return; +} diff --git a/packages/cli/src/eventbus/MessageEventBusDestination/Helpers.ee.ts b/packages/cli/src/eventbus/MessageEventBusDestination/Helpers.ee.ts index a262468aae..301c38c213 100644 --- a/packages/cli/src/eventbus/MessageEventBusDestination/Helpers.ee.ts +++ b/packages/cli/src/eventbus/MessageEventBusDestination/Helpers.ee.ts @@ -1,5 +1,4 @@ -/* eslint-disable import/no-cycle */ -import type { EventDestinations } from '@db/entities/MessageEventBusDestinationEntity'; +import type { EventDestinations } from '@/databases/entities/MessageEventBusDestinationEntity'; import { promClient } from '@/metrics'; import { EventMessageTypeNames, @@ -12,21 +11,23 @@ import type { MessageEventBusDestination } from './MessageEventBusDestination.ee import { MessageEventBusDestinationSentry } from './MessageEventBusDestinationSentry.ee'; import { MessageEventBusDestinationSyslog } from './MessageEventBusDestinationSyslog.ee'; import { MessageEventBusDestinationWebhook } from './MessageEventBusDestinationWebhook.ee'; +import type { MessageEventBus } from '../MessageEventBus/MessageEventBus'; export function messageEventBusDestinationFromDb( + eventBusInstance: MessageEventBus, dbData: EventDestinations, ): MessageEventBusDestination | null { const destinationData = dbData.destination; if ('__type' in destinationData) { switch (destinationData.__type) { case MessageEventBusDestinationTypeNames.sentry: - return MessageEventBusDestinationSentry.deserialize(destinationData); + return MessageEventBusDestinationSentry.deserialize(eventBusInstance, destinationData); case MessageEventBusDestinationTypeNames.syslog: - return MessageEventBusDestinationSyslog.deserialize(destinationData); + return MessageEventBusDestinationSyslog.deserialize(eventBusInstance, destinationData); case MessageEventBusDestinationTypeNames.webhook: - return MessageEventBusDestinationWebhook.deserialize(destinationData); + return MessageEventBusDestinationWebhook.deserialize(eventBusInstance, destinationData); default: - console.log('MessageEventBusDestination __type unknown'); + LoggerProxy.debug('MessageEventBusDestination __type unknown'); } } return null; diff --git a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestination.ee.ts b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestination.ee.ts index bd04cbedcd..0a5117b00b 100644 --- a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestination.ee.ts +++ b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestination.ee.ts @@ -4,14 +4,17 @@ import { LoggerProxy, MessageEventBusDestinationTypeNames } from 'n8n-workflow'; import * as Db from '@/Db'; import type { AbstractEventMessage } from '../EventMessageClasses/AbstractEventMessage'; import type { EventMessageTypes } from '../EventMessageClasses'; -import { eventBus } from '..'; import type { DeleteResult, InsertResult } from 'typeorm'; +import type { EventMessageConfirmSource } from '../EventMessageClasses/EventMessageConfirm'; +import type { MessageEventBus, MessageWithCallback } from '../MessageEventBus/MessageEventBus'; export abstract class MessageEventBusDestination implements MessageEventBusDestinationOptions { // Since you can't have static abstract functions - this just serves as a reminder that you need to implement these. Please. // static abstract deserialize(): MessageEventBusDestination | null; readonly id: string; + readonly eventBusInstance: MessageEventBus; + __type: MessageEventBusDestinationTypeNames; label: string; @@ -24,7 +27,8 @@ export abstract class MessageEventBusDestination implements MessageEventBusDesti anonymizeAuditMessages: boolean; - constructor(options: MessageEventBusDestinationOptions) { + constructor(eventBusInstance: MessageEventBus, options: MessageEventBusDestinationOptions) { + this.eventBusInstance = eventBusInstance; this.id = !options.id || options.id.length !== 36 ? uuid() : options.id; this.__type = options.__type ?? MessageEventBusDestinationTypeNames.abstract; this.label = options.label ?? 'Log Destination'; @@ -37,15 +41,21 @@ export abstract class MessageEventBusDestination implements MessageEventBusDesti startListening() { if (this.enabled) { - eventBus.on(this.getId(), async (msg: EventMessageTypes) => { - await this.receiveFromEventBus(msg); - }); + this.eventBusInstance.on( + this.getId(), + async ( + msg: EventMessageTypes, + confirmCallback: (message: EventMessageTypes, src: EventMessageConfirmSource) => void, + ) => { + await this.receiveFromEventBus({ msg, confirmCallback }); + }, + ); LoggerProxy.debug(`${this.id} listener started`); } } stopListening() { - eventBus.removeAllListeners(this.getId()); + this.eventBusInstance.removeAllListeners(this.getId()); } enable() { @@ -81,7 +91,6 @@ export abstract class MessageEventBusDestination implements MessageEventBusDesti skipUpdateIfNoValuesChanged: true, conflictPaths: ['id'], }); - Db.collections.EventDestinations.createQueryBuilder().insert().into('something').onConflict(''); return dbResult; } @@ -105,7 +114,7 @@ export abstract class MessageEventBusDestination implements MessageEventBusDesti }; } - abstract receiveFromEventBus(msg: AbstractEventMessage): Promise; + abstract receiveFromEventBus(emitterPayload: MessageWithCallback): Promise; toString() { return JSON.stringify(this.serialize()); diff --git a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSentry.ee.ts b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSentry.ee.ts index 4d515a83cc..ab1c4ca379 100644 --- a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSentry.ee.ts +++ b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSentry.ee.ts @@ -3,16 +3,15 @@ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ import { MessageEventBusDestination } from './MessageEventBusDestination.ee'; import * as Sentry from '@sentry/node'; -import { eventBus } from '../MessageEventBus/MessageEventBus'; +import { LoggerProxy, MessageEventBusDestinationTypeNames } from 'n8n-workflow'; import type { MessageEventBusDestinationOptions, MessageEventBusDestinationSentryOptions, } from 'n8n-workflow'; -import { MessageEventBusDestinationTypeNames } from 'n8n-workflow'; import { isLogStreamingEnabled } from '../MessageEventBus/MessageEventBusHelper'; -import type { EventMessageTypes } from '../EventMessageClasses'; import { eventMessageGenericDestinationTestEvent } from '../EventMessageClasses/EventMessageGeneric'; import { N8N_VERSION } from '@/constants'; +import type { MessageEventBus, MessageWithCallback } from '../MessageEventBus/MessageEventBus'; export const isMessageEventBusDestinationSentryOptions = ( candidate: unknown, @@ -34,8 +33,8 @@ export class MessageEventBusDestinationSentry sentryClient?: Sentry.NodeClient; - constructor(options: MessageEventBusDestinationSentryOptions) { - super(options); + constructor(eventBusInstance: MessageEventBus, options: MessageEventBusDestinationSentryOptions) { + super(eventBusInstance, options); this.label = options.label ?? 'Sentry DSN'; this.__type = options.__type ?? MessageEventBusDestinationTypeNames.sentry; this.dsn = options.dsn; @@ -54,7 +53,8 @@ export class MessageEventBusDestinationSentry }); } - async receiveFromEventBus(msg: EventMessageTypes): Promise { + async receiveFromEventBus(emitterPayload: MessageWithCallback): Promise { + const { msg, confirmCallback } = emitterPayload; let sendResult = false; if (!this.sentryClient) return sendResult; if (msg.eventName !== eventMessageGenericDestinationTestEvent) { @@ -84,11 +84,12 @@ export class MessageEventBusDestinationSentry ); if (sentryResult) { - eventBus.confirmSent(msg, { id: this.id, name: this.label }); + // eventBus.confirmSent(msg, { id: this.id, name: this.label }); + confirmCallback(msg, { id: this.id, name: this.label }); sendResult = true; } } catch (error) { - console.log(error); + if (error.message) LoggerProxy.debug(error.message as string); } return sendResult; } @@ -104,6 +105,7 @@ export class MessageEventBusDestinationSentry } static deserialize( + eventBusInstance: MessageEventBus, data: MessageEventBusDestinationOptions, ): MessageEventBusDestinationSentry | null { if ( @@ -111,7 +113,7 @@ export class MessageEventBusDestinationSentry data.__type === MessageEventBusDestinationTypeNames.sentry && isMessageEventBusDestinationSentryOptions(data) ) { - return new MessageEventBusDestinationSentry(data); + return new MessageEventBusDestinationSentry(eventBusInstance, data); } return null; } diff --git a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSyslog.ee.ts b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSyslog.ee.ts index fe7983abc8..13d6a1636c 100644 --- a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSyslog.ee.ts +++ b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationSyslog.ee.ts @@ -2,7 +2,6 @@ /* eslint-disable @typescript-eslint/no-non-null-assertion */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ import syslog from 'syslog-client'; -import { eventBus } from '../MessageEventBus/MessageEventBus'; import type { MessageEventBusDestinationOptions, MessageEventBusDestinationSyslogOptions, @@ -10,8 +9,8 @@ import type { import { LoggerProxy, MessageEventBusDestinationTypeNames } from 'n8n-workflow'; import { MessageEventBusDestination } from './MessageEventBusDestination.ee'; import { isLogStreamingEnabled } from '../MessageEventBus/MessageEventBusHelper'; -import type { EventMessageTypes } from '../EventMessageClasses'; import { eventMessageGenericDestinationTestEvent } from '../EventMessageClasses/EventMessageGeneric'; +import type { MessageEventBus, MessageWithCallback } from '../MessageEventBus/MessageEventBus'; export const isMessageEventBusDestinationSyslogOptions = ( candidate: unknown, @@ -41,8 +40,8 @@ export class MessageEventBusDestinationSyslog eol: string; - constructor(options: MessageEventBusDestinationSyslogOptions) { - super(options); + constructor(eventBusInstance: MessageEventBus, options: MessageEventBusDestinationSyslogOptions) { + super(eventBusInstance, options); this.__type = options.__type ?? MessageEventBusDestinationTypeNames.syslog; this.label = options.label ?? 'Syslog Server'; @@ -70,7 +69,8 @@ export class MessageEventBusDestinationSyslog }); } - async receiveFromEventBus(msg: EventMessageTypes): Promise { + async receiveFromEventBus(emitterPayload: MessageWithCallback): Promise { + const { msg, confirmCallback } = emitterPayload; let sendResult = false; if (msg.eventName !== eventMessageGenericDestinationTestEvent) { if (!isLogStreamingEnabled()) return sendResult; @@ -92,16 +92,17 @@ export class MessageEventBusDestinationSyslog timestamp: msg.ts.toJSDate(), }, async (error) => { - if (error) { - console.log(error); + if (error?.message) { + LoggerProxy.debug(error.message); } else { - eventBus.confirmSent(msg, { id: this.id, name: this.label }); + // eventBus.confirmSent(msg, { id: this.id, name: this.label }); + confirmCallback(msg, { id: this.id, name: this.label }); sendResult = true; } }, ); } catch (error) { - console.log(error); + if (error.message) LoggerProxy.debug(error.message as string); } if (msg.eventName === eventMessageGenericDestinationTestEvent) { await new Promise((resolve) => setTimeout(resolve, 500)); @@ -124,6 +125,7 @@ export class MessageEventBusDestinationSyslog } static deserialize( + eventBusInstance: MessageEventBus, data: MessageEventBusDestinationOptions, ): MessageEventBusDestinationSyslog | null { if ( @@ -131,7 +133,7 @@ export class MessageEventBusDestinationSyslog data.__type === MessageEventBusDestinationTypeNames.syslog && isMessageEventBusDestinationSyslogOptions(data) ) { - return new MessageEventBusDestinationSyslog(data); + return new MessageEventBusDestinationSyslog(eventBusInstance, data); } return null; } diff --git a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee.ts b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee.ts index 7cfee04af6..95dcd340d6 100644 --- a/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee.ts +++ b/packages/cli/src/eventbus/MessageEventBusDestination/MessageEventBusDestinationWebhook.ee.ts @@ -1,4 +1,3 @@ -/* eslint-disable import/no-cycle */ /* eslint-disable @typescript-eslint/no-unsafe-call */ /* eslint-disable @typescript-eslint/no-unsafe-return */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ @@ -6,23 +5,22 @@ /* eslint-disable @typescript-eslint/no-unused-vars */ /* eslint-disable @typescript-eslint/no-unnecessary-boolean-literal-compare */ import { MessageEventBusDestination } from './MessageEventBusDestination.ee'; -import type { AxiosRequestConfig, Method } from 'axios'; import axios from 'axios'; -import { eventBus } from '../MessageEventBus/MessageEventBus'; -import type { EventMessageTypes } from '../EventMessageClasses'; +import type { AxiosRequestConfig, Method } from 'axios'; +import { jsonParse, LoggerProxy, MessageEventBusDestinationTypeNames } from 'n8n-workflow'; import type { MessageEventBusDestinationOptions, MessageEventBusDestinationWebhookOptions, MessageEventBusDestinationWebhookParameterItem, MessageEventBusDestinationWebhookParameterOptions, } from 'n8n-workflow'; -import { jsonParse, LoggerProxy, MessageEventBusDestinationTypeNames } from 'n8n-workflow'; import { CredentialsHelper } from '@/CredentialsHelper'; import { UserSettings } from 'n8n-core'; import { Agent as HTTPSAgent } from 'https'; import config from '@/config'; import { isLogStreamingEnabled } from '../MessageEventBus/MessageEventBusHelper'; import { eventMessageGenericDestinationTestEvent } from '../EventMessageClasses/EventMessageGeneric'; +import type { MessageEventBus, MessageWithCallback } from '../MessageEventBus/MessageEventBus'; export const isMessageEventBusDestinationWebhookOptions = ( candidate: unknown, @@ -74,8 +72,11 @@ export class MessageEventBusDestinationWebhook axiosRequestOptions: AxiosRequestConfig; - constructor(options: MessageEventBusDestinationWebhookOptions) { - super(options); + constructor( + eventBusInstance: MessageEventBus, + options: MessageEventBusDestinationWebhookOptions, + ) { + super(eventBusInstance, options); this.url = options.url; this.label = options.label ?? 'Webhook Endpoint'; this.__type = options.__type ?? MessageEventBusDestinationTypeNames.webhook; @@ -246,6 +247,7 @@ export class MessageEventBusDestinationWebhook } static deserialize( + eventBusInstance: MessageEventBus, data: MessageEventBusDestinationOptions, ): MessageEventBusDestinationWebhook | null { if ( @@ -253,12 +255,13 @@ export class MessageEventBusDestinationWebhook data.__type === MessageEventBusDestinationTypeNames.webhook && isMessageEventBusDestinationWebhookOptions(data) ) { - return new MessageEventBusDestinationWebhook(data); + return new MessageEventBusDestinationWebhook(eventBusInstance, data); } return null; } - async receiveFromEventBus(msg: EventMessageTypes): Promise { + async receiveFromEventBus(emitterPayload: MessageWithCallback): Promise { + const { msg, confirmCallback } = emitterPayload; let sendResult = false; if (msg.eventName !== eventMessageGenericDestinationTestEvent) { if (!isLogStreamingEnabled()) return sendResult; @@ -345,13 +348,13 @@ export class MessageEventBusDestinationWebhook if (requestResponse) { if (this.responseCodeMustMatch) { if (requestResponse.status === this.expectedStatusCode) { - eventBus.confirmSent(msg, { id: this.id, name: this.label }); + confirmCallback(msg, { id: this.id, name: this.label }); sendResult = true; } else { sendResult = false; } } else { - eventBus.confirmSent(msg, { id: this.id, name: this.label }); + confirmCallback(msg, { id: this.id, name: this.label }); sendResult = true; } } diff --git a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts index 44e76339f2..c8dc5f74ba 100644 --- a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts +++ b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts @@ -36,7 +36,7 @@ export interface MessageEventBusLogWriterOptions { interface ReadMessagesFromLogFileResult { loggedMessages: EventMessageTypes[]; sentMessages: EventMessageTypes[]; - unfinishedExecutions: Set; + unfinishedExecutions: Record; } /** @@ -156,7 +156,7 @@ export class MessageEventBusLogWriter { const results: ReadMessagesFromLogFileResult = { loggedMessages: [], sentMessages: [], - unfinishedExecutions: new Set(), + unfinishedExecutions: {}, }; const logCount = logHistory ? Math.min(config.get('eventBus.logWriter.keepLogCount') as number, logHistory) @@ -188,14 +188,28 @@ export class MessageEventBusLogWriter { if (isEventMessageOptions(json) && json.__type !== undefined) { const msg = getEventMessageObjectByType(json); if (msg !== null) results.loggedMessages.push(msg); - if (msg?.eventName === 'n8n.workflow.started' && msg?.payload?.executionId) { - results.unfinishedExecutions.add(msg?.payload?.executionId as string); - } else if ( - (msg?.eventName === 'n8n.workflow.success' || - msg?.eventName === 'n8n.workflow.failed') && - msg?.payload?.executionId - ) { - results.unfinishedExecutions.delete(msg?.payload?.executionId as string); + if (msg?.eventName && msg.payload?.executionId) { + const executionId = msg.payload.executionId as string; + switch (msg.eventName) { + case 'n8n.workflow.started': + if (!Object.keys(results.unfinishedExecutions).includes(executionId)) { + results.unfinishedExecutions[executionId] = []; + } + results.unfinishedExecutions[executionId] = [msg]; + break; + case 'n8n.workflow.success': + case 'n8n.workflow.failed': + case 'n8n.workflow.crashed': + delete results.unfinishedExecutions[executionId]; + break; + case 'n8n.node.started': + case 'n8n.node.finished': + if (!Object.keys(results.unfinishedExecutions).includes(executionId)) { + results.unfinishedExecutions[executionId] = []; + } + results.unfinishedExecutions[executionId].push(msg); + break; + } } } if (isEventMessageConfirm(json) && mode !== 'all') { @@ -204,9 +218,10 @@ export class MessageEventBusLogWriter { results.sentMessages.push(...removedMessage); } } - } catch { + } catch (error) { LoggerProxy.error( - `Error reading line messages from file: ${logFileName}, line: ${line}`, + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + `Error reading line messages from file: ${logFileName}, line: ${line}, ${error.message}}`, ); } }); @@ -301,13 +316,13 @@ export class MessageEventBusLogWriter { return (await this.getMessages('unsent')).loggedMessages; } - async getUnfinishedExecutions(): Promise> { + async getUnfinishedExecutions(): Promise> { return (await this.getMessages('unfinished')).unfinishedExecutions; } async getUnsentAndUnfinishedExecutions(): Promise<{ unsentMessages: EventMessageTypes[]; - unfinishedExecutions: Set; + unfinishedExecutions: Record; }> { const result = await this.getMessages('unsent'); return { diff --git a/packages/cli/src/eventbus/eventBusRoutes.ts b/packages/cli/src/eventbus/eventBusRoutes.ts index bf30e295f0..26d76e6c39 100644 --- a/packages/cli/src/eventbus/eventBusRoutes.ts +++ b/packages/cli/src/eventbus/eventBusRoutes.ts @@ -30,6 +30,7 @@ import type { User } from '../databases/entities/User'; import * as ResponseHelper from '@/ResponseHelper'; import type { EventMessageNodeOptions } from './EventMessageClasses/EventMessageNode'; import { EventMessageNode } from './EventMessageClasses/EventMessageNode'; +import { recoverExecutionDataFromEventLogMessages } from './MessageEventBus/recoverEvents'; export const eventBusRouter = express.Router(); @@ -102,6 +103,32 @@ eventBusRouter.get( }), ); +eventBusRouter.get( + '/execution-recover/:id', + ResponseHelper.send(async (req: express.Request): Promise => { + if (req.params?.id) { + let logHistory; + let applyToDb = true; + if (req.query?.logHistory) { + logHistory = parseInt(req.query.logHistory as string, 10); + } + if (req.query?.applyToDb) { + applyToDb = !!req.query.applyToDb; + } + const messages = await eventBus.getEventsByExecutionId(req.params.id, logHistory); + if (messages.length > 0) { + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + const recoverResult = await recoverExecutionDataFromEventLogMessages( + req.params.id, + messages, + applyToDb, + ); + return recoverResult; + } + } + }), +); + eventBusRouter.post( '/event', ResponseHelper.send(async (req: express.Request): Promise => { @@ -159,17 +186,23 @@ eventBusRouter.post( switch (req.body.__type) { case MessageEventBusDestinationTypeNames.sentry: if (isMessageEventBusDestinationSentryOptions(req.body)) { - result = await eventBus.addDestination(new MessageEventBusDestinationSentry(req.body)); + result = await eventBus.addDestination( + new MessageEventBusDestinationSentry(eventBus, req.body), + ); } break; case MessageEventBusDestinationTypeNames.webhook: if (isMessageEventBusDestinationWebhookOptions(req.body)) { - result = await eventBus.addDestination(new MessageEventBusDestinationWebhook(req.body)); + result = await eventBus.addDestination( + new MessageEventBusDestinationWebhook(eventBus, req.body), + ); } break; case MessageEventBusDestinationTypeNames.syslog: if (isMessageEventBusDestinationSyslogOptions(req.body)) { - result = await eventBus.addDestination(new MessageEventBusDestinationSyslog(req.body)); + result = await eventBus.addDestination( + new MessageEventBusDestinationSyslog(eventBus, req.body), + ); } break; default: @@ -180,7 +213,10 @@ eventBusRouter.post( } if (result) { await result.saveToDb(); - return result; + return { + ...result, + eventBusInstance: undefined, + }; } throw new BadRequestError('There was an error adding the destination'); } diff --git a/packages/cli/src/executions/executions.service.ts b/packages/cli/src/executions/executions.service.ts index 8a5ad8b82b..936d34ea7d 100644 --- a/packages/cli/src/executions/executions.service.ts +++ b/packages/cli/src/executions/executions.service.ts @@ -3,7 +3,15 @@ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ import { validate as jsonSchemaValidate } from 'jsonschema'; import { BinaryDataManager } from 'n8n-core'; -import type { IDataObject, IWorkflowBase, JsonObject } from 'n8n-workflow'; +import type { + IDataObject, + IWorkflowBase, + JsonObject, + ExecutionStatus, + IRunExecutionData, + NodeOperationError, + IExecutionsSummary, +} from 'n8n-workflow'; import { deepCopy, LoggerProxy, jsonParse, Workflow } from 'n8n-workflow'; import type { FindOperator, FindOptionsWhere } from 'typeorm'; import { In, IsNull, LessThanOrEqual, Not, Raw } from 'typeorm'; @@ -25,6 +33,7 @@ import { getSharedWorkflowIds } from '@/WorkflowHelpers'; import { WorkflowRunner } from '@/WorkflowRunner'; import * as Db from '@/Db'; import * as GenericHelpers from '@/GenericHelpers'; +import { parse } from 'flatted'; interface IGetExecutionsQueryFilter { id?: FindOperator; @@ -32,6 +41,7 @@ interface IGetExecutionsQueryFilter { mode?: string; retryOf?: string; retrySuccessId?: string; + status?: ExecutionStatus[]; workflowId?: string; // eslint-disable-next-line @typescript-eslint/no-explicit-any waitTill?: FindOperator | boolean; @@ -45,6 +55,10 @@ const schemaGetExecutionsQueryFilter = { mode: { type: 'string' }, retryOf: { type: 'string' }, retrySuccessId: { type: 'string' }, + status: { + type: 'array', + items: { type: 'string' }, + }, waitTill: { type: 'boolean' }, workflowId: { anyOf: [{ type: 'integer' }, { type: 'string' }] }, }, @@ -193,7 +207,16 @@ export class ExecutionsService { .map(({ id }) => id), ); - const findWhere: FindOptionsWhere = { workflowId: In(sharedWorkflowIds) }; + const findWhere: FindOptionsWhere = { + workflowId: In(sharedWorkflowIds), + }; + if (filter?.status) { + Object.assign(findWhere, { status: In(filter.status) }); + delete filter.status; // remove status from filter so it does not get applied twice + } + if (filter?.finished) { + Object.assign(findWhere, { finished: filter.finished }); + } const rangeQuery: string[] = []; const rangeQueryParams: { @@ -257,7 +280,42 @@ export class ExecutionsService { req.user, ); - const formattedExecutions = executions.map((execution) => { + const formattedExecutions: IExecutionsSummary[] = executions.map((execution) => { + // inject potential node execution errors into the execution response + const nodeExecutionStatus = {}; + let lastNodeExecuted; + let executionError; + try { + const data = parse(execution.data) as IRunExecutionData; + lastNodeExecuted = data?.resultData?.lastNodeExecuted ?? ''; + executionError = data?.resultData?.error; + if (data?.resultData?.runData) { + for (const key of Object.keys(data.resultData.runData)) { + const errors = data.resultData.runData[key] + ?.filter((taskdata) => taskdata.error?.name) + ?.map((taskdata) => { + if (taskdata.error?.name === 'NodeOperationError') { + return { + name: (taskdata.error as NodeOperationError).name, + message: (taskdata.error as NodeOperationError).message, + description: (taskdata.error as NodeOperationError).description, + }; + } else { + return { + name: taskdata.error?.name, + }; + } + }); + Object.assign(nodeExecutionStatus, { + [key]: { + executionStatus: data.resultData.runData[key][0].executionStatus, + errors, + data: data.resultData.runData[key][0].data ?? undefined, + }, + }); + } + } + } catch {} return { id: execution.id, finished: execution.finished, @@ -269,9 +327,12 @@ export class ExecutionsService { stoppedAt: execution.stoppedAt, workflowId: execution.workflowData?.id ?? '', workflowName: execution.workflowData?.name, - }; + status: execution.status, + lastNodeExecuted, + executionError, + nodeExecutionStatus, + } as IExecutionsSummary; }); - return { count, results: formattedExecutions, diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts index dbbbe63b73..90866f6268 100644 --- a/packages/cli/src/push/abstract.push.ts +++ b/packages/cli/src/push/abstract.push.ts @@ -1,5 +1,6 @@ import { LoggerProxy as Logger } from 'n8n-workflow'; import type { IPushDataType } from '@/Interfaces'; +import { eventBus } from '../eventbus'; export abstract class AbstractPush { protected connections: Record = {}; @@ -10,6 +11,7 @@ export abstract class AbstractPush { protected add(sessionId: string, connection: T): void { const { connections } = this; Logger.debug('Add editor-UI session', { sessionId }); + eventBus.emit('editorUiConnected', sessionId); const existingConnection = connections[sessionId]; if (existingConnection) { diff --git a/packages/cli/test/integration/eventbus.test.ts b/packages/cli/test/integration/eventbus.test.ts index 00bd6d4752..d30b542f14 100644 --- a/packages/cli/test/integration/eventbus.test.ts +++ b/packages/cli/test/integration/eventbus.test.ts @@ -22,6 +22,7 @@ import { MessageEventBusDestinationWebhook } from '@/eventbus/MessageEventBusDes import { MessageEventBusDestinationSentry } from '@/eventbus/MessageEventBusDestination/MessageEventBusDestinationSentry.ee'; import { EventMessageAudit } from '@/eventbus/EventMessageClasses/EventMessageAudit'; import { v4 as uuid } from 'uuid'; +import { EventNamesTypes } from '../../src/eventbus/EventMessageClasses'; jest.unmock('@/eventbus/MessageEventBus/MessageEventBus'); jest.mock('axios'); @@ -62,12 +63,6 @@ const testSentryDestination: MessageEventBusDestinationSentryOptions = { subscribedEvents: ['n8n.test.message', 'n8n.audit.user.updated'], }; -async function cleanLogs() { - eventBus.logWriter.cleanAllLogs(); - const allMessages = await eventBus.getEventsAll(); - expect(allMessages.length).toBe(0); -} - async function confirmIdInAll(id: string) { const sent = await eventBus.getEventsAll(); expect(sent.length).toBeGreaterThan(0); @@ -106,13 +101,13 @@ beforeAll(async () => { config.set('eventBus.logWriter.logBaseName', 'n8n-test-logwriter'); config.set('eventBus.logWriter.keepLogCount', '1'); config.set('enterprise.features.logStreaming', true); + await eventBus.initialize(); }); beforeEach(async () => { config.set('userManagement.disabled', false); config.set('userManagement.isInstanceOwnerSetUp', true); - config.set('enterprise.features.logStreaming', false); }); afterAll(async () => { @@ -127,7 +122,10 @@ test('should have a running logwriter process', () => { }); test('should have logwriter log messages', async () => { - const testMessage = new EventMessageGeneric({ eventName: 'n8n.test.message', id: uuid() }); + const testMessage = new EventMessageGeneric({ + eventName: 'n8n.test.message' as EventNamesTypes, + id: uuid(), + }); await eventBus.send(testMessage); await new Promise((resolve) => { eventBus.logWriter.worker?.once('message', async (msg: { command: string; data: any }) => { @@ -176,10 +174,13 @@ test('GET /eventbus/destination all returned destinations should exist in eventb } }); +// this test (presumably the mocking) is causing the test suite to randomly fail test.skip('should send message to syslog', async () => { - const testMessage = new EventMessageGeneric({ eventName: 'n8n.test.message', id: uuid() }); + const testMessage = new EventMessageGeneric({ + eventName: 'n8n.test.message' as EventNamesTypes, + id: uuid(), + }); config.set('enterprise.features.logStreaming', true); - // await cleanLogs(); const syslogDestination = eventBus.destinations[ testSyslogDestination.id! @@ -217,11 +218,10 @@ test.skip('should send message to syslog', async () => { test.skip('should confirm send message if there are no subscribers', async () => { const testMessageUnsubscribed = new EventMessageGeneric({ - eventName: 'n8n.test.unsub', + eventName: 'n8n.test.unsub' as EventNamesTypes, id: uuid(), }); config.set('enterprise.features.logStreaming', true); - // await cleanLogs(); const syslogDestination = eventBus.destinations[ testSyslogDestination.id! @@ -229,11 +229,6 @@ test.skip('should confirm send message if there are no subscribers', async () => syslogDestination.enable(); - const mockedSyslogClientLog = jest.spyOn(syslogDestination.client, 'log'); - mockedSyslogClientLog.mockImplementation((_m, _options, _cb) => { - return syslogDestination.client; - }); - await eventBus.send(testMessageUnsubscribed); await new Promise((resolve) => { @@ -244,7 +239,6 @@ test.skip('should confirm send message if there are no subscribers', async () => await confirmIdInAll(testMessageUnsubscribed.id); } else if (msg.command === 'confirmMessageSent') { await confirmIdSent(testMessageUnsubscribed.id); - expect(mockedSyslogClientLog).toHaveBeenCalled(); syslogDestination.disable(); eventBus.logWriter.worker?.removeListener('message', handler002); resolve(true); @@ -264,7 +258,6 @@ test('should anonymize audit message to syslog ', async () => { id: uuid(), }); config.set('enterprise.features.logStreaming', true); - // await cleanLogs(); const syslogDestination = eventBus.destinations[ testSyslogDestination.id! @@ -322,9 +315,11 @@ test('should anonymize audit message to syslog ', async () => { }); test('should send message to webhook ', async () => { - const testMessage = new EventMessageGeneric({ eventName: 'n8n.test.message', id: uuid() }); + const testMessage = new EventMessageGeneric({ + eventName: 'n8n.test.message' as EventNamesTypes, + id: uuid(), + }); config.set('enterprise.features.logStreaming', true); - // await cleanLogs(); const webhookDestination = eventBus.destinations[ testWebhookDestination.id! @@ -355,9 +350,11 @@ test('should send message to webhook ', async () => { }); test('should send message to sentry ', async () => { - const testMessage = new EventMessageGeneric({ eventName: 'n8n.test.message', id: uuid() }); + const testMessage = new EventMessageGeneric({ + eventName: 'n8n.test.message' as EventNamesTypes, + id: uuid(), + }); config.set('enterprise.features.logStreaming', true); - // await cleanLogs(); const sentryDestination = eventBus.destinations[ testSentryDestination.id! diff --git a/packages/cli/test/unit/ActiveExecutions.test.ts b/packages/cli/test/unit/ActiveExecutions.test.ts index 75e233d662..19c1d316e6 100644 --- a/packages/cli/test/unit/ActiveExecutions.test.ts +++ b/packages/cli/test/unit/ActiveExecutions.test.ts @@ -156,6 +156,7 @@ function mockFullRunData(): IRun { }, mode: 'manual', startedAt: new Date(), + status: 'new', }; } diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 8cb90f4339..b7179460ec 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -2292,6 +2292,9 @@ export function getExecuteFunctions( prepareOutputData: NodeHelpers.prepareOutputData, async putExecutionToWait(waitTill: Date): Promise { runExecutionData.waitTill = waitTill; + if (additionalData.setExecutionStatus) { + additionalData.setExecutionStatus('waiting'); + } }, sendMessageToUI(...args: any[]): void { if (mode !== 'manual') { diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index 5969504df1..17f0968203 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -13,6 +13,7 @@ import PCancelable from 'p-cancelable'; import type { ExecutionError, + ExecutionStatus, IConnection, IDataObject, IExecuteData, @@ -46,6 +47,8 @@ export class WorkflowExecute { private mode: WorkflowExecuteMode; + private status: ExecutionStatus; + constructor( additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, @@ -53,6 +56,7 @@ export class WorkflowExecute { ) { this.additionalData = additionalData; this.mode = mode; + this.status = 'new'; this.runExecutionData = runExecutionData || { startData: {}, resultData: { @@ -85,6 +89,8 @@ export class WorkflowExecute { destinationNode?: string, pinData?: IPinData, ): PCancelable { + this.status = 'running'; + // Get the nodes to start workflow execution from startNode = startNode || workflow.getStartNode(destinationNode); @@ -157,6 +163,8 @@ export class WorkflowExecute { let incomingNodeConnections: INodeConnections | undefined; let connection: IConnection; + this.status = 'running'; + const runIndex = 0; // Initialize the nodeExecutionStack and waitingExecution with @@ -682,6 +690,8 @@ export class WorkflowExecute { const startedAt = new Date(); + this.status = 'running'; + const startNode = this.runExecutionData.executionData!.nodeExecutionStack[0].node.name; let destinationNode: string | undefined; @@ -758,6 +768,7 @@ export class WorkflowExecute { main: executionData.data.main, } as ITaskDataConnections, source: [], + executionStatus: 'error', }, ], }, @@ -1048,10 +1059,12 @@ export class WorkflowExecute { startTime, executionTime: new Date().getTime() - startTime, source: !executionData.source ? [] : executionData.source.main, + executionStatus: 'success', }; if (executionError !== undefined) { taskData.error = executionError; + taskData.executionStatus = 'error'; if (executionData.node.continueOnFail === true) { // Workflow should continue running even if node errors @@ -1317,6 +1330,7 @@ export class WorkflowExecute { mode: this.mode, startedAt, stoppedAt: new Date(), + status: this.status, }; return fullRunData; diff --git a/packages/editor-ui/src/Interface.ts b/packages/editor-ui/src/Interface.ts index d20f671302..7d85aa9e35 100644 --- a/packages/editor-ui/src/Interface.ts +++ b/packages/editor-ui/src/Interface.ts @@ -28,6 +28,7 @@ import { NodeParameterValueType, INodeActionTypeDescription, IDisplayOptions, + IExecutionsSummary, IAbstractEventMessage, } from 'n8n-workflow'; import { FAKE_DOOR_FEATURES } from './constants'; @@ -352,19 +353,6 @@ export interface IExecutionsStopData { stoppedAt: Date; } -export interface IExecutionsSummary { - id: string; - mode: WorkflowExecuteMode; - finished?: boolean; - retryOf?: string; - retrySuccessId?: string; - waitTill?: Date; - startedAt: Date; - stoppedAt?: Date; - workflowId: string; - workflowName?: string; -} - export interface IExecutionDeleteFilter { deleteBefore?: Date; filters?: IDataObject; @@ -379,7 +367,13 @@ export type IPushData = | PushDataConsoleMessage | PushDataReloadNodeType | PushDataRemoveNodeType - | PushDataTestWebhook; + | PushDataTestWebhook + | PushDataExecutionRecovered; + +type PushDataExecutionRecovered = { + data: IPushDataExecutionRecovered; + type: 'executionRecovered'; +}; type PushDataExecutionFinished = { data: IPushDataExecutionFinished; @@ -429,6 +423,9 @@ export interface IPushDataExecutionStarted { workflowId: string; workflowName?: string; } +export interface IPushDataExecutionRecovered { + executionId: string; +} export interface IPushDataExecutionFinished { data: IRun; diff --git a/packages/editor-ui/src/components/ExecutionsList.vue b/packages/editor-ui/src/components/ExecutionsList.vue index e511e98ec0..ba27506d2e 100644 --- a/packages/editor-ui/src/components/ExecutionsList.vue +++ b/packages/editor-ui/src/components/ExecutionsList.vue @@ -264,7 +264,7 @@ import { IExecutionsSummary, IWorkflowShortResponse, } from '@/Interface'; -import { IDataObject } from 'n8n-workflow'; +import type { ExecutionStatus, IDataObject } from 'n8n-workflow'; import { range as _range } from 'lodash'; import mixins from 'vue-typed-mixins'; import { mapStores } from 'pinia'; @@ -272,8 +272,6 @@ import { useUIStore } from '@/stores/ui'; import { useWorkflowsStore } from '@/stores/workflows'; import { setPageTitle } from '@/utils'; -type ExecutionStatus = 'failed' | 'success' | 'waiting' | 'running' | 'unknown'; - export default mixins(externalHooks, genericHelpers, executionHelpers, restApi, showMessage).extend( { name: 'ExecutionsList', @@ -337,6 +335,14 @@ export default mixins(externalHooks, genericHelpers, executionHelpers, restApi, id: 'error', name: this.$locale.baseText('executionsList.error'), }, + { + id: 'crashed', + name: this.$locale.baseText('executionsList.error'), + }, + { + id: 'new', + name: this.$locale.baseText('executionsList.new'), + }, { id: 'running', name: this.$locale.baseText('executionsList.running'), @@ -357,13 +363,12 @@ export default mixins(externalHooks, genericHelpers, executionHelpers, restApi, combinedExecutions(): IExecutionsSummary[] { const returnData: IExecutionsSummary[] = []; - if (['ALL', 'running'].includes(this.filter.status)) { + if (['ALL', 'running', 'new'].includes(this.filter.status)) { returnData.push(...this.activeExecutions); } - if (['ALL', 'error', 'success', 'waiting'].includes(this.filter.status)) { + if (['ALL', 'error', 'crashed', 'success', 'waiting'].includes(this.filter.status)) { returnData.push(...this.finishedExecutions); } - return returnData; }, combinedExecutionsCount(): number { @@ -391,16 +396,31 @@ export default mixins(externalHooks, genericHelpers, executionHelpers, restApi, return filter; }, workflowFilterPast(): IDataObject { - const filter: IDataObject = {}; + const queryFilter: IDataObject = {}; if (this.filter.workflowId !== 'ALL') { - filter.workflowId = this.filter.workflowId; + queryFilter.workflowId = this.filter.workflowId; } - if (this.filter.status === 'waiting') { - filter.waitTill = true; - } else if (['error', 'success'].includes(this.filter.status)) { - filter.finished = this.filter.status === 'success'; + switch (this.filter.status as ExecutionStatus) { + case 'waiting': + queryFilter.status = ['waiting']; + break; + case 'crashed': + queryFilter.status = ['crashed']; + break; + case 'new': + queryFilter.status = ['new']; + break; + case 'error': + queryFilter.status = ['failed', 'crashed', 'error']; + break; + case 'success': + queryFilter.status = ['success']; + break; + case 'running': + queryFilter.status = ['running']; + break; } - return filter; + return queryFilter; }, pageTitle() { return this.$locale.baseText('executionsList.workflowExecutions'); @@ -793,19 +813,23 @@ export default mixins(externalHooks, genericHelpers, executionHelpers, restApi, this.isDataLoading = false; }, getStatus(execution: IExecutionsSummary): ExecutionStatus { - let status: ExecutionStatus = 'unknown'; - if (execution.waitTill) { - status = 'waiting'; - } else if (execution.stoppedAt === undefined) { - status = 'running'; - } else if (execution.finished) { - status = 'success'; - } else if (execution.stoppedAt !== null) { - status = 'failed'; - } else { - status = 'unknown'; + if (execution.status) return execution.status; + else { + // this should not happen but just in case + let status: ExecutionStatus = 'unknown'; + if (execution.waitTill) { + status = 'waiting'; + } else if (execution.stoppedAt === undefined) { + status = 'running'; + } else if (execution.finished) { + status = 'success'; + } else if (execution.stoppedAt !== null) { + status = 'failed'; + } else { + status = 'unknown'; + } + return status; } - return status; }, getRowClass(execution: IExecutionsSummary): string { return [this.$style.execRow, this.$style[this.getStatus(execution)]].join(' '); @@ -816,6 +840,10 @@ export default mixins(externalHooks, genericHelpers, executionHelpers, restApi, if (status === 'waiting') { text = this.$locale.baseText('executionsList.waiting'); + } else if (status === 'crashed') { + text = this.$locale.baseText('executionsList.error'); + } else if (status === 'new') { + text = this.$locale.baseText('executionsList.new'); } else if (status === 'running') { text = this.$locale.baseText('executionsList.running'); } else if (status === 'success') { @@ -834,6 +862,10 @@ export default mixins(externalHooks, genericHelpers, executionHelpers, restApi, if (status === 'waiting') { path = 'executionsList.statusWaiting'; + } else if (status === 'crashed') { + path = 'executionsList.statusText'; + } else if (status === 'new') { + path = 'executionsList.statusNew'; } else if (status === 'running') { path = 'executionsList.statusRunning'; } else if (status === 'success') { @@ -990,6 +1022,10 @@ export default mixins(externalHooks, genericHelpers, executionHelpers, restApi, color: var(--color-danger); } + .crashed & { + color: var(--color-danger); + } + .waiting & { color: var(--color-secondary); } @@ -1099,6 +1135,10 @@ export default mixins(externalHooks, genericHelpers, executionHelpers, restApi, background: var(--color-danger); } + &.crashed td:first-child::before { + background: var(--color-danger); + } + &.success td:first-child::before { background: var(--color-success); } diff --git a/packages/editor-ui/src/components/ExecutionsView/ExecutionCard.vue b/packages/editor-ui/src/components/ExecutionsView/ExecutionCard.vue index aaa602f0ca..2e86a0aeb3 100644 --- a/packages/editor-ui/src/components/ExecutionsView/ExecutionCard.vue +++ b/packages/editor-ui/src/components/ExecutionsView/ExecutionCard.vue @@ -46,7 +46,7 @@ > {{ $locale.baseText('executionDetails.runningTimeFinished', { - interpolate: { time: executionUIDetails.runningTime }, + interpolate: { time: executionUIDetails?.runningTime }, }) }} @@ -191,7 +191,8 @@ export default mixins(executionHelpers, showMessage, restApi).extend({ } } - &.error { + &.error, + &.crashed { &, & .executionLink { border-left: var(--spacing-4xs) var(--border-style-base) hsl(var(--color-danger-h), 94%, 80%); diff --git a/packages/editor-ui/src/components/ExecutionsView/ExecutionPreview.vue b/packages/editor-ui/src/components/ExecutionsView/ExecutionPreview.vue index 43e727b1dd..bd5774a47a 100644 --- a/packages/editor-ui/src/components/ExecutionsView/ExecutionPreview.vue +++ b/packages/editor-ui/src/components/ExecutionsView/ExecutionPreview.vue @@ -1,8 +1,5 @@ + + @@ -387,6 +398,16 @@ export default mixins(workflowHelpers).extend({ } } +.recoveredOutputData { + margin: auto; + max-width: 250px; + text-align: center; + + > *:first-child { + margin-bottom: var(--spacing-m); + } +} + .notConnected { max-width: 300px; diff --git a/packages/editor-ui/src/components/InviteUsersModal.vue b/packages/editor-ui/src/components/InviteUsersModal.vue index db1f1fead3..94a78158ad 100644 --- a/packages/editor-ui/src/components/InviteUsersModal.vue +++ b/packages/editor-ui/src/components/InviteUsersModal.vue @@ -168,7 +168,6 @@ export default mixins(showMessage, copyPaste).extend({ return this.emailsCount >= 1; }, invitedUsers(): IUser[] { - console.log(this.usersStore.allUsers, this.showInviteUrls); return this.showInviteUrls ? this.usersStore.allUsers.filter((user) => this.showInviteUrls!.find((invite) => invite.user.id === user.id), diff --git a/packages/editor-ui/src/components/Node.vue b/packages/editor-ui/src/components/Node.vue index 6f57fba76f..03a498c8ef 100644 --- a/packages/editor-ui/src/components/Node.vue +++ b/packages/editor-ui/src/components/Node.vue @@ -34,7 +34,7 @@ -
+
+ + @@ -284,4 +295,26 @@ export default mixins(pinData).extend({ font-weight: var(--font-weight-bold); font-size: var(--font-size-s); } + +.noOutputData { + max-width: 180px; + + > *:first-child { + margin-bottom: var(--spacing-m); + } + + > * { + margin-bottom: var(--spacing-2xs); + } +} + +.recoveredOutputData { + margin: auto; + max-width: 250px; + text-align: center; + + > *:first-child { + margin-bottom: var(--spacing-m); + } +} diff --git a/packages/editor-ui/src/components/RunData.vue b/packages/editor-ui/src/components/RunData.vue index f48ee8f191..d44ddd8c79 100644 --- a/packages/editor-ui/src/components/RunData.vue +++ b/packages/editor-ui/src/components/RunData.vue @@ -162,7 +162,7 @@
@@ -216,6 +216,10 @@
+
+ +
+
{{ @@ -244,7 +248,7 @@
- + xxx
@@ -679,6 +683,9 @@ export default mixins(externalHooks, genericHelpers, nodeHelpers, pinData).exten this.hasPinData), ); }, + isArtificalRecoveredEventItem(): boolean { + return this.inputData?.[0]?.json?.isArtificalRecoveredEventItem !== undefined ?? false; + }, subworkflowExecutionError(): Error | null { return this.workflowsStore.subWorkflowExecutionError; }, diff --git a/packages/editor-ui/src/components/TitledList.vue b/packages/editor-ui/src/components/TitledList.vue index a3368937de..64d1d64f3e 100644 --- a/packages/editor-ui/src/components/TitledList.vue +++ b/packages/editor-ui/src/components/TitledList.vue @@ -2,7 +2,7 @@

    -
  • +
diff --git a/packages/editor-ui/src/mixins/executionsHelpers.ts b/packages/editor-ui/src/mixins/executionsHelpers.ts index 6ba4e9d52d..36c540f9de 100644 --- a/packages/editor-ui/src/mixins/executionsHelpers.ts +++ b/packages/editor-ui/src/mixins/executionsHelpers.ts @@ -1,9 +1,9 @@ -import { IExecutionsSummary } from '@/Interface'; import { useWorkflowsStore } from '@/stores/workflows'; import { i18n as locale } from '@/plugins/i18n'; import { mapStores } from 'pinia'; import mixins from 'vue-typed-mixins'; import { genericHelpers } from './genericHelpers'; +import { IExecutionsSummary } from 'n8n-workflow'; export interface IExecutionUIData { name: string; @@ -40,17 +40,17 @@ export const executionHelpers = mixins(genericHelpers).extend({ runningTime: '', }; - if (execution.waitTill) { + if (execution.status === 'waiting' || execution.waitTill) { status.name = 'waiting'; status.label = this.$locale.baseText('executionsList.waiting'); - } else if (execution.stoppedAt === undefined) { + } else if (execution.status === 'running' || execution.stoppedAt === undefined) { status.name = 'running'; status.label = this.$locale.baseText('executionsList.running'); status.runningTime = this.displayTimer( new Date().getTime() - new Date(execution.startedAt).getTime(), true, ); - } else if (execution.finished) { + } else if (execution.status === 'success' || execution.finished) { status.name = 'success'; status.label = this.$locale.baseText('executionsList.succeeded'); if (execution.stoppedAt) { @@ -59,7 +59,23 @@ export const executionHelpers = mixins(genericHelpers).extend({ true, ); } - } else if (execution.stoppedAt !== null) { + } else if (execution.status === 'crashed') { + status.name = 'crashed'; + status.label = this.$locale.baseText('executionsList.error'); + if (execution.stoppedAt) { + status.runningTime = this.displayTimer( + new Date(execution.stoppedAt).getTime() - new Date(execution.startedAt).getTime(), + true, + ); + } + } else if (execution.status === 'new') { + status.name = 'new'; + status.label = this.$locale.baseText('executionsList.new'); + } else if ( + execution.status === 'error' || + execution.status === 'failed' || + execution.stoppedAt !== null + ) { status.name = 'error'; status.label = this.$locale.baseText('executionsList.error'); if (execution.stoppedAt) { diff --git a/packages/editor-ui/src/mixins/pushConnection.ts b/packages/editor-ui/src/mixins/pushConnection.ts index 7ef18364fb..4511e025ff 100644 --- a/packages/editor-ui/src/mixins/pushConnection.ts +++ b/packages/editor-ui/src/mixins/pushConnection.ts @@ -1,4 +1,9 @@ -import { IExecutionResponse, IExecutionsCurrentSummaryExtended, IPushData } from '@/Interface'; +import { + IExecutionResponse, + IExecutionsCurrentSummaryExtended, + IPushData, + IPushDataExecutionFinished, +} from '@/Interface'; import { externalHooks } from '@/mixins/externalHooks'; import { nodeHelpers } from '@/mixins/nodeHelpers'; @@ -10,6 +15,8 @@ import { ExpressionError, IDataObject, INodeTypeNameVersion, + IRun, + IRunExecutionData, IWorkflowBase, SubworkflowOperationError, TelemetryHelpers, @@ -25,6 +32,7 @@ import { useWorkflowsStore } from '@/stores/workflows'; import { useNodeTypesStore } from '@/stores/nodeTypes'; import { useCredentialsStore } from '@/stores/credentials'; import { useSettingsStore } from '@/stores/settings'; +import { parse } from 'flatted'; export const pushConnection = mixins( externalHooks, @@ -57,21 +65,6 @@ export const pushConnection = mixins( }, methods: { attemptReconnect() { - const isWorkflowRunning = this.uiStore.isActionActive('workflowRunning'); - if (this.connectRetries > 3 && !this.lostConnection && isWorkflowRunning) { - this.lostConnection = true; - - this.workflowsStore.executingNode = null; - this.uiStore.removeActiveAction('workflowRunning'); - - this.$showMessage({ - title: this.$locale.baseText('pushConnection.executionFailed'), - message: this.$locale.baseText('pushConnection.executionFailed.message'), - type: 'error', - duration: 0, - }); - } - this.pushConnect(); }, @@ -115,13 +108,17 @@ export const pushConnection = mixins( this.connectRetries = 0; this.lostConnection = false; this.rootStore.pushConnectionActive = true; + this.clearAllStickyNotifications(); this.pushSource?.removeEventListener('open', this.onConnectionSuccess); }, onConnectionError() { this.pushDisconnect(); this.connectRetries++; - this.reconnectTimeout = setTimeout(this.attemptReconnect, this.connectRetries * 5000); + this.reconnectTimeout = setTimeout( + this.attemptReconnect, + Math.min(this.connectRetries * 3000, 30000), // maximum 30 seconds backoff + ); }, /** @@ -186,7 +183,7 @@ export const pushConnection = mixins( /** * Process a newly received message */ - pushMessageReceived(event: Event, isRetry?: boolean): boolean { + async pushMessageReceived(event: Event, isRetry?: boolean): Promise { const retryAttempts = 5; let receivedData: IPushData; try { @@ -229,11 +226,81 @@ export const pushConnection = mixins( } } - if (receivedData.type === 'executionFinished') { - // The workflow finished executing - const pushData = receivedData.data; + // recovered execution data is handled like executionFinished data, however for security reasons + // we need to fetch the data from the server again rather than push it to all clients + let recoveredPushData: IPushDataExecutionFinished | undefined = undefined; + if (receivedData.type === 'executionRecovered') { + const recoveredExecutionId = receivedData.data?.executionId; + const isWorkflowRunning = this.uiStore.isActionActive('workflowRunning'); + if (isWorkflowRunning && this.workflowsStore.activeExecutionId === recoveredExecutionId) { + // pull execution data for the recovered execution from the server + const executionData = await this.workflowsStore.fetchExecutionDataById( + this.workflowsStore.activeExecutionId, + ); + if (executionData?.data) { + // data comes in as 'flatten' object, so we need to parse it + executionData.data = parse( + executionData.data as unknown as string, + ) as IRunExecutionData; + const iRunExecutionData: IRunExecutionData = { + startData: executionData.data?.startData, + resultData: executionData.data?.resultData ?? { runData: {} }, + executionData: executionData.data?.executionData, + }; + if ( + this.workflowsStore.workflowExecutionData?.workflowId === executionData.workflowId + ) { + const activeRunData = + this.workflowsStore.workflowExecutionData?.data?.resultData?.runData; + if (activeRunData) { + for (const key of Object.keys(activeRunData)) { + iRunExecutionData.resultData.runData[key] = activeRunData[key]; + } + } + } + const iRun: IRun = { + data: iRunExecutionData, + finished: executionData.finished, + mode: executionData.mode, + waitTill: executionData.data?.waitTill, + startedAt: executionData.startedAt, + stoppedAt: executionData.stoppedAt, + status: 'crashed', + }; + if (executionData.data) { + recoveredPushData = { + executionId: executionData.id, + data: iRun, + }; + } + } + } + } - this.workflowsStore.finishActiveExecution(pushData); + if (receivedData.type === 'executionFinished' || receivedData.type === 'executionRecovered') { + // The workflow finished executing + let pushData: IPushDataExecutionFinished; + if (receivedData.type === 'executionRecovered' && recoveredPushData !== undefined) { + pushData = recoveredPushData as IPushDataExecutionFinished; + } else { + pushData = receivedData.data as IPushDataExecutionFinished; + } + + if (this.workflowsStore.activeExecutionId === pushData.executionId) { + const activeRunData = + this.workflowsStore.workflowExecutionData?.data?.resultData?.runData; + if (activeRunData) { + for (const key of Object.keys(activeRunData)) { + if ( + pushData.data.data.resultData.runData[key]?.[0]?.data?.main?.[0]?.[0]?.json + .isArtificalRecoveredEventItem === true && + activeRunData[key].length > 0 + ) + pushData.data.data.resultData.runData[key] = activeRunData[key]; + } + } + this.workflowsStore.finishActiveExecution(pushData); + } if (!this.uiStore.isActionActive('workflowRunning')) { // No workflow is running so ignore the messages @@ -251,7 +318,13 @@ export const pushConnection = mixins( const runDataExecuted = pushData.data; - const runDataExecutedErrorMessage = this.$getExecutionError(runDataExecuted.data); + let runDataExecutedErrorMessage = this.$getExecutionError(runDataExecuted.data); + + if (pushData.data.status === 'crashed') { + runDataExecutedErrorMessage = this.$locale.baseText( + 'pushConnection.executionFailed.message', + ); + } const lineNumber = runDataExecuted && diff --git a/packages/editor-ui/src/mixins/restApi.ts b/packages/editor-ui/src/mixins/restApi.ts index 0349722902..d479f38c8b 100644 --- a/packages/editor-ui/src/mixins/restApi.ts +++ b/packages/editor-ui/src/mixins/restApi.ts @@ -27,6 +27,7 @@ import { INodePropertyOptions, INodeTypeDescription, INodeTypeNameVersion, + IRunExecutionData, } from 'n8n-workflow'; import { makeRestApiRequest } from '@/utils'; import { mapStores } from 'pinia'; diff --git a/packages/editor-ui/src/plugins/i18n/locales/en.json b/packages/editor-ui/src/plugins/i18n/locales/en.json index 3460093297..df4cb84fe5 100644 --- a/packages/editor-ui/src/plugins/i18n/locales/en.json +++ b/packages/editor-ui/src/plugins/i18n/locales/en.json @@ -405,6 +405,8 @@ "executionDetails.confirmMessage.message": "Are you sure that you want to delete the current execution?", "executionDetails.deleteExecution": "Delete this execution", "executionDetails.executionFailed": "Execution failed", + "executionDetails.executionFailed.recoveredNodeTitle": "Can’t show data", + "executionDetails.executionFailed.recoveredNodeMessage": "The execution was interrupted, so the data was not saved. Try fixing the workflow and re-executing.", "executionDetails.executionId": "Execution ID", "executionDetails.executionWaiting": "Execution waiting", "executionDetails.executionWasSuccessful": "Execution was successful", @@ -440,6 +442,8 @@ "executionsList.confirmMessage.headline": "Delete Executions?", "executionsList.confirmMessage.message": "Are you sure that you want to delete the {numSelected} selected execution(s)?", "executionsList.clearSelection": "Clear selection", + "executionsList.crashed": "Crashed", + "executionsList.new": "New", "executionsList.error": "Failed", "executionsList.filters": "Filters", "executionsList.loadMore": "Load More", @@ -468,7 +472,6 @@ "executionsList.showError.refreshData.title": "Problem loading data", "executionsList.showError.retryExecution.title": "Problem with retry", "executionsList.showError.stopExecution.title": "Problem stopping execution", - "executionsList.showError.getExecutionEvents.title": "Problem fetching execution events", "executionsList.showMessage.handleDeleteSelected.title": "Execution deleted", "executionsList.showMessage.retrySuccessfulFalse.title": "Retry unsuccessful", "executionsList.showMessage.retrySuccessfulTrue.title": "Retry successful", @@ -479,13 +482,15 @@ "executionsList.id": "Execution ID", "executionsList.status": "Status", "executionsList.statusText": "{status} in {time}", + "executionsList.statusCrashed": "{status}", + "executionsList.statusNew": "{status}", "executionsList.statusRunning": "{status} for {time}", "executionsList.statusWaiting": "{status} until {time}", - "executionsList.statusUnknown": "{status}", + "executionsList.statusUnknown": "Could not complete", "executionsList.stopExecution": "Stop Execution", "executionsList.success": "Success", "executionsList.successRetry": "Success retry", - "executionsList.unknown": "Unknown", + "executionsList.unknown": "Could not complete", "executionsList.unsavedWorkflow": "[UNSAVED WORKFLOW]", "executionsList.waiting": "Waiting", "executionsList.workflowExecutions": "All Executions", @@ -983,10 +988,11 @@ "pushConnection.workflowExecutedSuccessfully": "Workflow executed successfully", "pushConnectionTracker.cannotConnectToServer": "You have a connection issue or the server is down.
n8n should reconnect automatically once the issue is resolved.", "pushConnectionTracker.connectionLost": "Connection lost", + "pushConnectionTracker.connectionLost.message": "Attempting to reconnect...", "pushConnection.pollingNode.dataNotFound": "No {service} data found", "pushConnection.pollingNode.dataNotFound.message": "We didn’t find any data in {service} to simulate an event. Please create one in {service} and try again.", "pushConnection.executionFailed": "Execution failed", - "pushConnection.executionFailed.message": "There might not be enough memory to finish execution. Tips for avoiding this here", + "pushConnection.executionFailed.message": "There might not be enough memory to finish the execution. Tips for avoiding this here", "resourceLocator.id.placeholder": "Enter ID...", "resourceLocator.mode.id": "By ID", "resourceLocator.mode.url": "By URL", @@ -1146,7 +1152,6 @@ "settings.users.setupSMTPInfo": "You will need details of an {link} to complete the setup.", "settings.users.setupSMTPInfo.link": "SMTP server", "settings.users.smtpToAddUsersWarning": "Set up SMTP before adding users (so that n8n can send them invitation emails). Instructions", - "settings.users.smtpToAddUsersWarning": "Set up SMTP before adding users (so that n8n can send them invitation emails). Instructions", "settings.users.transferWorkflowsAndCredentials": "Transfer their workflows and credentials to another user", "settings.users.transferredToUser": "Data transferred to {user}", "settings.users.userDeleted": "User deleted", diff --git a/packages/editor-ui/src/stores/workflows.ts b/packages/editor-ui/src/stores/workflows.ts index 43d9260f8c..e7ab20a502 100644 --- a/packages/editor-ui/src/stores/workflows.ts +++ b/packages/editor-ui/src/stores/workflows.ts @@ -9,7 +9,6 @@ import { import { IExecutionResponse, IExecutionsCurrentSummaryExtended, - IExecutionsSummary, INewWorkflowData, INodeUi, INodeUpdatePropertiesInformation, @@ -28,6 +27,7 @@ import { IConnection, IConnections, IDataObject, + IExecutionsSummary, INode, INodeConnections, INodeCredentials, @@ -36,7 +36,9 @@ import { INodeIssueData, INodeParameters, IPinData, + IRun, IRunData, + IRunExecutionData, ITaskData, IWorkflowSettings, NodeHelpers, @@ -448,6 +450,10 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, { this.workflowExecutionPairedItemMappings = getPairedItemsMapping(this.workflowExecutionData); }, + setWorkflowExecutionRunData(workflowResultData: IRunExecutionData): void { + if (this.workflowExecutionData) this.workflowExecutionData.data = workflowResultData; + }, + setWorkflowSettings(workflowSettings: IWorkflowSettings): void { Vue.set(this.workflow, 'settings', workflowSettings); }, @@ -921,6 +927,11 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, { Vue.set(activeExecution, 'finished', finishedActiveExecution.data.finished); Vue.set(activeExecution, 'stoppedAt', finishedActiveExecution.data.stoppedAt); + if (finishedActiveExecution.data) { + this.setWorkflowExecutionRunData( + finishedActiveExecution.data as unknown as IRunExecutionData, + ); + } }, setActiveExecutions(newActiveExecutions: IExecutionsCurrentSummaryExtended[]): void { diff --git a/packages/editor-ui/src/utils/nodeViewUtils.ts b/packages/editor-ui/src/utils/nodeViewUtils.ts index b3a4f50fcc..9388ecc550 100644 --- a/packages/editor-ui/src/utils/nodeViewUtils.ts +++ b/packages/editor-ui/src/utils/nodeViewUtils.ts @@ -541,7 +541,11 @@ export const getOutputSummary = (data: ITaskData[], nodeConnections: NodeInputCo const outputMap: { [sourceOutputIndex: string]: { [targetNodeName: string]: { - [targetInputIndex: string]: { total: number; iterations: number }; + [targetInputIndex: string]: { + total: number; + iterations: number; + isArtificalRecoveredEventItem?: boolean; + }; }; }; } = {}; @@ -554,6 +558,13 @@ export const getOutputSummary = (data: ITaskData[], nodeConnections: NodeInputCo run.data.main.forEach((output: INodeExecutionData[] | null, i: number) => { const sourceOutputIndex = i; + // executionData that was recovered by recoverEvents in the CLI will have an isArtificalRecoveredEventItem property + // to indicate that it was not part of the original executionData + // we do not want to count these items in the summary + // if (output?.[0]?.json?.isArtificalRecoveredEventItem) { + // return outputMap; + // } + if (!outputMap[sourceOutputIndex]) { outputMap[sourceOutputIndex] = {}; } @@ -589,10 +600,19 @@ export const getOutputSummary = (data: ITaskData[], nodeConnections: NodeInputCo }; } - outputMap[sourceOutputIndex][targetNodeName][targetInputIndex].total += output - ? output.length - : 0; - outputMap[sourceOutputIndex][targetNodeName][targetInputIndex].iterations += output ? 1 : 0; + if (output?.[0]?.json?.isArtificalRecoveredEventItem) { + outputMap[sourceOutputIndex][targetNodeName][ + targetInputIndex + ].isArtificalRecoveredEventItem = true; + outputMap[sourceOutputIndex][targetNodeName][targetInputIndex].total = 0; + } else { + outputMap[sourceOutputIndex][targetNodeName][targetInputIndex].total += output + ? output.length + : 0; + outputMap[sourceOutputIndex][targetNodeName][targetInputIndex].iterations += output + ? 1 + : 0; + } }); }); }); @@ -607,6 +627,13 @@ export const resetConnection = (connection: Connection) => { connection.setPaintStyle(CONNECTOR_PAINT_STYLE_DEFAULT); }; +export const recoveredConnection = (connection: Connection) => { + connection.removeOverlay(OVERLAY_RUN_ITEMS_ID); + connection.addClass('success'); + showOrHideMidpointArrow(connection); + connection.setPaintStyle(CONNECTOR_PAINT_STYLE_PRIMARY); +}; + export const getRunItemsLabel = (output: { total: number; iterations: number }): string => { let label = `${output.total}`; label = output.total > 1 ? `${label} items` : `${label} item`; diff --git a/packages/editor-ui/src/views/NodeView.vue b/packages/editor-ui/src/views/NodeView.vue index 84a3c005e7..f5f854ea12 100644 --- a/packages/editor-ui/src/views/NodeView.vue +++ b/packages/editor-ui/src/views/NodeView.vue @@ -2869,7 +2869,9 @@ export default mixins( if (connection) { const output = outputMap[sourceOutputIndex][targetNodeName][targetInputIndex]; - if (!output || !output.total) { + if (output.isArtificalRecoveredEventItem) { + NodeViewUtils.recoveredConnection(connection); + } else if ((!output || !output.total) && !output.isArtificalRecoveredEventItem) { NodeViewUtils.resetConnection(connection); } else { NodeViewUtils.addConnectionOutputSuccess(connection, output); diff --git a/packages/workflow/src/ExecutionStatus.ts b/packages/workflow/src/ExecutionStatus.ts new file mode 100644 index 0000000000..501bd683ed --- /dev/null +++ b/packages/workflow/src/ExecutionStatus.ts @@ -0,0 +1,10 @@ +export type ExecutionStatus = + | 'canceled' + | 'crashed' + | 'error' + | 'failed' + | 'new' + | 'running' + | 'success' + | 'unknown' + | 'waiting'; diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index ed7346d8a2..b113d0d843 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -16,6 +16,7 @@ import type { WorkflowOperationError } from './WorkflowErrors'; import type { NodeApiError, NodeOperationError } from './NodeErrors'; import type { ExpressionError } from './ExpressionError'; import type { PathLike } from 'fs'; +import type { ExecutionStatus } from './ExecutionStatus'; export interface IAdditionalCredentialOptions { oauth2?: IOAuth2Options; @@ -1533,6 +1534,7 @@ export interface IRun { waitTill?: Date; startedAt: Date; stoppedAt?: Date; + status: ExecutionStatus; } // Contains all the data which is needed to execute a workflow and so also to @@ -1567,6 +1569,7 @@ export interface IRunData { export interface ITaskData { startTime: number; executionTime: number; + executionStatus?: ExecutionStatus; data?: ITaskDataConnections; error?: ExecutionError; source: Array; // Is an array as nodes have multiple inputs @@ -1661,6 +1664,7 @@ export interface IWorkflowExecuteAdditionalData { httpResponse?: express.Response; httpRequest?: express.Request; restApiUrl: string; + setExecutionStatus?: (status: ExecutionStatus) => void; sendMessageToUI?: (source: string, message: any) => void; timezone: string; webhookBaseUrl: string; @@ -1852,3 +1856,31 @@ export type PublicInstalledNode = { export interface NodeExecutionWithMetadata extends INodeExecutionData { pairedItem: IPairedItemData | IPairedItemData[]; } + +export interface IExecutionsSummary { + id: string; + finished?: boolean; + mode: WorkflowExecuteMode; + retryOf?: string; + retrySuccessId?: string; + waitTill?: Date; + startedAt: Date; + stoppedAt?: Date; + workflowId: string; + workflowName?: string; + status?: ExecutionStatus; + lastNodeExecuted?: string; + executionError?: ExecutionError; + nodeExecutionStatus?: { + [key: string]: IExceutionSummaryNodeExecutionResult; + }; +} + +export interface IExceutionSummaryNodeExecutionResult { + executionStatus: ExecutionStatus; + errors?: Array<{ + name?: string; + message?: string; + description?: string; + }>; +} diff --git a/packages/workflow/src/NodeErrors.ts b/packages/workflow/src/NodeErrors.ts index 983206408b..c2225d46fb 100644 --- a/packages/workflow/src/NodeErrors.ts +++ b/packages/workflow/src/NodeErrors.ts @@ -84,6 +84,19 @@ export abstract class ExecutionBaseError extends Error { this.cause = cause; } } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + toJSON?(): any { + return { + message: this.message, + lineNumber: this.lineNumber, + timestamp: this.timestamp, + name: this.name, + description: this.description, + context: this.context, + cause: this.cause, + }; + } } /** diff --git a/packages/workflow/src/WorkflowErrors.ts b/packages/workflow/src/WorkflowErrors.ts index 30326db3a6..c161d4475f 100644 --- a/packages/workflow/src/WorkflowErrors.ts +++ b/packages/workflow/src/WorkflowErrors.ts @@ -10,6 +10,8 @@ export class WorkflowOperationError extends Error { lineNumber: number | undefined; + description: string | undefined; + constructor(message: string, node?: INode) { super(message); this.name = this.constructor.name; diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index 5e60cfb422..2cd0635257 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -8,6 +8,7 @@ export * from './Cron'; export * from './DeferredPromise'; export * from './Interfaces'; export * from './MessageEventBus'; +export * from './ExecutionStatus'; export * from './Expression'; export * from './ExpressionError'; export * from './NodeErrors';