diff --git a/packages/cli/src/execution-lifecycle/__tests__/execution-lifecycle-hooks.test.ts b/packages/cli/src/execution-lifecycle/__tests__/execution-lifecycle-hooks.test.ts index 5ea8e411ad..46f27d3541 100644 --- a/packages/cli/src/execution-lifecycle/__tests__/execution-lifecycle-hooks.test.ts +++ b/packages/cli/src/execution-lifecycle/__tests__/execution-lifecycle-hooks.test.ts @@ -25,6 +25,7 @@ import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.serv import { mockInstance } from '@test/mocking'; import { + getWorkflowHooksIntegrated, getWorkflowHooksMain, getWorkflowHooksWorkerExecuter, getWorkflowHooksWorkerMain, @@ -85,6 +86,8 @@ describe('Execution Lifecycle Hooks', () => { const now = new Date('2025-01-13T18:25:50.267Z'); jest.useFakeTimers({ now }); + let hooks: WorkflowHooks; + beforeEach(() => { jest.clearAllMocks(); workflowData.settings = {}; @@ -101,8 +104,62 @@ describe('Execution Lifecycle Hooks', () => { }; }); + const workflowEventTests = () => { + describe('workflowExecuteBefore', () => { + it('should emit workflow-pre-execute events', async () => { + await hooks.executeHookFunctions('workflowExecuteBefore', [workflow, runExecutionData]); + + expect(eventService.emit).toHaveBeenCalledWith('workflow-pre-execute', { + executionId, + data: workflowData, + }); + }); + }); + + describe('workflowExecuteAfter', () => { + it('should emit workflow-post-execute events', async () => { + await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]); + + expect(eventService.emit).toHaveBeenCalledWith('workflow-post-execute', { + executionId, + runData: successfulRun, + workflow: workflowData, + }); + }); + }); + }; + + const nodeEventsTests = () => { + describe('nodeExecuteBefore', () => { + it('should emit node-pre-execute event', async () => { + await hooks.executeHookFunctions('nodeExecuteBefore', [nodeName]); + + expect(eventService.emit).toHaveBeenCalledWith('node-pre-execute', { + executionId, + workflow: workflowData, + nodeName, + }); + }); + }); + + describe('nodeExecuteAfter', () => { + it('should emit node-post-execute event', async () => { + await hooks.executeHookFunctions('nodeExecuteAfter', [ + nodeName, + taskData, + runExecutionData, + ]); + + expect(eventService.emit).toHaveBeenCalledWith('node-post-execute', { + executionId, + workflow: workflowData, + nodeName, + }); + }); + }); + }; + describe('getWorkflowHooksMain', () => { - let hooks: WorkflowHooks; beforeEach(() => { hooks = getWorkflowHooksMain( { @@ -115,6 +172,9 @@ describe('Execution Lifecycle Hooks', () => { ); }); + workflowEventTests(); + nodeEventsTests(); + it('should setup the correct set of hooks', () => { expect(hooks).toBeInstanceOf(WorkflowHooks); expect(hooks.mode).toBe('manual'); @@ -126,10 +186,10 @@ describe('Execution Lifecycle Hooks', () => { const { hookFunctions } = hooks; expect(hookFunctions.nodeExecuteBefore).toHaveLength(2); expect(hookFunctions.nodeExecuteAfter).toHaveLength(3); - expect(hookFunctions.workflowExecuteBefore).toHaveLength(2); - expect(hookFunctions.workflowExecuteAfter).toHaveLength(2); + expect(hookFunctions.workflowExecuteBefore).toHaveLength(3); + expect(hookFunctions.workflowExecuteAfter).toHaveLength(4); expect(hookFunctions.nodeFetchedData).toHaveLength(1); - expect(hookFunctions.sendResponse).toBeUndefined(); + expect(hookFunctions.sendResponse).toHaveLength(0); }); describe('nodeExecuteBefore', () => { @@ -141,16 +201,6 @@ describe('Execution Lifecycle Hooks', () => { pushRef, ); }); - - it('should emit node-pre-execute event', async () => { - await hooks.executeHookFunctions('nodeExecuteBefore', [nodeName]); - - expect(eventService.emit).toHaveBeenCalledWith('node-pre-execute', { - executionId, - workflow: workflowData, - nodeName, - }); - }); }); describe('nodeExecuteAfter', () => { @@ -167,20 +217,6 @@ describe('Execution Lifecycle Hooks', () => { ); }); - it('should emit node-post-execute event', async () => { - await hooks.executeHookFunctions('nodeExecuteAfter', [ - nodeName, - taskData, - runExecutionData, - ]); - - expect(eventService.emit).toHaveBeenCalledWith('node-post-execute', { - executionId, - workflow: workflowData, - nodeName, - }); - }); - it('should save execution progress when enabled', async () => { workflowData.settings = { saveExecutionProgress: true }; @@ -230,12 +266,6 @@ describe('Execution Lifecycle Hooks', () => { ); }); - it('should not call eventService', async () => { - await hooks.executeHookFunctions('workflowExecuteBefore', [workflow, runExecutionData]); - - expect(eventService.emit).not.toHaveBeenCalled(); - }); - it('should run workflow.preExecute external hook', async () => { await hooks.executeHookFunctions('workflowExecuteBefore', [workflow, runExecutionData]); @@ -249,7 +279,6 @@ describe('Execution Lifecycle Hooks', () => { describe('workflowExecuteAfter', () => { it('should send executionFinished push event', async () => { await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]); - expect(eventService.emit).not.toHaveBeenCalled(); expect(push.send).toHaveBeenCalledWith( { type: 'executionFinished', @@ -320,15 +349,6 @@ describe('Execution Lifecycle Hooks', () => { ); }); - it('should handle errors when updating execution', async () => { - const error = new Error('Failed to update execution'); - executionRepository.updateExistingExecution.mockRejectedValueOnce(error); - - await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]); - - expect(errorReporter.error).toHaveBeenCalledWith(error); - }); - it('should not delete unfinished executions', async () => { const unfinishedRun = mock({ finished: false, status: 'running' }); @@ -457,11 +477,28 @@ describe('Execution Lifecycle Hooks', () => { }); }); }); + + describe("when pushRef isn't set", () => { + beforeEach(() => { + hooks = getWorkflowHooksMain({ executionMode, workflowData }, executionId); + }); + + it('should not send any push events', async () => { + await hooks.executeHookFunctions('nodeExecuteBefore', [nodeName]); + await hooks.executeHookFunctions('nodeExecuteAfter', [ + nodeName, + taskData, + runExecutionData, + ]); + await hooks.executeHookFunctions('workflowExecuteBefore', [workflow, runExecutionData]); + await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]); + + expect(push.send).not.toHaveBeenCalled(); + }); + }); }); describe('getWorkflowHooksWorkerMain', () => { - let hooks: WorkflowHooks; - beforeEach(() => { hooks = getWorkflowHooksWorkerMain(executionMode, executionId, workflowData, { pushRef, @@ -469,6 +506,8 @@ describe('Execution Lifecycle Hooks', () => { }); }); + workflowEventTests(); + it('should setup the correct set of hooks', () => { expect(hooks).toBeInstanceOf(WorkflowHooks); expect(hooks.mode).toBe('manual'); @@ -480,8 +519,10 @@ describe('Execution Lifecycle Hooks', () => { const { hookFunctions } = hooks; expect(hookFunctions.nodeExecuteBefore).toHaveLength(0); expect(hookFunctions.nodeExecuteAfter).toHaveLength(0); - expect(hookFunctions.workflowExecuteBefore).toHaveLength(1); - expect(hookFunctions.workflowExecuteAfter).toHaveLength(1); + expect(hookFunctions.workflowExecuteBefore).toHaveLength(2); + expect(hookFunctions.workflowExecuteAfter).toHaveLength(3); + expect(hookFunctions.nodeFetchedData).toHaveLength(0); + expect(hookFunctions.sendResponse).toHaveLength(0); }); describe('workflowExecuteBefore', () => { @@ -535,8 +576,6 @@ describe('Execution Lifecycle Hooks', () => { }); describe('getWorkflowHooksWorkerExecuter', () => { - let hooks: WorkflowHooks; - beforeEach(() => { hooks = getWorkflowHooksWorkerExecuter(executionMode, executionId, workflowData, { pushRef, @@ -544,6 +583,25 @@ describe('Execution Lifecycle Hooks', () => { }); }); + nodeEventsTests(); + + it('should setup the correct set of hooks', () => { + expect(hooks).toBeInstanceOf(WorkflowHooks); + expect(hooks.mode).toBe('manual'); + expect(hooks.executionId).toBe(executionId); + expect(hooks.workflowData).toEqual(workflowData); + expect(hooks.pushRef).toEqual('test-push-ref'); + expect(hooks.retryOf).toEqual('test-retry-of'); + + const { hookFunctions } = hooks; + expect(hookFunctions.nodeExecuteBefore).toHaveLength(2); + expect(hookFunctions.nodeExecuteAfter).toHaveLength(3); + expect(hookFunctions.workflowExecuteBefore).toHaveLength(2); + expect(hookFunctions.workflowExecuteAfter).toHaveLength(4); + expect(hookFunctions.nodeFetchedData).toHaveLength(1); + expect(hookFunctions.sendResponse).toHaveLength(0); + }); + describe('saving static data', () => { it('should skip saving static data for manual executions', async () => { hooks.mode = 'manual'; @@ -614,4 +672,30 @@ describe('Execution Lifecycle Hooks', () => { }); }); }); + + describe('getWorkflowHooksIntegrated', () => { + beforeEach(() => { + hooks = getWorkflowHooksIntegrated(executionMode, executionId, workflowData, undefined); + }); + + workflowEventTests(); + nodeEventsTests(); + + it('should setup the correct set of hooks', () => { + expect(hooks).toBeInstanceOf(WorkflowHooks); + expect(hooks.mode).toBe('manual'); + expect(hooks.executionId).toBe(executionId); + expect(hooks.workflowData).toEqual(workflowData); + expect(hooks.pushRef).toBeUndefined(); + expect(hooks.retryOf).toBeUndefined(); + + const { hookFunctions } = hooks; + expect(hookFunctions.nodeExecuteBefore).toHaveLength(1); + expect(hookFunctions.nodeExecuteAfter).toHaveLength(2); + expect(hookFunctions.workflowExecuteBefore).toHaveLength(2); + expect(hookFunctions.workflowExecuteAfter).toHaveLength(3); + expect(hookFunctions.nodeFetchedData).toHaveLength(1); + expect(hookFunctions.sendResponse).toHaveLength(0); + }); + }); }); diff --git a/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts b/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts index 1296f53958..c991e2a6ab 100644 --- a/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts +++ b/packages/cli/src/execution-lifecycle/execution-lifecycle-hooks.ts @@ -34,6 +34,60 @@ import { } from './shared/shared-hook-functions'; import { toSaveSettings } from './to-save-settings'; +function mergeHookFunctions(...hookFunctions: IWorkflowExecuteHooks[]): IWorkflowExecuteHooks { + const result: IWorkflowExecuteHooks = { + nodeExecuteBefore: [], + nodeExecuteAfter: [], + workflowExecuteBefore: [], + workflowExecuteAfter: [], + sendResponse: [], + nodeFetchedData: [], + }; + for (const hooks of hookFunctions) { + for (const key in hooks) { + if (!result[key] || !hooks[key]) continue; + result[key].push(...hooks[key]); + } + } + return result; +} + +function hookFunctionsWorkflowEvents(userId?: string): IWorkflowExecuteHooks { + const eventService = Container.get(EventService); + return { + workflowExecuteBefore: [ + async function (this: WorkflowHooks): Promise { + const { executionId, workflowData } = this; + eventService.emit('workflow-pre-execute', { executionId, data: workflowData }); + }, + ], + workflowExecuteAfter: [ + async function (this: WorkflowHooks, runData: IRun): Promise { + const { executionId, workflowData: workflow } = this; + eventService.emit('workflow-post-execute', { executionId, runData, workflow, userId }); + }, + ], + }; +} + +function hookFunctionsNodeEvents(): IWorkflowExecuteHooks { + const eventService = Container.get(EventService); + return { + nodeExecuteBefore: [ + async function (this: WorkflowHooks, nodeName: string): Promise { + const { executionId, workflowData: workflow } = this; + eventService.emit('node-pre-execute', { executionId, workflow, nodeName }); + }, + ], + nodeExecuteAfter: [ + async function (this: WorkflowHooks, nodeName: string): Promise { + const { executionId, workflowData: workflow } = this; + eventService.emit('node-post-execute', { executionId, workflow, nodeName }); + }, + ], + }; +} + /** * Returns hook functions to push data to Editor-UI */ @@ -166,29 +220,24 @@ function hookFunctionsPreExecute(): IWorkflowExecuteHooks { }; } +/** This should ideally be added before any other `workflowExecuteAfter` hook to ensure all hooks get the same execution status */ +function hookFunctionsFinalizeExecutionStatus(): IWorkflowExecuteHooks { + return { + workflowExecuteAfter: [ + async function (fullRunData: IRun) { + fullRunData.status = determineFinalExecutionStatus(fullRunData); + }, + ], + }; +} + /** * Returns hook functions to save workflow execution and call error workflow */ function hookFunctionsSave(): IWorkflowExecuteHooks { const logger = Container.get(Logger); const workflowStatisticsService = Container.get(WorkflowStatisticsService); - const eventService = Container.get(EventService); return { - nodeExecuteBefore: [ - async function (this: WorkflowHooks, nodeName: string): Promise { - const { executionId, workflowData: workflow } = this; - - eventService.emit('node-pre-execute', { executionId, workflow, nodeName }); - }, - ], - nodeExecuteAfter: [ - async function (this: WorkflowHooks, nodeName: string): Promise { - const { executionId, workflowData: workflow } = this; - - eventService.emit('node-post-execute', { executionId, workflow, nodeName }); - }, - ], - workflowExecuteBefore: [], workflowExecuteAfter: [ async function ( this: WorkflowHooks, @@ -222,9 +271,6 @@ function hookFunctionsSave(): IWorkflowExecuteHooks { } } - const executionStatus = determineFinalExecutionStatus(fullRunData); - fullRunData.status = executionStatus; - const saveSettings = toSaveSettings(this.workflowData.settings); if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) { @@ -243,8 +289,8 @@ function hookFunctionsSave(): IWorkflowExecuteHooks { } const shouldNotSave = - (executionStatus === 'success' && !saveSettings.success) || - (executionStatus !== 'success' && !saveSettings.error); + (fullRunData.status === 'success' && !saveSettings.success) || + (fullRunData.status !== 'success' && !saveSettings.error); if (shouldNotSave && !fullRunData.waitTill && !isManualMode) { executeErrorWorkflow( @@ -268,7 +314,7 @@ function hookFunctionsSave(): IWorkflowExecuteHooks { const fullExecutionData = prepareExecutionDataForDbUpdate({ runData: fullRunData, workflowData: this.workflowData, - workflowStatusFinal: executionStatus, + workflowStatusFinal: fullRunData.status, retryOf: this.retryOf, }); @@ -283,23 +329,6 @@ function hookFunctionsSave(): IWorkflowExecuteHooks { executionData: fullExecutionData, }); - if (!isManualMode) { - executeErrorWorkflow( - this.workflowData, - fullRunData, - this.mode, - this.executionId, - this.retryOf, - ); - } - } catch (error) { - Container.get(ErrorReporter).error(error); - logger.error(`Failed saving execution data to DB on execution ID ${this.executionId}`, { - executionId: this.executionId, - workflowId: this.workflowData.id, - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - error, - }); if (!isManualMode) { executeErrorWorkflow( this.workflowData, @@ -333,29 +362,7 @@ function hookFunctionsSave(): IWorkflowExecuteHooks { function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { const logger = Container.get(Logger); const workflowStatisticsService = Container.get(WorkflowStatisticsService); - const eventService = Container.get(EventService); return { - nodeExecuteBefore: [ - async function (this: WorkflowHooks, nodeName: string): Promise { - const { executionId, workflowData: workflow } = this; - - eventService.emit('node-pre-execute', { executionId, workflow, nodeName }); - }, - ], - nodeExecuteAfter: [ - async function (this: WorkflowHooks, nodeName: string): Promise { - const { executionId, workflowData: workflow } = this; - - eventService.emit('node-post-execute', { executionId, workflow, nodeName }); - }, - ], - workflowExecuteBefore: [ - async function (this: WorkflowHooks): Promise { - const { executionId, workflowData } = this; - - eventService.emit('workflow-pre-execute', { executionId, data: workflowData }); - }, - ], workflowExecuteAfter: [ async function ( this: WorkflowHooks, @@ -387,13 +394,10 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { } } - const workflowStatusFinal = determineFinalExecutionStatus(fullRunData); - fullRunData.status = workflowStatusFinal; - if ( !isManualMode && - workflowStatusFinal !== 'success' && - workflowStatusFinal !== 'waiting' + fullRunData.status !== 'success' && + fullRunData.status !== 'waiting' ) { executeErrorWorkflow( this.workflowData, @@ -409,7 +413,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { const fullExecutionData = prepareExecutionDataForDbUpdate({ runData: fullRunData, workflowData: this.workflowData, - workflowStatusFinal, + workflowStatusFinal: fullRunData.status, retryOf: this.retryOf, }); @@ -423,16 +427,6 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { workflowId: this.workflowData.id, executionData: fullExecutionData, }); - } catch (error) { - if (!isManualMode) { - executeErrorWorkflow( - this.workflowData, - fullRunData, - this.mode, - this.executionId, - this.retryOf, - ); - } } finally { workflowStatisticsService.emit('workflowExecutionCompleted', { workflowData: this.workflowData, @@ -440,15 +434,6 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { }); } }, - async function (this: WorkflowHooks, runData: IRun): Promise { - const { executionId, workflowData: workflow } = this; - - eventService.emit('workflow-post-execute', { - workflow, - executionId, - runData, - }); - }, async function (this: WorkflowHooks, fullRunData: IRun) { const externalHooks = Container.get(ExternalHooks); if (externalHooks.exists('workflow.postExecute')) { @@ -485,13 +470,15 @@ export function getWorkflowHooksIntegrated( mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, + userId?: string, ): WorkflowHooks { - const hookFunctions = hookFunctionsSave(); - const preExecuteFunctions = hookFunctionsPreExecute(); - for (const key of Object.keys(preExecuteFunctions)) { - const hooks = hookFunctions[key] ?? []; - hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]); - } + const hookFunctions = mergeHookFunctions( + hookFunctionsWorkflowEvents(userId), + hookFunctionsNodeEvents(), + hookFunctionsFinalizeExecutionStatus(), + hookFunctionsSave(), + hookFunctionsPreExecute(), + ); return new WorkflowHooks(hookFunctions, mode, executionId, workflowData); } @@ -502,27 +489,20 @@ export function getWorkflowHooksWorkerExecuter( mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, - optionalParameters?: IWorkflowHooksOptionalParameters, + optionalParameters: IWorkflowHooksOptionalParameters = {}, ): WorkflowHooks { - optionalParameters = optionalParameters || {}; - const hookFunctions = hookFunctionsSaveWorker(); - const preExecuteFunctions = hookFunctionsPreExecute(); - for (const key of Object.keys(preExecuteFunctions)) { - const hooks = hookFunctions[key] ?? []; - hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]); - } + const toMerge = [ + hookFunctionsNodeEvents(), + hookFunctionsFinalizeExecutionStatus(), + hookFunctionsSaveWorker(), + hookFunctionsPreExecute(), + ]; if (mode === 'manual' && Container.get(InstanceSettings).isWorker) { - const pushHooks = hookFunctionsPush(); - for (const key of Object.keys(pushHooks)) { - if (hookFunctions[key] === undefined) { - hookFunctions[key] = []; - } - // eslint-disable-next-line prefer-spread - hookFunctions[key].push.apply(hookFunctions[key], pushHooks[key]); - } + toMerge.push(hookFunctionsPush()); } + const hookFunctions = mergeHookFunctions(...toMerge); return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters); } @@ -533,67 +513,57 @@ export function getWorkflowHooksWorkerMain( mode: WorkflowExecuteMode, executionId: string, workflowData: IWorkflowBase, - optionalParameters?: IWorkflowHooksOptionalParameters, + optionalParameters: IWorkflowHooksOptionalParameters = {}, ): WorkflowHooks { - optionalParameters = optionalParameters || {}; - const hookFunctions = hookFunctionsPreExecute(); + const hookFunctions = mergeHookFunctions( + hookFunctionsWorkflowEvents(), + hookFunctionsPreExecute(), + hookFunctionsFinalizeExecutionStatus(), + { + workflowExecuteAfter: [ + async function (this: WorkflowHooks, fullRunData: IRun): Promise { + // Don't delete executions before they are finished + if (!fullRunData.finished) return; - // TODO: why are workers pushing to frontend? - // TODO: simplifying this for now to just leave the bare minimum hooks + const saveSettings = toSaveSettings(this.workflowData.settings); - // const hookFunctions = hookFunctionsPush(); - // const preExecuteFunctions = hookFunctionsPreExecute(); - // for (const key of Object.keys(preExecuteFunctions)) { - // if (hookFunctions[key] === undefined) { - // hookFunctions[key] = []; - // } - // hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]); - // } + const isManualMode = this.mode === 'manual'; + + if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) { + /** + * When manual executions are not being saved, we only soft-delete + * the execution so that the user can access its binary data + * while building their workflow. + * + * The manual execution and its binary data will be hard-deleted + * on the next pruning cycle after the grace period set by + * `EXECUTIONS_DATA_HARD_DELETE_BUFFER`. + */ + await Container.get(ExecutionRepository).softDelete(this.executionId); + + return; + } + + const shouldNotSave = + (fullRunData.status === 'success' && !saveSettings.success) || + (fullRunData.status !== 'success' && !saveSettings.error); + + if (!isManualMode && shouldNotSave && !fullRunData.waitTill) { + await Container.get(ExecutionRepository).hardDelete({ + workflowId: this.workflowData.id, + executionId: this.executionId, + }); + } + }, + ], + }, + ); // When running with worker mode, main process executes // Only workflowExecuteBefore + workflowExecuteAfter // So to avoid confusion, we are removing other hooks. hookFunctions.nodeExecuteBefore = []; hookFunctions.nodeExecuteAfter = []; - hookFunctions.workflowExecuteAfter = [ - async function (this: WorkflowHooks, fullRunData: IRun): Promise { - // Don't delete executions before they are finished - if (!fullRunData.finished) return; - - const executionStatus = determineFinalExecutionStatus(fullRunData); - fullRunData.status = executionStatus; - - const saveSettings = toSaveSettings(this.workflowData.settings); - - const isManualMode = this.mode === 'manual'; - - if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) { - /** - * When manual executions are not being saved, we only soft-delete - * the execution so that the user can access its binary data - * while building their workflow. - * - * The manual execution and its binary data will be hard-deleted - * on the next pruning cycle after the grace period set by - * `EXECUTIONS_DATA_HARD_DELETE_BUFFER`. - */ - await Container.get(ExecutionRepository).softDelete(this.executionId); - - return; - } - - const shouldNotSave = - (executionStatus === 'success' && !saveSettings.success) || - (executionStatus !== 'success' && !saveSettings.error); - - if (!isManualMode && shouldNotSave && !fullRunData.waitTill) { - await Container.get(ExecutionRepository).hardDelete({ - workflowId: this.workflowData.id, - executionId: this.executionId, - }); - } - }, - ]; return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters); } @@ -605,22 +575,14 @@ export function getWorkflowHooksMain( data: IWorkflowExecutionDataProcess, executionId: string, ): WorkflowHooks { - const hookFunctions = hookFunctionsSave(); - const pushFunctions = hookFunctionsPush(); - for (const key of Object.keys(pushFunctions)) { - const hooks = hookFunctions[key] ?? []; - hooks.push.apply(hookFunctions[key], pushFunctions[key]); - } - - const preExecuteFunctions = hookFunctionsPreExecute(); - for (const key of Object.keys(preExecuteFunctions)) { - const hooks = hookFunctions[key] ?? []; - hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]); - } - - if (!hookFunctions.nodeExecuteBefore) hookFunctions.nodeExecuteBefore = []; - if (!hookFunctions.nodeExecuteAfter) hookFunctions.nodeExecuteAfter = []; - + const hookFunctions = mergeHookFunctions( + hookFunctionsWorkflowEvents(), + hookFunctionsNodeEvents(), + hookFunctionsFinalizeExecutionStatus(), + hookFunctionsSave(), + hookFunctionsPush(), + hookFunctionsPreExecute(), + ); return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, { pushRef: data.pushRef, retryOf: data.retryOf as string, diff --git a/packages/cli/src/workflow-execute-additional-data.ts b/packages/cli/src/workflow-execute-additional-data.ts index c3b3ed8693..e7850c5cb9 100644 --- a/packages/cli/src/workflow-execute-additional-data.ts +++ b/packages/cli/src/workflow-execute-additional-data.ts @@ -187,7 +187,6 @@ async function startExecution( const nodeTypes = Container.get(NodeTypes); const activeExecutions = Container.get(ActiveExecutions); - const eventService = Container.get(EventService); const executionRepository = Container.get(ExecutionRepository); const workflowName = workflowData ? workflowData.name : undefined; @@ -209,8 +208,6 @@ async function startExecution( */ await executionRepository.setRunning(executionId); - Container.get(EventService).emit('workflow-pre-execute', { executionId, data: runData }); - let data; try { await Container.get(PermissionChecker).check(workflowData.id, workflowData.nodes); @@ -228,6 +225,7 @@ async function startExecution( runData.executionMode, executionId, workflowData, + additionalData.userId, ); additionalDataIntegrated.executionId = executionId; additionalDataIntegrated.parentCallbackManager = options.parentCallbackManager; @@ -310,13 +308,6 @@ async function startExecution( await externalHooks.run('workflow.postExecute', [data, workflowData, executionId]); - eventService.emit('workflow-post-execute', { - workflow: workflowData, - executionId, - userId: additionalData.userId, - runData: data, - }); - // subworkflow either finished, or is in status waiting due to a wait node, both cases are considered successes here if (data.finished === true || data.status === 'waiting') { // Workflow did finish successfully diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index 5d80459d93..2248d0eb45 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -21,7 +21,6 @@ import { ActiveExecutions } from '@/active-executions'; import config from '@/config'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { ExecutionNotFoundError } from '@/errors/execution-not-found-error'; -import { EventService } from '@/events/event.service'; import { getWorkflowHooksMain, getWorkflowHooksWorkerExecuter, @@ -54,7 +53,6 @@ export class WorkflowRunner { private readonly workflowStaticDataService: WorkflowStaticDataService, private readonly nodeTypes: NodeTypes, private readonly permissionChecker: PermissionChecker, - private readonly eventService: EventService, private readonly instanceSettings: InstanceSettings, private readonly manualExecutionService: ManualExecutionService, ) {} @@ -169,7 +167,6 @@ export class WorkflowRunner { await this.enqueueExecution(executionId, data, loadStaticData, realtime); } else { await this.runMainProcess(executionId, data, loadStaticData, restartExecutionId); - this.eventService.emit('workflow-pre-execute', { executionId, data }); } // only run these when not in queue mode or when the execution is manual, @@ -182,12 +179,6 @@ export class WorkflowRunner { const postExecutePromise = this.activeExecutions.getPostExecutePromise(executionId); postExecutePromise .then(async (executionData) => { - this.eventService.emit('workflow-post-execute', { - workflow: data.workflowData, - executionId, - userId: data.userId, - runData: executionData, - }); if (this.externalHooks.exists('workflow.postExecute')) { try { await this.externalHooks.run('workflow.postExecute', [