refactor(core): Split WorkflowExecute.runNode into smaller methods (#17864)

This commit is contained in:
Danny Martini
2025-08-19 10:20:00 +01:00
committed by GitHub
parent 961fc538d7
commit dd55201ee6
3 changed files with 1390 additions and 165 deletions

View File

@@ -3,7 +3,7 @@ import { nodeConfig } from '@n8n/eslint-config/node';
export default defineConfig(
nodeConfig,
globalIgnores(['bin/*.js', 'nodes-testing/*.ts']),
globalIgnores(['bin/*.js', 'nodes-testing/*.ts', 'coverage/*']),
{
rules: {
// TODO: Lower the complexity threshold

File diff suppressed because it is too large Load Diff

View File

@@ -1089,54 +1089,38 @@ export class WorkflowExecute {
return customOperation;
}
/** Executes the given node */
// eslint-disable-next-line complexity
async runNode(
workflow: Workflow,
executionData: IExecuteData,
runExecutionData: IRunExecutionData,
runIndex: number,
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): Promise<IRunNodeResponse> {
const { node } = executionData;
let inputData = executionData.data;
if (node.disabled === true) {
// If node is disabled simply pass the data through
// return NodeRunHelpers.
if (inputData.hasOwnProperty('main') && inputData.main.length > 0) {
// If the node is disabled simply return the data from the first main input
if (inputData.main[0] === null) {
return { data: undefined };
}
return { data: [inputData.main[0]] };
/**
* Handles execution of disabled nodes by passing through input data
*/
private handleDisabledNode(inputData: ITaskDataConnections): IRunNodeResponse {
if (inputData.hasOwnProperty('main') && inputData.main.length > 0) {
// If the node is disabled simply return the data from the first main input
if (inputData.main[0] === null) {
return { data: undefined };
}
return { data: undefined };
return { data: [inputData.main[0]] };
}
return { data: undefined };
}
const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
const isDeclarativeNode = nodeType.description.requestDefaults !== undefined;
const customOperation = this.getCustomOperation(node, nodeType);
let connectionInputData: INodeExecutionData[] = [];
private prepareConnectionInputData(
workflow: Workflow,
nodeType: INodeType,
customOperation: ReturnType<WorkflowExecute['getCustomOperation']>,
inputData: ITaskDataConnections,
): INodeExecutionData[] | null {
if (
nodeType.execute ||
customOperation ||
(!nodeType.poll && !nodeType.trigger && !nodeType.webhook)
) {
// Only stop if first input is empty for execute runs. For all others run anyways
// because then it is a trigger node. As they only pass data through and so the input-data
// becomes output-data it has to be possible.
if (inputData.main?.length > 0) {
// We always use the data of main input and the first input for execute
connectionInputData = inputData.main[0] as INodeExecutionData[];
if (!inputData.main?.length) {
return null;
}
// We always use the data of main input and the first input for execute
let connectionInputData = inputData.main[0] as INodeExecutionData[];
const forceInputNodeExecution = workflow.settings.executionOrder !== 'v1';
if (!forceInputNodeExecution) {
// If the nodes do not get force executed data of some inputs may be missing
@@ -1150,11 +1134,20 @@ export class WorkflowExecute {
}
if (connectionInputData.length === 0) {
// No data for node so return
return { data: undefined };
return null;
}
return connectionInputData;
}
// For poll, trigger, and webhook nodes, we don't need to process input data
return [];
}
/**
* Handles re-throwing errors from previous node execution attempts
*/
private rethrowLastNodeError(runExecutionData: IRunExecutionData, node: INode): void {
if (
runExecutionData.resultData.lastNodeExecuted === node.name &&
runExecutionData.resultData.error !== undefined
@@ -1173,7 +1166,12 @@ export class WorkflowExecute {
error.stack = runExecutionData.resultData.error.stack;
throw error;
}
}
/**
* Handles executeOnce logic by limiting input data to first item only
*/
private handleExecuteOnce(node: INode, inputData: ITaskDataConnections): ITaskDataConnections {
if (node.executeOnce === true) {
// If node should be executed only once so use only the first input item
const newInputData: ITaskDataConnections = {};
@@ -1182,14 +1180,257 @@ export class WorkflowExecute {
return input && input.slice(0, 1);
});
}
inputData = newInputData;
return newInputData;
}
return inputData;
}
/**
* Validates execution data for JSON compatibility and reports issues to Sentry
*/
private reportJsonIncompatibleOutput(
data: INodeExecutionData[][] | null,
workflow: Workflow,
node: INode,
): void {
if (Container.get(GlobalConfig).sentry.backendDsn) {
// If data is not json compatible then log it as incorrect output
// Does not block the execution from continuing
const jsonCompatibleResult = isJsonCompatible(data, new Set(['pairedItem']));
if (!jsonCompatibleResult.isValid) {
Container.get(ErrorReporter).error('node execution returned incorrect data', {
shouldBeLogged: false,
extra: {
nodeName: node.name,
nodeType: node.type,
nodeVersion: node.typeVersion,
workflowId: workflow.id,
workflowName: workflow.name ?? 'Unnamed workflow',
executionId: this.additionalData.executionId ?? 'unsaved-execution',
errorPath: jsonCompatibleResult.errorPath,
errorMessage: jsonCompatibleResult.errorMessage,
},
});
}
}
}
private async executeNode(
workflow: Workflow,
node: INode,
nodeType: INodeType,
customOperation: ReturnType<WorkflowExecute['getCustomOperation']>,
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
runExecutionData: IRunExecutionData,
runIndex: number,
connectionInputData: INodeExecutionData[],
inputData: ITaskDataConnections,
executionData: IExecuteData,
abortSignal?: AbortSignal,
): Promise<IRunNodeResponse> {
const closeFunctions: CloseFunction[] = [];
const context = new ExecuteContext(
workflow,
node,
additionalData,
mode,
runExecutionData,
runIndex,
connectionInputData,
inputData,
executionData,
closeFunctions,
abortSignal,
);
let data: INodeExecutionData[][] | null = null;
if (customOperation) {
data = await customOperation.call(context);
} else if (nodeType.execute) {
data =
nodeType instanceof Node
? await nodeType.execute(context)
: await nodeType.execute.call(context);
}
if (nodeType.execute || customOperation) {
const closeFunctions: CloseFunction[] = [];
const context = new ExecuteContext(
this.reportJsonIncompatibleOutput(data, workflow, node);
const closeFunctionsResults = await Promise.allSettled(
closeFunctions.map(async (fn) => await fn()),
);
const closingErrors = closeFunctionsResults
.filter((result): result is PromiseRejectedResult => result.status === 'rejected')
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
.map((result) => result.reason);
if (closingErrors.length > 0) {
if (closingErrors[0] instanceof Error) throw closingErrors[0];
throw new ApplicationError("Error on execution node's close function(s)", {
extra: { nodeName: node.name },
tags: { nodeType: node.type },
cause: closingErrors,
});
}
return { data, hints: context.hints };
}
/**
* Executes a poll node
*/
private async executePollNode(
workflow: Workflow,
node: INode,
nodeType: INodeType,
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
inputData: ITaskDataConnections,
): Promise<IRunNodeResponse> {
if (mode === 'manual') {
// In manual mode run the poll function
const context = new PollContext(workflow, node, additionalData, mode, 'manual');
return { data: await nodeType.poll!.call(context) };
}
// In any other mode pass data through as it already contains the result of the poll
return { data: inputData.main as INodeExecutionData[][] };
}
/**
* Executes a trigger node
*/
private async executeTriggerNode(
workflow: Workflow,
node: INode,
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
inputData: ITaskDataConnections,
abortSignal?: AbortSignal,
): Promise<IRunNodeResponse> {
if (mode === 'manual') {
// In manual mode start the trigger
const triggerResponse = await Container.get(TriggersAndPollers).runTrigger(
workflow,
node,
NodeExecuteFunctions.getExecuteTriggerFunctions,
additionalData,
mode,
'manual',
);
if (triggerResponse === undefined) {
return { data: null };
}
let closeFunction;
if (triggerResponse.closeFunction) {
// In manual mode we return the trigger closeFunction. That allows it to be called directly
// but we do not have to wait for it to finish. That is important for things like queue-nodes.
// There the full close will may be delayed till a message gets acknowledged after the execution.
// If we would not be able to wait for it to close would it cause problems with "own" mode as the
// process would be killed directly after it and so the acknowledge would not have been finished yet.
closeFunction = triggerResponse.closeFunction;
// Manual testing of Trigger nodes creates an execution. If the execution is cancelled, `closeFunction` should be called to cleanup any open connections/consumers
abortSignal?.addEventListener('abort', closeFunction);
}
if (triggerResponse.manualTriggerFunction !== undefined) {
// If a manual trigger function is defined call it and wait till it did run
await triggerResponse.manualTriggerFunction();
}
const response = await triggerResponse.manualTriggerResponse!;
if (response.length === 0) {
return { data: null, closeFunction };
}
return { data: response, closeFunction };
}
// For trigger nodes in any mode except "manual" do we simply pass the data through
return { data: inputData.main as INodeExecutionData[][] };
}
private async executeDeclarativeNodeInTest(
workflow: Workflow,
node: INode,
nodeType: INodeType,
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
runExecutionData: IRunExecutionData,
runIndex: number,
connectionInputData: INodeExecutionData[],
inputData: ITaskDataConnections,
executionData: IExecuteData,
): Promise<IRunNodeResponse> {
// NOTE: This block is only called by nodes tests.
// In the application, declarative nodes get assigned a `.execute` method in NodeTypes.
const context = new ExecuteContext(
workflow,
node,
additionalData,
mode,
runExecutionData,
runIndex,
connectionInputData,
inputData,
executionData,
[],
);
const routingNode = new RoutingNode(context, nodeType);
const data = await routingNode.runNode();
return { data };
}
/**
* Figures out the node type and state and calls the right execution
* implementation.
*/
async runNode(
workflow: Workflow,
executionData: IExecuteData,
runExecutionData: IRunExecutionData,
runIndex: number,
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): Promise<IRunNodeResponse> {
const { node } = executionData;
let inputData = executionData.data;
if (node.disabled === true) {
return this.handleDisabledNode(inputData);
}
const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
const customOperation = this.getCustomOperation(node, nodeType);
const connectionInputData = this.prepareConnectionInputData(
workflow,
nodeType,
customOperation,
inputData,
);
if (connectionInputData === null) {
return { data: undefined };
}
this.rethrowLastNodeError(runExecutionData, node);
inputData = this.handleExecuteOnce(node, inputData);
const isDeclarativeNode = nodeType.description.requestDefaults !== undefined;
if (nodeType.execute || customOperation) {
return await this.executeNode(
workflow,
node,
nodeType,
customOperation,
additionalData,
mode,
runExecutionData,
@@ -1197,137 +1438,44 @@ export class WorkflowExecute {
connectionInputData,
inputData,
executionData,
closeFunctions,
abortSignal,
);
}
let data;
if (nodeType.poll) {
return await this.executePollNode(workflow, node, nodeType, additionalData, mode, inputData);
}
if (customOperation) {
data = await customOperation.call(context);
} else if (nodeType.execute) {
data =
nodeType instanceof Node
? await nodeType.execute(context)
: await nodeType.execute.call(context);
}
if (Container.get(GlobalConfig).sentry.backendDsn) {
// If data is not json compatible then log it as incorrect output
// Does not block the execution from continuing
const jsonCompatibleResult = isJsonCompatible(data, new Set(['pairedItem']));
if (!jsonCompatibleResult.isValid) {
Container.get(ErrorReporter).error('node execution returned incorrect data', {
shouldBeLogged: false,
extra: {
nodeName: node.name,
nodeType: node.type,
nodeVersion: node.typeVersion,
workflowId: workflow.id,
workflowName: workflow.name ?? 'Unnamed workflow',
executionId: this.additionalData.executionId ?? 'unsaved-execution',
errorPath: jsonCompatibleResult.errorPath,
errorMessage: jsonCompatibleResult.errorMessage,
},
});
}
}
const closeFunctionsResults = await Promise.allSettled(
closeFunctions.map(async (fn) => await fn()),
if (nodeType.trigger) {
return await this.executeTriggerNode(
workflow,
node,
additionalData,
mode,
inputData,
abortSignal,
);
}
const closingErrors = closeFunctionsResults
.filter((result): result is PromiseRejectedResult => result.status === 'rejected')
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
.map((result) => result.reason);
if (closingErrors.length > 0) {
if (closingErrors[0] instanceof Error) throw closingErrors[0];
throw new ApplicationError("Error on execution node's close function(s)", {
extra: { nodeName: node.name },
tags: { nodeType: node.type },
cause: closingErrors,
});
}
return { data, hints: context.hints };
} else if (nodeType.poll) {
if (mode === 'manual') {
// In manual mode run the poll function
const context = new PollContext(workflow, node, additionalData, mode, 'manual');
return { data: await nodeType.poll.call(context) };
}
// In any other mode pass data through as it already contains the result of the poll
return { data: inputData.main as INodeExecutionData[][] };
} else if (nodeType.trigger) {
if (mode === 'manual') {
// In manual mode start the trigger
const triggerResponse = await Container.get(TriggersAndPollers).runTrigger(
workflow,
node,
NodeExecuteFunctions.getExecuteTriggerFunctions,
additionalData,
mode,
'manual',
);
if (triggerResponse === undefined) {
return { data: null };
}
let closeFunction;
if (triggerResponse.closeFunction) {
// In manual mode we return the trigger closeFunction. That allows it to be called directly
// but we do not have to wait for it to finish. That is important for things like queue-nodes.
// There the full close will may be delayed till a message gets acknowledged after the execution.
// If we would not be able to wait for it to close would it cause problems with "own" mode as the
// process would be killed directly after it and so the acknowledge would not have been finished yet.
closeFunction = triggerResponse.closeFunction;
// Manual testing of Trigger nodes creates an execution. If the execution is cancelled, `closeFunction` should be called to cleanup any open connections/consumers
abortSignal?.addEventListener('abort', closeFunction);
}
if (triggerResponse.manualTriggerFunction !== undefined) {
// If a manual trigger function is defined call it and wait till it did run
await triggerResponse.manualTriggerFunction();
}
const response = await triggerResponse.manualTriggerResponse!;
if (response.length === 0) {
return { data: null, closeFunction };
}
return { data: response, closeFunction };
}
// For trigger nodes in any mode except "manual" do we simply pass the data through
return { data: inputData.main as INodeExecutionData[][] };
} else if (nodeType.webhook && !isDeclarativeNode) {
if (nodeType.webhook && !isDeclarativeNode) {
// Check if the node have requestDefaults(Declarative Node),
// else for webhook nodes always simply pass the data through
// as webhook method would be called by WebhookService
return { data: inputData.main as INodeExecutionData[][] };
} else {
// NOTE: This block is only called by nodes tests.
// In the application, declarative nodes get assigned a `.execute` method in NodeTypes.
const context = new ExecuteContext(
workflow,
node,
additionalData,
mode,
runExecutionData,
runIndex,
connectionInputData,
inputData,
executionData,
[],
);
const routingNode = new RoutingNode(context, nodeType);
const data = await routingNode.runNode();
return { data };
}
return await this.executeDeclarativeNodeInTest(
workflow,
node,
nodeType,
additionalData,
mode,
runExecutionData,
runIndex,
connectionInputData,
inputData,
executionData,
);
}
/**