feat(core)!: Change data processing for multi-input-nodes (#4238)

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
Jan Oberhauser
2023-06-23 12:07:52 +02:00
committed by GitHub
parent 9194d8bb0e
commit b8458a53f6
13 changed files with 7032 additions and 67 deletions

View File

@@ -143,6 +143,26 @@ export class WorkflowExecute {
return this.processRunExecutionData(workflow);
}
forceInputNodeExecution(workflow: Workflow, node: INode): boolean {
const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
// Check if the incoming nodes should be forced to execute
let forceInputNodeExecution = nodeType.description.forceInputNodeExecution;
if (forceInputNodeExecution !== undefined) {
if (typeof forceInputNodeExecution === 'string') {
forceInputNodeExecution = !!workflow.expression.getSimpleParameterValue(
node,
forceInputNodeExecution,
this.mode,
this.additionalData.timezone,
{ $version: node.typeVersion },
);
}
return forceInputNodeExecution;
}
return false;
}
/**
* Executes the given workflow but only
*
@@ -329,6 +349,27 @@ export class WorkflowExecute {
return true;
}
prepareWaitingToExecution(nodeName: string, numberOfConnections: number, runIndex: number) {
if (!this.runExecutionData.executionData!.waitingExecutionSource) {
this.runExecutionData.executionData!.waitingExecutionSource = {};
}
this.runExecutionData.executionData!.waitingExecution[nodeName][runIndex] = {
main: [],
};
this.runExecutionData.executionData!.waitingExecutionSource[nodeName][runIndex] = {
main: [],
};
for (let i = 0; i < numberOfConnections; i++) {
this.runExecutionData.executionData!.waitingExecution[nodeName][runIndex].main.push(null);
this.runExecutionData.executionData!.waitingExecutionSource[nodeName][runIndex].main.push(
null,
);
}
}
addNodeToBeExecuted(
workflow: Workflow,
connectionData: IConnection,
@@ -338,6 +379,7 @@ export class WorkflowExecute {
runIndex: number,
): void {
let stillDataMissing = false;
let waitingNodeIndex: number | undefined;
// Check if node has multiple inputs as then we have to wait for all input data
// to be present before we can add it to the node-execution-stack
@@ -358,47 +400,64 @@ export class WorkflowExecute {
this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node] = {};
nodeWasWaiting = false;
}
if (
this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] ===
undefined
) {
// Node does not have data for runIndex yet so create also empty one and init it
this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] = {
main: [],
};
this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node][runIndex] =
{
main: [],
};
for (
let i = 0;
i < workflow.connectionsByDestinationNode[connectionData.node].main.length;
i++
) {
this.runExecutionData.executionData!.waitingExecution[connectionData.node][
runIndex
].main.push(null);
this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node][
runIndex
].main.push(null);
// Figure out if the node is already waiting with partial data to which to add the
// data to or if a new entry has to get created
let createNewWaitingEntry = true;
if (
Object.keys(this.runExecutionData.executionData!.waitingExecution[connectionData.node])
.length > 0
) {
// Check if there is already data for the input on all of the waiting nodes
for (const index of Object.keys(
this.runExecutionData.executionData!.waitingExecution[connectionData.node],
)) {
if (
!this.runExecutionData.executionData!.waitingExecution[connectionData.node][
parseInt(index)
].main[connectionData.index]
) {
// Data for the input is missing so we can add it to the existing entry
createNewWaitingEntry = false;
waitingNodeIndex = parseInt(index);
break;
}
}
}
if (waitingNodeIndex === undefined) {
waitingNodeIndex = Object.values(
this.runExecutionData.executionData!.waitingExecution[connectionData.node],
).length;
}
if (createNewWaitingEntry) {
// There is currently no node waiting that does not already have data for
// the given input, so create a new entry
this.prepareWaitingToExecution(
connectionData.node,
workflow.connectionsByDestinationNode[connectionData.node].main.length,
waitingNodeIndex,
);
}
// Add the new data
if (nodeSuccessData === null) {
this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[
connectionData.index
] = null;
this.runExecutionData.executionData!.waitingExecution[connectionData.node][
waitingNodeIndex
].main[connectionData.index] = null;
this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node][
runIndex
waitingNodeIndex
].main[connectionData.index] = null;
} else {
this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[
connectionData.index
] = nodeSuccessData[outputIndex];
this.runExecutionData.executionData!.waitingExecution[connectionData.node][
waitingNodeIndex
].main[connectionData.index] = nodeSuccessData[outputIndex];
this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node][
runIndex
waitingNodeIndex
].main[connectionData.index] = {
previousNode: parentNodeName,
previousNodeOutput: outputIndex || undefined,
@@ -412,14 +471,14 @@ export class WorkflowExecute {
for (
let i = 0;
i <
this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main
.length;
this.runExecutionData.executionData!.waitingExecution[connectionData.node][waitingNodeIndex]
.main.length;
i++
) {
thisExecutionData =
this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[
i
];
this.runExecutionData.executionData!.waitingExecution[connectionData.node][
waitingNodeIndex
].main[i];
if (thisExecutionData === null) {
allDataFound = false;
break;
@@ -433,11 +492,11 @@ export class WorkflowExecute {
const executionStackItem = {
node: workflow.nodes[connectionData.node],
data: this.runExecutionData.executionData!.waitingExecution[connectionData.node][
runIndex
waitingNodeIndex
],
source:
this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node][
runIndex
waitingNodeIndex
],
} as IExecuteData;
@@ -447,16 +506,18 @@ export class WorkflowExecute {
) {
executionStackItem.source =
this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node][
runIndex
waitingNodeIndex
];
}
this.runExecutionData.executionData!.nodeExecutionStack.unshift(executionStackItem);
// Remove the data from waiting
delete this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex];
delete this.runExecutionData.executionData!.waitingExecution[connectionData.node][
waitingNodeIndex
];
delete this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node][
runIndex
waitingNodeIndex
];
if (
@@ -492,6 +553,10 @@ export class WorkflowExecute {
// checked. So we have to go through all the inputs and check if they
// are already on the list to be processed.
// If that is not the case add it.
const node = workflow.getNode(connectionData.node);
const forceInputNodeExecution = this.forceInputNodeExecution(workflow, node!);
for (
let inputIndex = 0;
inputIndex < workflow.connectionsByDestinationNode[connectionData.node].main.length;
@@ -540,6 +605,12 @@ export class WorkflowExecute {
continue;
}
if (!forceInputNodeExecution) {
// Do not automatically follow all incoming nodes and force them
// to execute
continue;
}
// Check if any of the parent nodes does not have any inputs. That
// would mean that it has to get added to the list of nodes to process.
const parentNodes = workflow.getParentNodes(inputData.node, 'main', -1);
@@ -650,14 +721,26 @@ export class WorkflowExecute {
}
if (stillDataMissing) {
waitingNodeIndex = waitingNodeIndex!;
// Additional data is needed to run node so add it to waiting
if (
!this.runExecutionData.executionData!.waitingExecution.hasOwnProperty(connectionData.node)
) {
this.runExecutionData.executionData!.waitingExecution[connectionData.node] = {};
}
this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] = {
main: connectionDataArray,
this.prepareWaitingToExecution(
connectionData.node,
workflow.connectionsByDestinationNode[connectionData.node].main.length,
waitingNodeIndex,
);
this.runExecutionData.executionData!.waitingExecution[connectionData.node][waitingNodeIndex] =
{
main: connectionDataArray,
};
this.runExecutionData.executionData!.waitingExecutionSource![connectionData.node][
waitingNodeIndex
].main[connectionData.index] = {
previousNode: parentNodeName,
previousNodeOutput: outputIndex || undefined,
previousNodeRun: runIndex || undefined,
};
} else {
// All data is there so add it directly to stack
@@ -854,6 +937,8 @@ export class WorkflowExecute {
continue;
}
const node = workflow.getNode(executionNode.name);
// Check if all the data which is needed to run the node is available
if (workflow.connectionsByDestinationNode.hasOwnProperty(executionNode.name)) {
// Check if the node has incoming connections
@@ -886,17 +971,20 @@ export class WorkflowExecute {
continue executionLoop;
}
// Check if it has the data for all the inputs
// The most nodes just have one but merge node for example has two and data
// of both inputs has to be available to be able to process the node.
if (
executionData.data.main.length < connectionIndex ||
executionData.data.main[connectionIndex] === null
) {
// Does not have the data of the connections so add back to stack
this.runExecutionData.executionData!.nodeExecutionStack.push(executionData);
lastExecutionTry = currentExecutionTry;
continue executionLoop;
if (this.forceInputNodeExecution(workflow, node!)) {
// Check if it has the data for all the inputs
// The most nodes just have one but merge node for example has two and data
// of both inputs has to be available to be able to process the node.
if (
executionData.data.main.length < connectionIndex ||
executionData.data.main[connectionIndex] === null
) {
// Does not have the data of the connections so add back to stack
this.runExecutionData.executionData!.nodeExecutionStack.push(executionData);
lastExecutionTry = currentExecutionTry;
continue executionLoop;
}
}
}
}
@@ -1207,9 +1295,16 @@ export class WorkflowExecute {
);
}
const connectionDestinationNode = workflow.getNode(connectionData.node);
const forceInputNodeExecution = this.forceInputNodeExecution(
workflow,
connectionDestinationNode!,
);
if (
nodeSuccessData![outputIndex] &&
(nodeSuccessData![outputIndex].length !== 0 || connectionData.index > 0)
(nodeSuccessData![outputIndex].length !== 0 ||
(connectionData.index > 0 && forceInputNodeExecution))
) {
// Add the node only if it did execute or if connected to second "optional" input
const nodeToAdd = workflow.getNode(connectionData.node);
@@ -1260,6 +1355,163 @@ export class WorkflowExecute {
taskData,
this.runExecutionData,
]);
let waitingNodes: string[] = Object.keys(
this.runExecutionData.executionData!.waitingExecution,
);
if (
this.runExecutionData.executionData!.nodeExecutionStack.length === 0 &&
waitingNodes.length
) {
// There are no more nodes in the execution stack. Check if there are
// waiting nodes that do not require data on all inputs and execute them,
// one by one.
// TODO: Should this also care about workflow position (top-left first?)
for (let i = 0; i < waitingNodes.length; i++) {
const nodeName = waitingNodes[i];
const checkNode = workflow.getNode(nodeName);
if (!checkNode) {
continue;
}
const nodeType = workflow.nodeTypes.getByNameAndVersion(
checkNode.type,
checkNode.typeVersion,
);
// Check if the node is only allowed execute if all inputs received data
let requiredInputs = nodeType.description.requiredInputs;
if (requiredInputs !== undefined) {
if (typeof requiredInputs === 'string') {
requiredInputs = workflow.expression.getSimpleParameterValue(
checkNode,
requiredInputs,
this.mode,
this.additionalData.timezone,
{ $version: checkNode.typeVersion },
undefined,
[],
) as number[];
}
if (
(requiredInputs !== undefined &&
Array.isArray(requiredInputs) &&
requiredInputs.length === nodeType.description.inputs.length) ||
requiredInputs === nodeType.description.inputs.length
) {
// All inputs are required, but not all have data so do not continue
continue;
}
}
const parentNodes = workflow.getParentNodes(nodeName);
// Check if input nodes (of same run) got already executed
// eslint-disable-next-line @typescript-eslint/no-loop-func
const parentIsWaiting = parentNodes.some((value) => waitingNodes.includes(value));
if (parentIsWaiting) {
// Execute node later as one of its dependencies is still outstanding
continue;
}
const runIndexes = Object.keys(
this.runExecutionData.executionData!.waitingExecution[nodeName],
).sort();
// The run-index of the earliest outstanding one
const firstRunIndex = parseInt(runIndexes[0]);
// Find all the inputs which received any kind of data, even if it was an empty
// array as this shows that the parent nodes executed but they did not have any
// data to pass on.
const inputsWithData = this.runExecutionData
.executionData!.waitingExecution[nodeName][firstRunIndex].main.map((data, index) =>
data === null ? null : index,
)
.filter((data) => data !== null);
if (requiredInputs !== undefined) {
// Certain inputs are required that the node can execute
if (Array.isArray(requiredInputs)) {
// Specific inputs are required (array of input indexes)
let inputDataMissing = false;
for (const requiredInput of requiredInputs) {
if (!inputsWithData.includes(requiredInput)) {
inputDataMissing = true;
break;
}
}
if (inputDataMissing) {
continue;
}
} else {
// A certain amout of inputs are required (amount of inputs)
if (inputsWithData.length < requiredInputs) {
continue;
}
}
}
const taskDataMain = this.runExecutionData.executionData!.waitingExecution[nodeName][
firstRunIndex
].main.map((data) => {
// For the inputs for which never any data got received set it to an empty array
return data === null ? [] : data;
});
if (taskDataMain.filter((data) => data.length).length !== 0) {
// Add the node to be executed
// Make sure that each input at least receives an empty array
if (taskDataMain.length < nodeType.description.inputs.length) {
for (; taskDataMain.length < nodeType.description.inputs.length; ) {
taskDataMain.push([]);
}
}
this.runExecutionData.executionData!.nodeExecutionStack.push({
node: workflow.nodes[nodeName],
data: {
main: taskDataMain,
},
source:
this.runExecutionData.executionData!.waitingExecutionSource![nodeName][
firstRunIndex
],
});
}
// Remove the node from waiting
delete this.runExecutionData.executionData!.waitingExecution[nodeName][firstRunIndex];
delete this.runExecutionData.executionData!.waitingExecutionSource![nodeName][
firstRunIndex
];
if (
Object.keys(this.runExecutionData.executionData!.waitingExecution[nodeName])
.length === 0
) {
// No more data left for the node so also delete that one
delete this.runExecutionData.executionData!.waitingExecution[nodeName];
delete this.runExecutionData.executionData!.waitingExecutionSource![nodeName];
}
if (taskDataMain.filter((data) => data.length).length !== 0) {
// Node to execute got found and added to stop
break;
} else {
// Node to add did not get found, rather an empty one removed so continue with search
waitingNodes = Object.keys(this.runExecutionData.executionData!.waitingExecution);
// Set counter to start again from the beginning. Set it to -1 as it auto increments
// after run. So only like that will we end up again ot 0.
i = -1;
}
}
}
}
return;