fix(core): Use AbortController to notify nodes to abort execution (#6141)

and add support for cancelling ongoing operations inside a node.

---------
Co-authored-by: Oleg Ivaniv <me@olegivaniv.com>
This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2023-11-24 18:17:06 +01:00
committed by GitHub
parent 0ec67dabf7
commit d2c18c5727
10 changed files with 72 additions and 58 deletions

View File

@@ -2508,6 +2508,19 @@ const getCommonWorkflowFunctions = (
prepareOutputData: async (outputData) => [outputData],
});
const executionCancellationFunctions = (
abortSignal?: AbortSignal,
): Pick<IExecuteFunctions, 'onExecutionCancellation' | 'getExecutionCancelSignal'> => ({
getExecutionCancelSignal: () => abortSignal,
onExecutionCancellation: (handler) => {
const fn = () => {
abortSignal?.removeEventListener('abort', fn);
handler();
};
abortSignal?.addEventListener('abort', fn);
},
});
const getRequestHelperFunctions = (
workflow: Workflow,
node: INode,
@@ -3087,10 +3100,12 @@ export function getExecuteFunctions(
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): IExecuteFunctions {
return ((workflow, runExecutionData, connectionInputData, inputData, node) => {
return {
...getCommonWorkflowFunctions(workflow, node, additionalData),
...executionCancellationFunctions(abortSignal),
getMode: () => mode,
getCredentials: async (type, itemIndex) =>
getCredentials(
@@ -3512,10 +3527,12 @@ export function getExecuteSingleFunctions(
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): IExecuteSingleFunctions {
return ((workflow, runExecutionData, connectionInputData, inputData, node, itemIndex) => {
return {
...getCommonWorkflowFunctions(workflow, node, additionalData),
...executionCancellationFunctions(abortSignal),
continueOnFail: () => continueOnFail(node),
evaluateExpression: (expression: string, evaluateItemIndex: number | undefined) => {
evaluateItemIndex = evaluateItemIndex === undefined ? itemIndex : evaluateItemIndex;

View File

@@ -1,9 +1,8 @@
/* eslint-disable @typescript-eslint/prefer-optional-chain */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
/* eslint-disable @typescript-eslint/prefer-nullish-coalescing */
import { setMaxListeners } from 'events';
import PCancelable from 'p-cancelable';
import type {
@@ -44,23 +43,14 @@ import get from 'lodash/get';
import * as NodeExecuteFunctions from './NodeExecuteFunctions';
export class WorkflowExecute {
runExecutionData: IRunExecutionData;
private status: ExecutionStatus = 'new';
private additionalData: IWorkflowExecuteAdditionalData;
private mode: WorkflowExecuteMode;
private status: ExecutionStatus;
private readonly abortController = new AbortController();
constructor(
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
runExecutionData?: IRunExecutionData,
) {
this.additionalData = additionalData;
this.mode = mode;
this.status = 'new';
this.runExecutionData = runExecutionData || {
private readonly additionalData: IWorkflowExecuteAdditionalData,
private readonly mode: WorkflowExecuteMode,
private runExecutionData: IRunExecutionData = {
startData: {},
resultData: {
runData: {},
@@ -73,8 +63,8 @@ export class WorkflowExecute {
waitingExecution: {},
waitingExecutionSource: {},
},
};
}
},
) {}
/**
* Executes the given workflow.
@@ -830,11 +820,16 @@ export class WorkflowExecute {
let closeFunction: Promise<void> | undefined;
return new PCancelable(async (resolve, reject, onCancel) => {
let gotCancel = false;
// Let as many nodes listen to the abort signal, without getting the MaxListenersExceededWarning
setMaxListeners(Infinity, this.abortController.signal);
onCancel.shouldReject = false;
onCancel(() => {
gotCancel = true;
this.status = 'canceled';
this.abortController.abort();
const fullRunData = this.getFullRunData(startedAt);
void this.executeHook('workflowExecuteAfter', [fullRunData]);
setTimeout(() => resolve(fullRunData), 10);
});
const returnPromise = (async () => {
@@ -881,10 +876,10 @@ export class WorkflowExecute {
this.additionalData.executionTimeoutTimestamp !== undefined &&
Date.now() >= this.additionalData.executionTimeoutTimestamp
) {
gotCancel = true;
this.status = 'canceled';
}
if (gotCancel) {
if (this.status === 'canceled') {
return;
}
@@ -1014,9 +1009,6 @@ export class WorkflowExecute {
}
for (let tryIndex = 0; tryIndex < maxTries; tryIndex++) {
if (gotCancel) {
return;
}
try {
if (tryIndex !== 0) {
// Reset executionError from previous error try
@@ -1052,6 +1044,7 @@ export class WorkflowExecute {
this.additionalData,
NodeExecuteFunctions,
this.mode,
this.abortController.signal,
);
nodeSuccessData = runNodeData.data;
@@ -1089,6 +1082,7 @@ export class WorkflowExecute {
this.additionalData,
executionData,
this.mode,
this.abortController.signal,
);
const dataProxy = executeFunctions.getWorkflowDataProxy(0);
@@ -1644,7 +1638,7 @@ export class WorkflowExecute {
return;
})()
.then(async () => {
if (gotCancel && executionError === undefined) {
if (this.status === 'canceled' && executionError === undefined) {
return this.processSuccessExecution(
startedAt,
workflow,