refactor(core): Move all execution lifecycle telemetry events to lifecycle hooks (no-changelog) (#12816)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2025-01-28 13:45:30 +01:00
committed by GitHub
parent 49b8693d7c
commit 69a97bd32d
4 changed files with 273 additions and 245 deletions

View File

@@ -25,6 +25,7 @@ import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.serv
import { mockInstance } from '@test/mocking'; import { mockInstance } from '@test/mocking';
import { import {
getWorkflowHooksIntegrated,
getWorkflowHooksMain, getWorkflowHooksMain,
getWorkflowHooksWorkerExecuter, getWorkflowHooksWorkerExecuter,
getWorkflowHooksWorkerMain, getWorkflowHooksWorkerMain,
@@ -85,6 +86,8 @@ describe('Execution Lifecycle Hooks', () => {
const now = new Date('2025-01-13T18:25:50.267Z'); const now = new Date('2025-01-13T18:25:50.267Z');
jest.useFakeTimers({ now }); jest.useFakeTimers({ now });
let hooks: WorkflowHooks;
beforeEach(() => { beforeEach(() => {
jest.clearAllMocks(); jest.clearAllMocks();
workflowData.settings = {}; workflowData.settings = {};
@@ -101,8 +104,62 @@ describe('Execution Lifecycle Hooks', () => {
}; };
}); });
const workflowEventTests = () => {
describe('workflowExecuteBefore', () => {
it('should emit workflow-pre-execute events', async () => {
await hooks.executeHookFunctions('workflowExecuteBefore', [workflow, runExecutionData]);
expect(eventService.emit).toHaveBeenCalledWith('workflow-pre-execute', {
executionId,
data: workflowData,
});
});
});
describe('workflowExecuteAfter', () => {
it('should emit workflow-post-execute events', async () => {
await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]);
expect(eventService.emit).toHaveBeenCalledWith('workflow-post-execute', {
executionId,
runData: successfulRun,
workflow: workflowData,
});
});
});
};
const nodeEventsTests = () => {
describe('nodeExecuteBefore', () => {
it('should emit node-pre-execute event', async () => {
await hooks.executeHookFunctions('nodeExecuteBefore', [nodeName]);
expect(eventService.emit).toHaveBeenCalledWith('node-pre-execute', {
executionId,
workflow: workflowData,
nodeName,
});
});
});
describe('nodeExecuteAfter', () => {
it('should emit node-post-execute event', async () => {
await hooks.executeHookFunctions('nodeExecuteAfter', [
nodeName,
taskData,
runExecutionData,
]);
expect(eventService.emit).toHaveBeenCalledWith('node-post-execute', {
executionId,
workflow: workflowData,
nodeName,
});
});
});
};
describe('getWorkflowHooksMain', () => { describe('getWorkflowHooksMain', () => {
let hooks: WorkflowHooks;
beforeEach(() => { beforeEach(() => {
hooks = getWorkflowHooksMain( hooks = getWorkflowHooksMain(
{ {
@@ -115,6 +172,9 @@ describe('Execution Lifecycle Hooks', () => {
); );
}); });
workflowEventTests();
nodeEventsTests();
it('should setup the correct set of hooks', () => { it('should setup the correct set of hooks', () => {
expect(hooks).toBeInstanceOf(WorkflowHooks); expect(hooks).toBeInstanceOf(WorkflowHooks);
expect(hooks.mode).toBe('manual'); expect(hooks.mode).toBe('manual');
@@ -126,10 +186,10 @@ describe('Execution Lifecycle Hooks', () => {
const { hookFunctions } = hooks; const { hookFunctions } = hooks;
expect(hookFunctions.nodeExecuteBefore).toHaveLength(2); expect(hookFunctions.nodeExecuteBefore).toHaveLength(2);
expect(hookFunctions.nodeExecuteAfter).toHaveLength(3); expect(hookFunctions.nodeExecuteAfter).toHaveLength(3);
expect(hookFunctions.workflowExecuteBefore).toHaveLength(2); expect(hookFunctions.workflowExecuteBefore).toHaveLength(3);
expect(hookFunctions.workflowExecuteAfter).toHaveLength(2); expect(hookFunctions.workflowExecuteAfter).toHaveLength(4);
expect(hookFunctions.nodeFetchedData).toHaveLength(1); expect(hookFunctions.nodeFetchedData).toHaveLength(1);
expect(hookFunctions.sendResponse).toBeUndefined(); expect(hookFunctions.sendResponse).toHaveLength(0);
}); });
describe('nodeExecuteBefore', () => { describe('nodeExecuteBefore', () => {
@@ -141,16 +201,6 @@ describe('Execution Lifecycle Hooks', () => {
pushRef, pushRef,
); );
}); });
it('should emit node-pre-execute event', async () => {
await hooks.executeHookFunctions('nodeExecuteBefore', [nodeName]);
expect(eventService.emit).toHaveBeenCalledWith('node-pre-execute', {
executionId,
workflow: workflowData,
nodeName,
});
});
}); });
describe('nodeExecuteAfter', () => { describe('nodeExecuteAfter', () => {
@@ -167,20 +217,6 @@ describe('Execution Lifecycle Hooks', () => {
); );
}); });
it('should emit node-post-execute event', async () => {
await hooks.executeHookFunctions('nodeExecuteAfter', [
nodeName,
taskData,
runExecutionData,
]);
expect(eventService.emit).toHaveBeenCalledWith('node-post-execute', {
executionId,
workflow: workflowData,
nodeName,
});
});
it('should save execution progress when enabled', async () => { it('should save execution progress when enabled', async () => {
workflowData.settings = { saveExecutionProgress: true }; workflowData.settings = { saveExecutionProgress: true };
@@ -230,12 +266,6 @@ describe('Execution Lifecycle Hooks', () => {
); );
}); });
it('should not call eventService', async () => {
await hooks.executeHookFunctions('workflowExecuteBefore', [workflow, runExecutionData]);
expect(eventService.emit).not.toHaveBeenCalled();
});
it('should run workflow.preExecute external hook', async () => { it('should run workflow.preExecute external hook', async () => {
await hooks.executeHookFunctions('workflowExecuteBefore', [workflow, runExecutionData]); await hooks.executeHookFunctions('workflowExecuteBefore', [workflow, runExecutionData]);
@@ -249,7 +279,6 @@ describe('Execution Lifecycle Hooks', () => {
describe('workflowExecuteAfter', () => { describe('workflowExecuteAfter', () => {
it('should send executionFinished push event', async () => { it('should send executionFinished push event', async () => {
await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]); await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]);
expect(eventService.emit).not.toHaveBeenCalled();
expect(push.send).toHaveBeenCalledWith( expect(push.send).toHaveBeenCalledWith(
{ {
type: 'executionFinished', type: 'executionFinished',
@@ -320,15 +349,6 @@ describe('Execution Lifecycle Hooks', () => {
); );
}); });
it('should handle errors when updating execution', async () => {
const error = new Error('Failed to update execution');
executionRepository.updateExistingExecution.mockRejectedValueOnce(error);
await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]);
expect(errorReporter.error).toHaveBeenCalledWith(error);
});
it('should not delete unfinished executions', async () => { it('should not delete unfinished executions', async () => {
const unfinishedRun = mock<IRun>({ finished: false, status: 'running' }); const unfinishedRun = mock<IRun>({ finished: false, status: 'running' });
@@ -457,11 +477,28 @@ describe('Execution Lifecycle Hooks', () => {
}); });
}); });
}); });
describe("when pushRef isn't set", () => {
beforeEach(() => {
hooks = getWorkflowHooksMain({ executionMode, workflowData }, executionId);
});
it('should not send any push events', async () => {
await hooks.executeHookFunctions('nodeExecuteBefore', [nodeName]);
await hooks.executeHookFunctions('nodeExecuteAfter', [
nodeName,
taskData,
runExecutionData,
]);
await hooks.executeHookFunctions('workflowExecuteBefore', [workflow, runExecutionData]);
await hooks.executeHookFunctions('workflowExecuteAfter', [successfulRun, {}]);
expect(push.send).not.toHaveBeenCalled();
});
});
}); });
describe('getWorkflowHooksWorkerMain', () => { describe('getWorkflowHooksWorkerMain', () => {
let hooks: WorkflowHooks;
beforeEach(() => { beforeEach(() => {
hooks = getWorkflowHooksWorkerMain(executionMode, executionId, workflowData, { hooks = getWorkflowHooksWorkerMain(executionMode, executionId, workflowData, {
pushRef, pushRef,
@@ -469,6 +506,8 @@ describe('Execution Lifecycle Hooks', () => {
}); });
}); });
workflowEventTests();
it('should setup the correct set of hooks', () => { it('should setup the correct set of hooks', () => {
expect(hooks).toBeInstanceOf(WorkflowHooks); expect(hooks).toBeInstanceOf(WorkflowHooks);
expect(hooks.mode).toBe('manual'); expect(hooks.mode).toBe('manual');
@@ -480,8 +519,10 @@ describe('Execution Lifecycle Hooks', () => {
const { hookFunctions } = hooks; const { hookFunctions } = hooks;
expect(hookFunctions.nodeExecuteBefore).toHaveLength(0); expect(hookFunctions.nodeExecuteBefore).toHaveLength(0);
expect(hookFunctions.nodeExecuteAfter).toHaveLength(0); expect(hookFunctions.nodeExecuteAfter).toHaveLength(0);
expect(hookFunctions.workflowExecuteBefore).toHaveLength(1); expect(hookFunctions.workflowExecuteBefore).toHaveLength(2);
expect(hookFunctions.workflowExecuteAfter).toHaveLength(1); expect(hookFunctions.workflowExecuteAfter).toHaveLength(3);
expect(hookFunctions.nodeFetchedData).toHaveLength(0);
expect(hookFunctions.sendResponse).toHaveLength(0);
}); });
describe('workflowExecuteBefore', () => { describe('workflowExecuteBefore', () => {
@@ -535,8 +576,6 @@ describe('Execution Lifecycle Hooks', () => {
}); });
describe('getWorkflowHooksWorkerExecuter', () => { describe('getWorkflowHooksWorkerExecuter', () => {
let hooks: WorkflowHooks;
beforeEach(() => { beforeEach(() => {
hooks = getWorkflowHooksWorkerExecuter(executionMode, executionId, workflowData, { hooks = getWorkflowHooksWorkerExecuter(executionMode, executionId, workflowData, {
pushRef, pushRef,
@@ -544,6 +583,25 @@ describe('Execution Lifecycle Hooks', () => {
}); });
}); });
nodeEventsTests();
it('should setup the correct set of hooks', () => {
expect(hooks).toBeInstanceOf(WorkflowHooks);
expect(hooks.mode).toBe('manual');
expect(hooks.executionId).toBe(executionId);
expect(hooks.workflowData).toEqual(workflowData);
expect(hooks.pushRef).toEqual('test-push-ref');
expect(hooks.retryOf).toEqual('test-retry-of');
const { hookFunctions } = hooks;
expect(hookFunctions.nodeExecuteBefore).toHaveLength(2);
expect(hookFunctions.nodeExecuteAfter).toHaveLength(3);
expect(hookFunctions.workflowExecuteBefore).toHaveLength(2);
expect(hookFunctions.workflowExecuteAfter).toHaveLength(4);
expect(hookFunctions.nodeFetchedData).toHaveLength(1);
expect(hookFunctions.sendResponse).toHaveLength(0);
});
describe('saving static data', () => { describe('saving static data', () => {
it('should skip saving static data for manual executions', async () => { it('should skip saving static data for manual executions', async () => {
hooks.mode = 'manual'; hooks.mode = 'manual';
@@ -614,4 +672,30 @@ describe('Execution Lifecycle Hooks', () => {
}); });
}); });
}); });
describe('getWorkflowHooksIntegrated', () => {
beforeEach(() => {
hooks = getWorkflowHooksIntegrated(executionMode, executionId, workflowData, undefined);
});
workflowEventTests();
nodeEventsTests();
it('should setup the correct set of hooks', () => {
expect(hooks).toBeInstanceOf(WorkflowHooks);
expect(hooks.mode).toBe('manual');
expect(hooks.executionId).toBe(executionId);
expect(hooks.workflowData).toEqual(workflowData);
expect(hooks.pushRef).toBeUndefined();
expect(hooks.retryOf).toBeUndefined();
const { hookFunctions } = hooks;
expect(hookFunctions.nodeExecuteBefore).toHaveLength(1);
expect(hookFunctions.nodeExecuteAfter).toHaveLength(2);
expect(hookFunctions.workflowExecuteBefore).toHaveLength(2);
expect(hookFunctions.workflowExecuteAfter).toHaveLength(3);
expect(hookFunctions.nodeFetchedData).toHaveLength(1);
expect(hookFunctions.sendResponse).toHaveLength(0);
});
});
}); });

View File

@@ -34,6 +34,60 @@ import {
} from './shared/shared-hook-functions'; } from './shared/shared-hook-functions';
import { toSaveSettings } from './to-save-settings'; import { toSaveSettings } from './to-save-settings';
function mergeHookFunctions(...hookFunctions: IWorkflowExecuteHooks[]): IWorkflowExecuteHooks {
const result: IWorkflowExecuteHooks = {
nodeExecuteBefore: [],
nodeExecuteAfter: [],
workflowExecuteBefore: [],
workflowExecuteAfter: [],
sendResponse: [],
nodeFetchedData: [],
};
for (const hooks of hookFunctions) {
for (const key in hooks) {
if (!result[key] || !hooks[key]) continue;
result[key].push(...hooks[key]);
}
}
return result;
}
function hookFunctionsWorkflowEvents(userId?: string): IWorkflowExecuteHooks {
const eventService = Container.get(EventService);
return {
workflowExecuteBefore: [
async function (this: WorkflowHooks): Promise<void> {
const { executionId, workflowData } = this;
eventService.emit('workflow-pre-execute', { executionId, data: workflowData });
},
],
workflowExecuteAfter: [
async function (this: WorkflowHooks, runData: IRun): Promise<void> {
const { executionId, workflowData: workflow } = this;
eventService.emit('workflow-post-execute', { executionId, runData, workflow, userId });
},
],
};
}
function hookFunctionsNodeEvents(): IWorkflowExecuteHooks {
const eventService = Container.get(EventService);
return {
nodeExecuteBefore: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
const { executionId, workflowData: workflow } = this;
eventService.emit('node-pre-execute', { executionId, workflow, nodeName });
},
],
nodeExecuteAfter: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
const { executionId, workflowData: workflow } = this;
eventService.emit('node-post-execute', { executionId, workflow, nodeName });
},
],
};
}
/** /**
* Returns hook functions to push data to Editor-UI * Returns hook functions to push data to Editor-UI
*/ */
@@ -166,29 +220,24 @@ function hookFunctionsPreExecute(): IWorkflowExecuteHooks {
}; };
} }
/** This should ideally be added before any other `workflowExecuteAfter` hook to ensure all hooks get the same execution status */
function hookFunctionsFinalizeExecutionStatus(): IWorkflowExecuteHooks {
return {
workflowExecuteAfter: [
async function (fullRunData: IRun) {
fullRunData.status = determineFinalExecutionStatus(fullRunData);
},
],
};
}
/** /**
* Returns hook functions to save workflow execution and call error workflow * Returns hook functions to save workflow execution and call error workflow
*/ */
function hookFunctionsSave(): IWorkflowExecuteHooks { function hookFunctionsSave(): IWorkflowExecuteHooks {
const logger = Container.get(Logger); const logger = Container.get(Logger);
const workflowStatisticsService = Container.get(WorkflowStatisticsService); const workflowStatisticsService = Container.get(WorkflowStatisticsService);
const eventService = Container.get(EventService);
return { return {
nodeExecuteBefore: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
const { executionId, workflowData: workflow } = this;
eventService.emit('node-pre-execute', { executionId, workflow, nodeName });
},
],
nodeExecuteAfter: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
const { executionId, workflowData: workflow } = this;
eventService.emit('node-post-execute', { executionId, workflow, nodeName });
},
],
workflowExecuteBefore: [],
workflowExecuteAfter: [ workflowExecuteAfter: [
async function ( async function (
this: WorkflowHooks, this: WorkflowHooks,
@@ -222,9 +271,6 @@ function hookFunctionsSave(): IWorkflowExecuteHooks {
} }
} }
const executionStatus = determineFinalExecutionStatus(fullRunData);
fullRunData.status = executionStatus;
const saveSettings = toSaveSettings(this.workflowData.settings); const saveSettings = toSaveSettings(this.workflowData.settings);
if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) { if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) {
@@ -243,8 +289,8 @@ function hookFunctionsSave(): IWorkflowExecuteHooks {
} }
const shouldNotSave = const shouldNotSave =
(executionStatus === 'success' && !saveSettings.success) || (fullRunData.status === 'success' && !saveSettings.success) ||
(executionStatus !== 'success' && !saveSettings.error); (fullRunData.status !== 'success' && !saveSettings.error);
if (shouldNotSave && !fullRunData.waitTill && !isManualMode) { if (shouldNotSave && !fullRunData.waitTill && !isManualMode) {
executeErrorWorkflow( executeErrorWorkflow(
@@ -268,7 +314,7 @@ function hookFunctionsSave(): IWorkflowExecuteHooks {
const fullExecutionData = prepareExecutionDataForDbUpdate({ const fullExecutionData = prepareExecutionDataForDbUpdate({
runData: fullRunData, runData: fullRunData,
workflowData: this.workflowData, workflowData: this.workflowData,
workflowStatusFinal: executionStatus, workflowStatusFinal: fullRunData.status,
retryOf: this.retryOf, retryOf: this.retryOf,
}); });
@@ -283,23 +329,6 @@ function hookFunctionsSave(): IWorkflowExecuteHooks {
executionData: fullExecutionData, executionData: fullExecutionData,
}); });
if (!isManualMode) {
executeErrorWorkflow(
this.workflowData,
fullRunData,
this.mode,
this.executionId,
this.retryOf,
);
}
} catch (error) {
Container.get(ErrorReporter).error(error);
logger.error(`Failed saving execution data to DB on execution ID ${this.executionId}`, {
executionId: this.executionId,
workflowId: this.workflowData.id,
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
error,
});
if (!isManualMode) { if (!isManualMode) {
executeErrorWorkflow( executeErrorWorkflow(
this.workflowData, this.workflowData,
@@ -333,29 +362,7 @@ function hookFunctionsSave(): IWorkflowExecuteHooks {
function hookFunctionsSaveWorker(): IWorkflowExecuteHooks { function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
const logger = Container.get(Logger); const logger = Container.get(Logger);
const workflowStatisticsService = Container.get(WorkflowStatisticsService); const workflowStatisticsService = Container.get(WorkflowStatisticsService);
const eventService = Container.get(EventService);
return { return {
nodeExecuteBefore: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
const { executionId, workflowData: workflow } = this;
eventService.emit('node-pre-execute', { executionId, workflow, nodeName });
},
],
nodeExecuteAfter: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
const { executionId, workflowData: workflow } = this;
eventService.emit('node-post-execute', { executionId, workflow, nodeName });
},
],
workflowExecuteBefore: [
async function (this: WorkflowHooks): Promise<void> {
const { executionId, workflowData } = this;
eventService.emit('workflow-pre-execute', { executionId, data: workflowData });
},
],
workflowExecuteAfter: [ workflowExecuteAfter: [
async function ( async function (
this: WorkflowHooks, this: WorkflowHooks,
@@ -387,13 +394,10 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
} }
} }
const workflowStatusFinal = determineFinalExecutionStatus(fullRunData);
fullRunData.status = workflowStatusFinal;
if ( if (
!isManualMode && !isManualMode &&
workflowStatusFinal !== 'success' && fullRunData.status !== 'success' &&
workflowStatusFinal !== 'waiting' fullRunData.status !== 'waiting'
) { ) {
executeErrorWorkflow( executeErrorWorkflow(
this.workflowData, this.workflowData,
@@ -409,7 +413,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
const fullExecutionData = prepareExecutionDataForDbUpdate({ const fullExecutionData = prepareExecutionDataForDbUpdate({
runData: fullRunData, runData: fullRunData,
workflowData: this.workflowData, workflowData: this.workflowData,
workflowStatusFinal, workflowStatusFinal: fullRunData.status,
retryOf: this.retryOf, retryOf: this.retryOf,
}); });
@@ -423,16 +427,6 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
workflowId: this.workflowData.id, workflowId: this.workflowData.id,
executionData: fullExecutionData, executionData: fullExecutionData,
}); });
} catch (error) {
if (!isManualMode) {
executeErrorWorkflow(
this.workflowData,
fullRunData,
this.mode,
this.executionId,
this.retryOf,
);
}
} finally { } finally {
workflowStatisticsService.emit('workflowExecutionCompleted', { workflowStatisticsService.emit('workflowExecutionCompleted', {
workflowData: this.workflowData, workflowData: this.workflowData,
@@ -440,15 +434,6 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
}); });
} }
}, },
async function (this: WorkflowHooks, runData: IRun): Promise<void> {
const { executionId, workflowData: workflow } = this;
eventService.emit('workflow-post-execute', {
workflow,
executionId,
runData,
});
},
async function (this: WorkflowHooks, fullRunData: IRun) { async function (this: WorkflowHooks, fullRunData: IRun) {
const externalHooks = Container.get(ExternalHooks); const externalHooks = Container.get(ExternalHooks);
if (externalHooks.exists('workflow.postExecute')) { if (externalHooks.exists('workflow.postExecute')) {
@@ -485,13 +470,15 @@ export function getWorkflowHooksIntegrated(
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
executionId: string, executionId: string,
workflowData: IWorkflowBase, workflowData: IWorkflowBase,
userId?: string,
): WorkflowHooks { ): WorkflowHooks {
const hookFunctions = hookFunctionsSave(); const hookFunctions = mergeHookFunctions(
const preExecuteFunctions = hookFunctionsPreExecute(); hookFunctionsWorkflowEvents(userId),
for (const key of Object.keys(preExecuteFunctions)) { hookFunctionsNodeEvents(),
const hooks = hookFunctions[key] ?? []; hookFunctionsFinalizeExecutionStatus(),
hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]); hookFunctionsSave(),
} hookFunctionsPreExecute(),
);
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData); return new WorkflowHooks(hookFunctions, mode, executionId, workflowData);
} }
@@ -502,27 +489,20 @@ export function getWorkflowHooksWorkerExecuter(
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
executionId: string, executionId: string,
workflowData: IWorkflowBase, workflowData: IWorkflowBase,
optionalParameters?: IWorkflowHooksOptionalParameters, optionalParameters: IWorkflowHooksOptionalParameters = {},
): WorkflowHooks { ): WorkflowHooks {
optionalParameters = optionalParameters || {}; const toMerge = [
const hookFunctions = hookFunctionsSaveWorker(); hookFunctionsNodeEvents(),
const preExecuteFunctions = hookFunctionsPreExecute(); hookFunctionsFinalizeExecutionStatus(),
for (const key of Object.keys(preExecuteFunctions)) { hookFunctionsSaveWorker(),
const hooks = hookFunctions[key] ?? []; hookFunctionsPreExecute(),
hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]); ];
}
if (mode === 'manual' && Container.get(InstanceSettings).isWorker) { if (mode === 'manual' && Container.get(InstanceSettings).isWorker) {
const pushHooks = hookFunctionsPush(); toMerge.push(hookFunctionsPush());
for (const key of Object.keys(pushHooks)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
// eslint-disable-next-line prefer-spread
hookFunctions[key].push.apply(hookFunctions[key], pushHooks[key]);
}
} }
const hookFunctions = mergeHookFunctions(...toMerge);
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters); return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
} }
@@ -533,67 +513,57 @@ export function getWorkflowHooksWorkerMain(
mode: WorkflowExecuteMode, mode: WorkflowExecuteMode,
executionId: string, executionId: string,
workflowData: IWorkflowBase, workflowData: IWorkflowBase,
optionalParameters?: IWorkflowHooksOptionalParameters, optionalParameters: IWorkflowHooksOptionalParameters = {},
): WorkflowHooks { ): WorkflowHooks {
optionalParameters = optionalParameters || {}; const hookFunctions = mergeHookFunctions(
const hookFunctions = hookFunctionsPreExecute(); hookFunctionsWorkflowEvents(),
hookFunctionsPreExecute(),
hookFunctionsFinalizeExecutionStatus(),
{
workflowExecuteAfter: [
async function (this: WorkflowHooks, fullRunData: IRun): Promise<void> {
// Don't delete executions before they are finished
if (!fullRunData.finished) return;
// TODO: why are workers pushing to frontend? const saveSettings = toSaveSettings(this.workflowData.settings);
// TODO: simplifying this for now to just leave the bare minimum hooks
// const hookFunctions = hookFunctionsPush(); const isManualMode = this.mode === 'manual';
// const preExecuteFunctions = hookFunctionsPreExecute();
// for (const key of Object.keys(preExecuteFunctions)) { if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) {
// if (hookFunctions[key] === undefined) { /**
// hookFunctions[key] = []; * When manual executions are not being saved, we only soft-delete
// } * the execution so that the user can access its binary data
// hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]); * while building their workflow.
// } *
* The manual execution and its binary data will be hard-deleted
* on the next pruning cycle after the grace period set by
* `EXECUTIONS_DATA_HARD_DELETE_BUFFER`.
*/
await Container.get(ExecutionRepository).softDelete(this.executionId);
return;
}
const shouldNotSave =
(fullRunData.status === 'success' && !saveSettings.success) ||
(fullRunData.status !== 'success' && !saveSettings.error);
if (!isManualMode && shouldNotSave && !fullRunData.waitTill) {
await Container.get(ExecutionRepository).hardDelete({
workflowId: this.workflowData.id,
executionId: this.executionId,
});
}
},
],
},
);
// When running with worker mode, main process executes // When running with worker mode, main process executes
// Only workflowExecuteBefore + workflowExecuteAfter // Only workflowExecuteBefore + workflowExecuteAfter
// So to avoid confusion, we are removing other hooks. // So to avoid confusion, we are removing other hooks.
hookFunctions.nodeExecuteBefore = []; hookFunctions.nodeExecuteBefore = [];
hookFunctions.nodeExecuteAfter = []; hookFunctions.nodeExecuteAfter = [];
hookFunctions.workflowExecuteAfter = [
async function (this: WorkflowHooks, fullRunData: IRun): Promise<void> {
// Don't delete executions before they are finished
if (!fullRunData.finished) return;
const executionStatus = determineFinalExecutionStatus(fullRunData);
fullRunData.status = executionStatus;
const saveSettings = toSaveSettings(this.workflowData.settings);
const isManualMode = this.mode === 'manual';
if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) {
/**
* When manual executions are not being saved, we only soft-delete
* the execution so that the user can access its binary data
* while building their workflow.
*
* The manual execution and its binary data will be hard-deleted
* on the next pruning cycle after the grace period set by
* `EXECUTIONS_DATA_HARD_DELETE_BUFFER`.
*/
await Container.get(ExecutionRepository).softDelete(this.executionId);
return;
}
const shouldNotSave =
(executionStatus === 'success' && !saveSettings.success) ||
(executionStatus !== 'success' && !saveSettings.error);
if (!isManualMode && shouldNotSave && !fullRunData.waitTill) {
await Container.get(ExecutionRepository).hardDelete({
workflowId: this.workflowData.id,
executionId: this.executionId,
});
}
},
];
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters); return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
} }
@@ -605,22 +575,14 @@ export function getWorkflowHooksMain(
data: IWorkflowExecutionDataProcess, data: IWorkflowExecutionDataProcess,
executionId: string, executionId: string,
): WorkflowHooks { ): WorkflowHooks {
const hookFunctions = hookFunctionsSave(); const hookFunctions = mergeHookFunctions(
const pushFunctions = hookFunctionsPush(); hookFunctionsWorkflowEvents(),
for (const key of Object.keys(pushFunctions)) { hookFunctionsNodeEvents(),
const hooks = hookFunctions[key] ?? []; hookFunctionsFinalizeExecutionStatus(),
hooks.push.apply(hookFunctions[key], pushFunctions[key]); hookFunctionsSave(),
} hookFunctionsPush(),
hookFunctionsPreExecute(),
const preExecuteFunctions = hookFunctionsPreExecute(); );
for (const key of Object.keys(preExecuteFunctions)) {
const hooks = hookFunctions[key] ?? [];
hooks.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}
if (!hookFunctions.nodeExecuteBefore) hookFunctions.nodeExecuteBefore = [];
if (!hookFunctions.nodeExecuteAfter) hookFunctions.nodeExecuteAfter = [];
return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, { return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, {
pushRef: data.pushRef, pushRef: data.pushRef,
retryOf: data.retryOf as string, retryOf: data.retryOf as string,

View File

@@ -187,7 +187,6 @@ async function startExecution(
const nodeTypes = Container.get(NodeTypes); const nodeTypes = Container.get(NodeTypes);
const activeExecutions = Container.get(ActiveExecutions); const activeExecutions = Container.get(ActiveExecutions);
const eventService = Container.get(EventService);
const executionRepository = Container.get(ExecutionRepository); const executionRepository = Container.get(ExecutionRepository);
const workflowName = workflowData ? workflowData.name : undefined; const workflowName = workflowData ? workflowData.name : undefined;
@@ -209,8 +208,6 @@ async function startExecution(
*/ */
await executionRepository.setRunning(executionId); await executionRepository.setRunning(executionId);
Container.get(EventService).emit('workflow-pre-execute', { executionId, data: runData });
let data; let data;
try { try {
await Container.get(PermissionChecker).check(workflowData.id, workflowData.nodes); await Container.get(PermissionChecker).check(workflowData.id, workflowData.nodes);
@@ -228,6 +225,7 @@ async function startExecution(
runData.executionMode, runData.executionMode,
executionId, executionId,
workflowData, workflowData,
additionalData.userId,
); );
additionalDataIntegrated.executionId = executionId; additionalDataIntegrated.executionId = executionId;
additionalDataIntegrated.parentCallbackManager = options.parentCallbackManager; additionalDataIntegrated.parentCallbackManager = options.parentCallbackManager;
@@ -310,13 +308,6 @@ async function startExecution(
await externalHooks.run('workflow.postExecute', [data, workflowData, executionId]); await externalHooks.run('workflow.postExecute', [data, workflowData, executionId]);
eventService.emit('workflow-post-execute', {
workflow: workflowData,
executionId,
userId: additionalData.userId,
runData: data,
});
// subworkflow either finished, or is in status waiting due to a wait node, both cases are considered successes here // subworkflow either finished, or is in status waiting due to a wait node, both cases are considered successes here
if (data.finished === true || data.status === 'waiting') { if (data.finished === true || data.status === 'waiting') {
// Workflow did finish successfully // Workflow did finish successfully

View File

@@ -21,7 +21,6 @@ import { ActiveExecutions } from '@/active-executions';
import config from '@/config'; import config from '@/config';
import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { ExecutionNotFoundError } from '@/errors/execution-not-found-error'; import { ExecutionNotFoundError } from '@/errors/execution-not-found-error';
import { EventService } from '@/events/event.service';
import { import {
getWorkflowHooksMain, getWorkflowHooksMain,
getWorkflowHooksWorkerExecuter, getWorkflowHooksWorkerExecuter,
@@ -54,7 +53,6 @@ export class WorkflowRunner {
private readonly workflowStaticDataService: WorkflowStaticDataService, private readonly workflowStaticDataService: WorkflowStaticDataService,
private readonly nodeTypes: NodeTypes, private readonly nodeTypes: NodeTypes,
private readonly permissionChecker: PermissionChecker, private readonly permissionChecker: PermissionChecker,
private readonly eventService: EventService,
private readonly instanceSettings: InstanceSettings, private readonly instanceSettings: InstanceSettings,
private readonly manualExecutionService: ManualExecutionService, private readonly manualExecutionService: ManualExecutionService,
) {} ) {}
@@ -169,7 +167,6 @@ export class WorkflowRunner {
await this.enqueueExecution(executionId, data, loadStaticData, realtime); await this.enqueueExecution(executionId, data, loadStaticData, realtime);
} else { } else {
await this.runMainProcess(executionId, data, loadStaticData, restartExecutionId); await this.runMainProcess(executionId, data, loadStaticData, restartExecutionId);
this.eventService.emit('workflow-pre-execute', { executionId, data });
} }
// only run these when not in queue mode or when the execution is manual, // only run these when not in queue mode or when the execution is manual,
@@ -182,12 +179,6 @@ export class WorkflowRunner {
const postExecutePromise = this.activeExecutions.getPostExecutePromise(executionId); const postExecutePromise = this.activeExecutions.getPostExecutePromise(executionId);
postExecutePromise postExecutePromise
.then(async (executionData) => { .then(async (executionData) => {
this.eventService.emit('workflow-post-execute', {
workflow: data.workflowData,
executionId,
userId: data.userId,
runData: executionData,
});
if (this.externalHooks.exists('workflow.postExecute')) { if (this.externalHooks.exists('workflow.postExecute')) {
try { try {
await this.externalHooks.run('workflow.postExecute', [ await this.externalHooks.run('workflow.postExecute', [