Mirror of https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git (synced 2025-12-19 11:01:15 +00:00)
refactor(core): Add more workflow engine tests (no-changelog) (#12385)
Co-authored-by: Danny Martini <danny@n8n.io>
Committed by: GitHub
Parent: 09ddce0580
Commit: 11e8520b70
@@ -417,6 +417,17 @@ export class WorkflowExecute {
return await this.additionalData.hooks.executeHookFunctions(hookName, parameters);
}

/**
* Merges temporary execution metadata into the final runData structure.
* During workflow execution, metadata is collected in a temporary location
* (executionData.metadata). This method moves that metadata to its final
* location in the resultData.runData for each node.
*
* @remarks
* - Metadata from multiple runs is preserved using run indices
* - Existing metadata in runData is preserved and merged with new metadata
* - If no metadata exists, the operation is a no-op
*/
moveNodeMetadata(): void {
const metadata = get(this.runExecutionData, 'executionData.metadata');

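Illustration (not part of the commit): a minimal standalone sketch of the merge described in the doc comment above, using simplified stand-ins for n8n's run-data types; the exact field shapes are assumptions.

type NodeMetadataSketch = Record<string, string>;

interface RunExecutionDataSketch {
	executionData?: { metadata?: { [nodeName: string]: NodeMetadataSketch[] } };
	resultData: { runData: { [nodeName: string]: Array<{ metadata?: NodeMetadataSketch }> } };
}

function moveNodeMetadataSketch(runExecutionData: RunExecutionDataSketch): void {
	// The real method reads this path with lodash get(); optional chaining keeps the sketch dependency-free.
	const metadata = runExecutionData.executionData?.metadata;
	if (!metadata) return; // no metadata was collected: no-op

	const { runData } = runExecutionData.resultData;
	for (const [nodeName, runs] of Object.entries(metadata)) {
		runs.forEach((runMetadata, runIndex) => {
			const taskData = runData[nodeName]?.[runIndex];
			if (!taskData) return; // node has no recorded run at this index
			// Merge, preserving metadata the run already carries.
			taskData.metadata = { ...taskData.metadata, ...runMetadata };
		});
	}
}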
@@ -437,14 +448,27 @@ export class WorkflowExecute {
}

/**
* Checks the incoming connection does not receive any data
* Checks if all incoming connections to a node are empty (have no data).
* This is used to determine if a node should be executed or skipped.
*
* @param runData - The execution data from all nodes in the workflow
* @param inputConnections - Array of connections to check
* @param runIndex - Index of the current execution run (nodes can execute multiple times)
*
* @returns `true` if all connections are empty (no data), `false` if any connection has data
*
* @remarks
* A connection is considered empty when:
* - The source node doesn't exist in runData
* - The source node's data is undefined
* - The source node's output array is empty
* - The specified output index contains no items
*/
incomingConnectionIsEmpty(
runData: IRunData,
inputConnections: IConnection[],
runIndex: number,
): boolean {
// for (const inputConnection of workflow.connectionsByDestinationNode[nodeToAdd].main[0]) {
for (const inputConnection of inputConnections) {
const nodeIncomingData = get(runData, [
inputConnection.node,
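Illustration (not part of the commit): a sketch of the emptiness check described above. The hunk cuts off mid-call, so the lookup path below (node, run index, 'data', 'main', connection index) and the simplified types are assumptions; optional chaining stands in for the lodash get call shown in the snippet.

interface ConnectionSketch { node: string; index: number }
type RunDataSketch = {
	[nodeName: string]: Array<{ data?: { main: Array<unknown[] | null> } }>;
};

function incomingConnectionIsEmptySketch(
	runData: RunDataSketch,
	inputConnections: ConnectionSketch[],
	runIndex: number,
): boolean {
	for (const inputConnection of inputConnections) {
		// Assumed lookup path: source node -> run index -> data -> main -> connection's output index.
		const items = runData[inputConnection.node]?.[runIndex]?.data?.main?.[inputConnection.index];
		if (items && items.length > 0) {
			return false; // this connection delivered data
		}
	}
	return true; // every connection was empty: missing node, missing data, or zero items
}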
@@ -460,24 +484,29 @@ export class WorkflowExecute {
return true;
}

/**
* Prepares the waiting execution data structure for a node that needs to wait for data before it can execute.
* This function initializes arrays to store data and metadata for each connection of the node.
*
* @param nodeName - The name of the node to prepare waiting execution for
* @param numberOfConnections - Number of input connections the node has
* @param runIndex - The index of the current run (for nodes that may run multiple times)
*/
prepareWaitingToExecution(nodeName: string, numberOfConnections: number, runIndex: number) {
if (!this.runExecutionData.executionData!.waitingExecutionSource) {
this.runExecutionData.executionData!.waitingExecutionSource = {};
}
const executionData = this.runExecutionData.executionData!;

this.runExecutionData.executionData!.waitingExecution[nodeName][runIndex] = {
main: [],
};
this.runExecutionData.executionData!.waitingExecutionSource[nodeName][runIndex] = {
main: [],
};
executionData.waitingExecution ??= {};
executionData.waitingExecutionSource ??= {};

const nodeWaiting = (executionData.waitingExecution[nodeName] ??= []);
const nodeWaitingSource = (executionData.waitingExecutionSource[nodeName] ??= []);

nodeWaiting[runIndex] = { main: [] };
nodeWaitingSource[runIndex] = { main: [] };

for (let i = 0; i < numberOfConnections; i++) {
this.runExecutionData.executionData!.waitingExecution[nodeName][runIndex].main.push(null);

this.runExecutionData.executionData!.waitingExecutionSource[nodeName][runIndex].main.push(
null,
);
nodeWaiting[runIndex].main.push(null);
nodeWaitingSource[runIndex].main.push(null);
}
}

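Illustration (not part of the commit): the ??= initialization pattern introduced above, reduced to a self-contained sketch; the node name 'Merge' and the simplified slot type are made up for the example.

type WaitingSlotsSketch = { main: Array<object | null> };

const waitingExecution: { [nodeName: string]: WaitingSlotsSketch[] } = {};

function prepareWaitingSketch(nodeName: string, numberOfConnections: number, runIndex: number): void {
	// ??= creates the per-node container only when it is missing.
	const nodeWaiting = (waitingExecution[nodeName] ??= []);
	nodeWaiting[runIndex] = { main: [] };
	for (let i = 0; i < numberOfConnections; i++) {
		nodeWaiting[runIndex].main.push(null); // one placeholder slot per input connection
	}
}

prepareWaitingSketch('Merge', 2, 0);
// waitingExecution is now { Merge: [ { main: [null, null] } ] }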
@@ -1489,119 +1518,7 @@ export class WorkflowExecute {
}

if (nodeSuccessData && executionData.node.onError === 'continueErrorOutput') {
// If errorOutput is activated check all the output items for error data.
// If any is found, route them to the last output as that will be the
// error output.

const nodeType = workflow.nodeTypes.getByNameAndVersion(
executionData.node.type,
executionData.node.typeVersion,
);
const outputs = NodeHelpers.getNodeOutputs(
workflow,
executionData.node,
nodeType.description,
);
const outputTypes = NodeHelpers.getConnectionTypes(outputs);
const mainOutputTypes = outputTypes.filter(
(output) => output === NodeConnectionType.Main,
);

const errorItems: INodeExecutionData[] = [];
const closeFunctions: CloseFunction[] = [];
// Create a WorkflowDataProxy instance that we can get the data of the
// item which did error
const executeFunctions = new ExecuteContext(
workflow,
executionData.node,
this.additionalData,
this.mode,
this.runExecutionData,
runIndex,
[],
executionData.data,
executionData,
closeFunctions,
this.abortController.signal,
);

const dataProxy = executeFunctions.getWorkflowDataProxy(0);

// Loop over all outputs except the error output as it would not contain data by default
for (
let outputIndex = 0;
outputIndex < mainOutputTypes.length - 1;
outputIndex++
) {
const successItems: INodeExecutionData[] = [];
const items = nodeSuccessData[outputIndex]?.length
? nodeSuccessData[outputIndex]
: [];

while (items.length) {
const item = items.shift();
if (item === undefined) {
continue;
}

let errorData: GenericValue | undefined;
if (item.error) {
errorData = item.error;
item.error = undefined;
} else if (item.json.error && Object.keys(item.json).length === 1) {
errorData = item.json.error;
} else if (
item.json.error &&
item.json.message &&
Object.keys(item.json).length === 2
) {
errorData = item.json.error;
}

if (errorData) {
const pairedItemData =
item.pairedItem && typeof item.pairedItem === 'object'
? Array.isArray(item.pairedItem)
? item.pairedItem[0]
: item.pairedItem
: undefined;

if (executionData!.source === null || pairedItemData === undefined) {
// Source data is missing for some reason so we can not figure out the item
errorItems.push(item);
} else {
const pairedItemInputIndex = pairedItemData.input || 0;

const sourceData =
executionData!.source[NodeConnectionType.Main][pairedItemInputIndex];

const constPairedItem = dataProxy.$getPairedItem(
sourceData!.previousNode,
sourceData,
pairedItemData,
);

if (constPairedItem === null) {
errorItems.push(item);
} else {
errorItems.push({
...item,
json: {
...constPairedItem.json,
...item.json,
},
});
}
}
} else {
successItems.push(item);
}
}

nodeSuccessData[outputIndex] = successItems;
}

nodeSuccessData[mainOutputTypes.length - 1] = errorItems;
this.handleNodeErrorOutput(workflow, executionData, nodeSuccessData, runIndex);
}

if (runNodeData.closeFunction) {
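Illustration (not part of the commit): a condensed sketch of the routing rule described in the comments above, where items carrying error data are moved to the last main output; the real code also resolves pairedItem data through a WorkflowDataProxy, which is omitted here, and the explicit-boolean checks are a simplification of the truthiness checks in the snippet.

interface ErrorRoutingItem {
	json: { [key: string]: unknown };
	error?: Error;
}

function routeErrorItemsSketch(outputs: ErrorRoutingItem[][], mainOutputCount: number): ErrorRoutingItem[][] {
	const errorItems: ErrorRoutingItem[] = [];
	// All outputs except the last one; the last main output is the error output.
	for (let outputIndex = 0; outputIndex < mainOutputCount - 1; outputIndex++) {
		const successItems: ErrorRoutingItem[] = [];
		for (const item of outputs[outputIndex] ?? []) {
			// Analogous check: an explicit error, or a json payload that is only error (plus message).
			const jsonLooksLikeError =
				(item.json.error !== undefined && Object.keys(item.json).length === 1) ||
				(item.json.error !== undefined &&
					item.json.message !== undefined &&
					Object.keys(item.json).length === 2);
			if (item.error !== undefined || jsonLooksLikeError) {
				errorItems.push(item);
			} else {
				successItems.push(item);
			}
		}
		outputs[outputIndex] = successItems;
	}
	outputs[mainOutputCount - 1] = errorItems; // collected error items land on the error output
	return outputs;
}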
@@ -1616,53 +1533,7 @@ export class WorkflowExecute {
workflowId: workflow.id,
});

if (nodeSuccessData?.length) {
// Check if the output data contains pairedItem data and if not try
// to automatically fix it

const isSingleInputAndOutput =
executionData.data.main.length === 1 && executionData.data.main[0]?.length === 1;

const isSameNumberOfItems =
nodeSuccessData.length === 1 &&
executionData.data.main.length === 1 &&
executionData.data.main[0]?.length === nodeSuccessData[0].length;

checkOutputData: for (const outputData of nodeSuccessData) {
if (outputData === null) {
continue;
}
for (const [index, item] of outputData.entries()) {
if (item.pairedItem === undefined) {
// The pairedItem data is missing, so check if it can get automatically fixed
if (isSingleInputAndOutput) {
// The node has one input and one incoming item, so we know
// that all items must originate from that single
item.pairedItem = {
item: 0,
};
} else if (isSameNumberOfItems) {
// The number of oncoming and outcoming items is identical so we can
// make the reasonable assumption that each of the input items
// is the origin of the corresponding output items
item.pairedItem = {
item: index,
};
} else {
// In all other cases autofixing is not possible
break checkOutputData;
}
}
}
}
}

if (nodeSuccessData === undefined) {
// Node did not get executed
nodeSuccessData = null;
} else {
this.runExecutionData.resultData.lastNodeExecuted = executionData.node.name;
}
nodeSuccessData = this.assignPairedItems(nodeSuccessData, executionData);

if (nodeSuccessData === null || nodeSuccessData[0][0] === undefined) {
if (executionData.node.alwaysOutputData === true) {
@@ -2175,6 +2046,27 @@ export class WorkflowExecute {
});
}

/**
* Processes the final state of a workflow execution and prepares the execution result.
* This method handles different completion scenarios: success, waiting, error, and canceled states.
* It also manages cleanup tasks like static data updates and trigger deactivation.
*
* @param startedAt - The timestamp when the workflow execution started
* @param workflow - The workflow being executed
* @param executionError - Optional error that occurred during execution
* @param closeFunction - Optional promise that handles cleanup of triggers/webhooks
*
* @returns A promise that resolves to the complete workflow execution data (IRun)
*
* @remarks
* The function performs these tasks in order:
* 1. Generates full execution data
* 2. Sets appropriate status based on execution outcome
* 3. Handles any static data changes
* 4. Moves node metadata to its final location
* 5. Executes the 'workflowExecuteAfter' hook
* 6. Performs cleanup via closeFunction if provided
*/
async processSuccessExecution(
startedAt: Date,
workflow: Workflow,
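Illustration (not part of the commit): the ordered steps from the @remarks block above, compressed into a hypothetical finalization helper. Apart from the 'workflowExecuteAfter' hook name, all names and shapes below are placeholders, and the waiting/canceled statuses are left out.

interface RunResultSketch {
	data: unknown;
	startedAt: Date;
	stoppedAt: Date;
	status: 'success' | 'error';
}

async function finalizeExecutionSketch(
	startedAt: Date,
	runData: unknown,
	executionError: Error | undefined,
	executeHook: (name: string, args: unknown[]) => Promise<void>,
	closeFunction?: Promise<void>,
): Promise<RunResultSketch> {
	// 1. Generate the full execution data.
	const fullRunData: RunResultSketch = {
		data: runData,
		startedAt,
		stoppedAt: new Date(),
		// 2. Set the status based on the outcome (waiting/canceled omitted in this sketch).
		status: executionError ? 'error' : 'success',
	};

	// 3. and 4. Static-data handling and metadata relocation would happen here.

	// 5. Run the 'workflowExecuteAfter' hook with the assembled result.
	await executeHook('workflowExecuteAfter', [fullRunData]);

	// 6. Clean up triggers/webhooks if a close function was provided.
	if (closeFunction) await closeFunction;

	return fullRunData;
}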
@@ -2240,15 +2132,187 @@ export class WorkflowExecute {
}

getFullRunData(startedAt: Date): IRun {
const fullRunData: IRun = {
return {
data: this.runExecutionData,
mode: this.mode,
startedAt,
stoppedAt: new Date(),
status: this.status,
};
}

return fullRunData;
handleNodeErrorOutput(
workflow: Workflow,
executionData: IExecuteData,
nodeSuccessData: INodeExecutionData[][],
runIndex: number,
): void {
const nodeType = workflow.nodeTypes.getByNameAndVersion(
executionData.node.type,
executionData.node.typeVersion,
);
const outputs = NodeHelpers.getNodeOutputs(workflow, executionData.node, nodeType.description);
const outputTypes = NodeHelpers.getConnectionTypes(outputs);
const mainOutputTypes = outputTypes.filter((output) => output === NodeConnectionType.Main);

const errorItems: INodeExecutionData[] = [];
const closeFunctions: CloseFunction[] = [];
// Create a WorkflowDataProxy instance that we can get the data of the
// item which did error
const executeFunctions = new ExecuteContext(
workflow,
executionData.node,
this.additionalData,
this.mode,
this.runExecutionData,
runIndex,
[],
executionData.data,
executionData,
closeFunctions,
this.abortController.signal,
);

const dataProxy = executeFunctions.getWorkflowDataProxy(0);

// Loop over all outputs except the error output as it would not contain data by default
for (let outputIndex = 0; outputIndex < mainOutputTypes.length - 1; outputIndex++) {
const successItems: INodeExecutionData[] = [];
const items = nodeSuccessData[outputIndex]?.length ? nodeSuccessData[outputIndex] : [];

while (items.length) {
const item = items.shift();
if (item === undefined) {
continue;
}

let errorData: GenericValue | undefined;
if (item.error) {
errorData = item.error;
item.error = undefined;
} else if (item.json.error && Object.keys(item.json).length === 1) {
errorData = item.json.error;
} else if (item.json.error && item.json.message && Object.keys(item.json).length === 2) {
errorData = item.json.error;
}

if (errorData) {
const pairedItemData =
item.pairedItem && typeof item.pairedItem === 'object'
? Array.isArray(item.pairedItem)
? item.pairedItem[0]
: item.pairedItem
: undefined;

if (executionData.source === null || pairedItemData === undefined) {
// Source data is missing for some reason so we can not figure out the item
errorItems.push(item);
} else {
const pairedItemInputIndex = pairedItemData.input || 0;

const sourceData = executionData.source[NodeConnectionType.Main][pairedItemInputIndex];

const constPairedItem = dataProxy.$getPairedItem(
sourceData!.previousNode,
sourceData,
pairedItemData,
);

if (constPairedItem === null) {
errorItems.push(item);
} else {
errorItems.push({
...item,
json: {
...constPairedItem.json,
...item.json,
},
});
}
}
} else {
successItems.push(item);
}
}

nodeSuccessData[outputIndex] = successItems;
}

nodeSuccessData[mainOutputTypes.length - 1] = errorItems;
}

/**
* Assigns pairedItem information to node output items by matching them with input items.
* PairedItem data is used to track which output items were derived from which input items.
*
* @param nodeSuccessData - The output data from a node execution
* @param executionData - The execution data containing input information
*
* @returns The node output data with pairedItem information assigned where possible
*
* @remarks
* Auto-assignment of pairedItem happens in two scenarios:
* 1. Single input/output: When node has exactly one input item and produces output(s),
* all outputs are marked as derived from that single input (item: 0)
* 2. Matching items count: When number of input and output items match exactly,
* each output item is paired with the input item at the same index
*
* In all other cases, if pairedItem is missing, it remains undefined as automatic
* assignment cannot be done reliably.
*/
assignPairedItems(
nodeSuccessData: INodeExecutionData[][] | null | undefined,
executionData: IExecuteData,
) {
if (nodeSuccessData?.length) {
// Check if the output data contains pairedItem data and if not try
// to automatically fix it

const isSingleInputAndOutput =
executionData.data.main.length === 1 && executionData.data.main[0]?.length === 1;

const isSameNumberOfItems =
nodeSuccessData.length === 1 &&
executionData.data.main.length === 1 &&
executionData.data.main[0]?.length === nodeSuccessData[0].length;

checkOutputData: for (const outputData of nodeSuccessData) {
if (outputData === null) {
continue;
}
for (const [index, item] of outputData.entries()) {
if (item.pairedItem === undefined) {
// The pairedItem data is missing, so check if it can get automatically fixed
if (isSingleInputAndOutput) {
// The node has one input and one incoming item, so we know
// that all items must originate from that single
item.pairedItem = {
item: 0,
};
} else if (isSameNumberOfItems) {
// The number of oncoming and outcoming items is identical so we can
// make the reasonable assumption that each of the input items
// is the origin of the corresponding output items
item.pairedItem = {
item: index,
};
} else {
// In all other cases autofixing is not possible
break checkOutputData;
}
}
}
}
}

if (nodeSuccessData === undefined) {
// Node did not get executed
nodeSuccessData = null;
} else {
this.runExecutionData.resultData.lastNodeExecuted = executionData.node.name;
}

return nodeSuccessData;
}

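Illustration (not part of the commit): a runnable sketch of the two auto-assignment scenarios listed in the assignPairedItems doc comment, with a pared-down item type; unlike the real method, it operates on flat item arrays rather than the full execution data.

interface PairedSketchItem {
	json: { [key: string]: unknown };
	pairedItem?: { item: number };
}

function assignPairedItemsSketch(
	inputItems: PairedSketchItem[],
	outputItems: PairedSketchItem[],
): PairedSketchItem[] {
	const isSingleInput = inputItems.length === 1;
	const isSameNumberOfItems = inputItems.length === outputItems.length;

	for (const [index, item] of outputItems.entries()) {
		if (item.pairedItem !== undefined) continue; // already tracked, leave as-is
		if (isSingleInput) {
			item.pairedItem = { item: 0 }; // scenario 1: everything derives from the single input item
		} else if (isSameNumberOfItems) {
			item.pairedItem = { item: index }; // scenario 2: pair by position
		}
		// Otherwise pairedItem stays undefined; automatic assignment is not reliable.
	}
	return outputItems;
}

// Two inputs, two outputs -> index-based pairing: outputs get { item: 0 } and { item: 1 }.
assignPairedItemsSketch(
	[{ json: {} }, { json: {} }],
	[{ json: { value: 'a' } }, { json: { value: 'b' } }],
);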
private get isCancelled() {