refactor(core): Consolidate execution lifecycle hooks even more + additional tests (#12898)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2025-02-04 10:17:44 +01:00
committed by GitHub
parent 9bcbc2c2cc
commit 65ec6ae0c8
13 changed files with 364 additions and 310 deletions

View File

@@ -10,7 +10,6 @@ import type {
ITaskData,
IWorkflowBase,
IWorkflowExecuteHooks,
IWorkflowHooksOptionalParameters,
WorkflowExecuteMode,
IWorkflowExecutionDataProcess,
Workflow,
@@ -32,7 +31,13 @@ import {
prepareExecutionDataForDbUpdate,
updateExistingExecution,
} from './shared/shared-hook-functions';
import { toSaveSettings } from './to-save-settings';
import { type ExecutionSaveSettings, toSaveSettings } from './to-save-settings';
type HooksSetupParameters = {
saveSettings: ExecutionSaveSettings;
pushRef?: string;
retryOf?: string;
};
function mergeHookFunctions(...hookFunctions: IWorkflowExecuteHooks[]): IWorkflowExecuteHooks {
const result: IWorkflowExecuteHooks = {
@@ -91,19 +96,16 @@ function hookFunctionsNodeEvents(): IWorkflowExecuteHooks {
/**
* Returns hook functions to push data to Editor-UI
*/
function hookFunctionsPush(): IWorkflowExecuteHooks {
function hookFunctionsPush({ pushRef, retryOf }: HooksSetupParameters): IWorkflowExecuteHooks {
if (!pushRef) return {};
const logger = Container.get(Logger);
const pushInstance = Container.get(Push);
return {
nodeExecuteBefore: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
const { pushRef, executionId } = this;
const { executionId } = this;
// Push data to session which started workflow before each
// node which starts rendering
if (pushRef === undefined) {
return;
}
logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, {
executionId,
pushRef,
@@ -115,12 +117,8 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
],
nodeExecuteAfter: [
async function (this: WorkflowHooks, nodeName: string, data: ITaskData): Promise<void> {
const { pushRef, executionId } = this;
const { executionId } = this;
// Push data to session which started workflow after each rendered node
if (pushRef === undefined) {
return;
}
logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, {
executionId,
pushRef,
@@ -135,7 +133,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
],
workflowExecuteBefore: [
async function (this: WorkflowHooks, _workflow, data): Promise<void> {
const { pushRef, executionId } = this;
const { executionId } = this;
const { id: workflowId, name: workflowName } = this.workflowData;
logger.debug('Executing hook (hookFunctionsPush)', {
executionId,
@@ -143,9 +141,6 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
workflowId,
});
// Push data to session which started the workflow
if (pushRef === undefined) {
return;
}
pushInstance.send(
{
type: 'executionStarted',
@@ -153,7 +148,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
executionId,
mode: this.mode,
startedAt: new Date(),
retryOf: this.retryOf,
retryOf,
workflowId,
workflowName,
flattedRunData: data?.resultData.runData
@@ -167,9 +162,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
],
workflowExecuteAfter: [
async function (this: WorkflowHooks, fullRunData: IRun): Promise<void> {
const { pushRef, executionId } = this;
if (pushRef === undefined) return;
const { executionId } = this;
const { id: workflowId } = this.workflowData;
logger.debug('Executing hook (hookFunctionsPush)', {
executionId,
@@ -192,7 +185,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
};
}
function hookFunctionsPreExecute(): IWorkflowExecuteHooks {
function hookFunctionsExternalHooks(): IWorkflowExecuteHooks {
const externalHooks = Container.get(ExternalHooks);
return {
workflowExecuteBefore: [
@@ -200,6 +193,21 @@ function hookFunctionsPreExecute(): IWorkflowExecuteHooks {
await externalHooks.run('workflow.preExecute', [workflow, this.mode]);
},
],
workflowExecuteAfter: [
async function (this: WorkflowHooks, fullRunData: IRun) {
await externalHooks.run('workflow.postExecute', [
fullRunData,
this.workflowData,
this.executionId,
]);
},
],
};
}
function hookFunctionsSaveProgress({ saveSettings }: HooksSetupParameters): IWorkflowExecuteHooks {
if (!saveSettings.progress) return {};
return {
nodeExecuteAfter: [
async function (
this: WorkflowHooks,
@@ -208,12 +216,11 @@ function hookFunctionsPreExecute(): IWorkflowExecuteHooks {
executionData: IRunExecutionData,
): Promise<void> {
await saveExecutionProgress(
this.workflowData,
this.workflowData.id,
this.executionId,
nodeName,
data,
executionData,
this.pushRef,
);
},
],
@@ -231,11 +238,29 @@ function hookFunctionsFinalizeExecutionStatus(): IWorkflowExecuteHooks {
};
}
function hookFunctionsStatistics(): IWorkflowExecuteHooks {
const workflowStatisticsService = Container.get(WorkflowStatisticsService);
return {
nodeFetchedData: [
async (workflowId: string, node: INode) => {
workflowStatisticsService.emit('nodeFetchedData', { workflowId, node });
},
],
};
}
/**
* Returns hook functions to save workflow execution and call error workflow
*/
function hookFunctionsSave(): IWorkflowExecuteHooks {
function hookFunctionsSave({
pushRef,
retryOf,
saveSettings,
}: HooksSetupParameters): IWorkflowExecuteHooks {
const logger = Container.get(Logger);
const errorReporter = Container.get(ErrorReporter);
const executionRepository = Container.get(ExecutionRepository);
const workflowStaticDataService = Container.get(WorkflowStaticDataService);
const workflowStatisticsService = Container.get(WorkflowStatisticsService);
return {
workflowExecuteAfter: [
@@ -257,12 +282,12 @@ function hookFunctionsSave(): IWorkflowExecuteHooks {
if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) {
// Workflow is saved so update in database
try {
await Container.get(WorkflowStaticDataService).saveStaticDataById(
await workflowStaticDataService.saveStaticDataById(
this.workflowData.id,
newStaticData,
);
} catch (e) {
Container.get(ErrorReporter).error(e);
errorReporter.error(e);
logger.error(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
`There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (hookFunctionsSave)`,
@@ -271,8 +296,6 @@ function hookFunctionsSave(): IWorkflowExecuteHooks {
}
}
const saveSettings = toSaveSettings(this.workflowData.settings);
if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) {
/**
* When manual executions are not being saved, we only soft-delete
@@ -283,7 +306,7 @@ function hookFunctionsSave(): IWorkflowExecuteHooks {
* on the next pruning cycle after the grace period set by
* `EXECUTIONS_DATA_HARD_DELETE_BUFFER`.
*/
await Container.get(ExecutionRepository).softDelete(this.executionId);
await executionRepository.softDelete(this.executionId);
return;
}
@@ -298,10 +321,10 @@ function hookFunctionsSave(): IWorkflowExecuteHooks {
fullRunData,
this.mode,
this.executionId,
this.retryOf,
retryOf,
);
await Container.get(ExecutionRepository).hardDelete({
await executionRepository.hardDelete({
workflowId: this.workflowData.id,
executionId: this.executionId,
});
@@ -315,12 +338,12 @@ function hookFunctionsSave(): IWorkflowExecuteHooks {
runData: fullRunData,
workflowData: this.workflowData,
workflowStatusFinal: fullRunData.status,
retryOf: this.retryOf,
retryOf,
});
// When going into the waiting state, store the pushRef in the execution-data
if (fullRunData.waitTill && isManualMode) {
fullExecutionData.data.pushRef = this.pushRef;
fullExecutionData.data.pushRef = pushRef;
}
await updateExistingExecution({
@@ -335,7 +358,7 @@ function hookFunctionsSave(): IWorkflowExecuteHooks {
fullRunData,
this.mode,
this.executionId,
this.retryOf,
retryOf,
);
}
} finally {
@@ -346,11 +369,6 @@ function hookFunctionsSave(): IWorkflowExecuteHooks {
}
},
],
nodeFetchedData: [
async (workflowId: string, node: INode) => {
workflowStatisticsService.emit('nodeFetchedData', { workflowId, node });
},
],
};
}
@@ -359,8 +377,13 @@ function hookFunctionsSave(): IWorkflowExecuteHooks {
* for running with queues. Manual executions should never run on queues as
* they are always executed in the main process.
*/
function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
function hookFunctionsSaveWorker({
pushRef,
retryOf,
}: HooksSetupParameters): IWorkflowExecuteHooks {
const logger = Container.get(Logger);
const errorReporter = Container.get(ErrorReporter);
const workflowStaticDataService = Container.get(WorkflowStaticDataService);
const workflowStatisticsService = Container.get(WorkflowStatisticsService);
return {
workflowExecuteAfter: [
@@ -380,16 +403,16 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
if (!isManualMode && isWorkflowIdValid(this.workflowData.id) && newStaticData) {
// Workflow is saved so update in database
try {
await Container.get(WorkflowStaticDataService).saveStaticDataById(
await workflowStaticDataService.saveStaticDataById(
this.workflowData.id,
newStaticData,
);
} catch (e) {
Container.get(ErrorReporter).error(e);
errorReporter.error(e);
logger.error(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
`There was a problem saving the workflow with id "${this.workflowData.id}" to save changed staticData: "${e.message}" (workflowExecuteAfter)`,
{ pushRef: this.pushRef, workflowId: this.workflowData.id },
{ workflowId: this.workflowData.id },
);
}
}
@@ -404,7 +427,7 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
fullRunData,
this.mode,
this.executionId,
this.retryOf,
retryOf,
);
}
@@ -414,12 +437,12 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
runData: fullRunData,
workflowData: this.workflowData,
workflowStatusFinal: fullRunData.status,
retryOf: this.retryOf,
retryOf,
});
// When going into the waiting state, store the pushRef in the execution-data
if (fullRunData.waitTill && isManualMode) {
fullExecutionData.data.pushRef = this.pushRef;
fullExecutionData.data.pushRef = pushRef;
}
await updateExistingExecution({
@@ -434,21 +457,6 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
});
}
},
async function (this: WorkflowHooks, fullRunData: IRun) {
const externalHooks = Container.get(ExternalHooks);
try {
await externalHooks.run('workflow.postExecute', [
fullRunData,
this.workflowData,
this.executionId,
]);
} catch {}
},
],
nodeFetchedData: [
async (workflowId: string, node: INode) => {
workflowStatisticsService.emit('nodeFetchedData', { workflowId, node });
},
],
};
}
@@ -463,12 +471,15 @@ export function getWorkflowHooksIntegrated(
workflowData: IWorkflowBase,
userId?: string,
): WorkflowHooks {
const saveSettings = toSaveSettings(workflowData.settings);
const hookFunctions = mergeHookFunctions(
hookFunctionsWorkflowEvents(userId),
hookFunctionsNodeEvents(),
hookFunctionsFinalizeExecutionStatus(),
hookFunctionsSave(),
hookFunctionsPreExecute(),
hookFunctionsSave({ saveSettings }),
hookFunctionsSaveProgress({ saveSettings }),
hookFunctionsStatistics(),
hookFunctionsExternalHooks(),
);
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData);
}
@@ -480,21 +491,25 @@ export function getWorkflowHooksWorkerExecuter(
mode: WorkflowExecuteMode,
executionId: string,
workflowData: IWorkflowBase,
optionalParameters: IWorkflowHooksOptionalParameters = {},
{ pushRef, retryOf }: Omit<HooksSetupParameters, 'saveSettings'> = {},
): WorkflowHooks {
const saveSettings = toSaveSettings(workflowData.settings);
const optionalParameters = { pushRef, retryOf, saveSettings };
const toMerge = [
hookFunctionsNodeEvents(),
hookFunctionsFinalizeExecutionStatus(),
hookFunctionsSaveWorker(),
hookFunctionsPreExecute(),
hookFunctionsSaveWorker(optionalParameters),
hookFunctionsSaveProgress(optionalParameters),
hookFunctionsStatistics(),
hookFunctionsExternalHooks(),
];
if (mode === 'manual' && Container.get(InstanceSettings).isWorker) {
toMerge.push(hookFunctionsPush());
toMerge.push(hookFunctionsPush(optionalParameters));
}
const hookFunctions = mergeHookFunctions(...toMerge);
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData);
}
/**
@@ -504,11 +519,15 @@ export function getWorkflowHooksWorkerMain(
mode: WorkflowExecuteMode,
executionId: string,
workflowData: IWorkflowBase,
optionalParameters: IWorkflowHooksOptionalParameters = {},
{ pushRef, retryOf }: Omit<HooksSetupParameters, 'saveSettings'> = {},
): WorkflowHooks {
const saveSettings = toSaveSettings(workflowData.settings);
const optionalParameters = { pushRef, retryOf, saveSettings };
const executionRepository = Container.get(ExecutionRepository);
const hookFunctions = mergeHookFunctions(
hookFunctionsWorkflowEvents(),
hookFunctionsPreExecute(),
hookFunctionsSaveProgress(optionalParameters),
hookFunctionsExternalHooks(),
hookFunctionsFinalizeExecutionStatus(),
{
workflowExecuteAfter: [
@@ -516,8 +535,6 @@ export function getWorkflowHooksWorkerMain(
// Don't delete executions before they are finished
if (!fullRunData.finished) return;
const saveSettings = toSaveSettings(this.workflowData.settings);
const isManualMode = this.mode === 'manual';
if (isManualMode && !saveSettings.manual && !fullRunData.waitTill) {
@@ -530,7 +547,7 @@ export function getWorkflowHooksWorkerMain(
* on the next pruning cycle after the grace period set by
* `EXECUTIONS_DATA_HARD_DELETE_BUFFER`.
*/
await Container.get(ExecutionRepository).softDelete(this.executionId);
await executionRepository.softDelete(this.executionId);
return;
}
@@ -540,7 +557,7 @@ export function getWorkflowHooksWorkerMain(
(fullRunData.status !== 'success' && !saveSettings.error);
if (!isManualMode && shouldNotSave && !fullRunData.waitTill) {
await Container.get(ExecutionRepository).hardDelete({
await executionRepository.hardDelete({
workflowId: this.workflowData.id,
executionId: this.executionId,
});
@@ -556,7 +573,7 @@ export function getWorkflowHooksWorkerMain(
hookFunctions.nodeExecuteBefore = [];
hookFunctions.nodeExecuteAfter = [];
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData, optionalParameters);
return new WorkflowHooks(hookFunctions, mode, executionId, workflowData);
}
/**
@@ -566,16 +583,18 @@ export function getWorkflowHooksMain(
data: IWorkflowExecutionDataProcess,
executionId: string,
): WorkflowHooks {
const { pushRef, retryOf } = data;
const saveSettings = toSaveSettings(data.workflowData.settings);
const optionalParameters = { pushRef, retryOf: retryOf ?? undefined, saveSettings };
const hookFunctions = mergeHookFunctions(
hookFunctionsWorkflowEvents(),
hookFunctionsNodeEvents(),
hookFunctionsFinalizeExecutionStatus(),
hookFunctionsSave(),
hookFunctionsPush(),
hookFunctionsPreExecute(),
hookFunctionsSave(optionalParameters),
hookFunctionsPush(optionalParameters),
hookFunctionsSaveProgress(optionalParameters),
hookFunctionsStatistics(),
hookFunctionsExternalHooks(),
);
return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData, {
pushRef: data.pushRef,
retryOf: data.retryOf as string,
});
return new WorkflowHooks(hookFunctions, data.executionMode, executionId, data.workflowData);
}