feat(core): Add support for pairedItem (beta) (#3012)

*  Add pairedItem support

* 👕 Fix lint issue

* 🐛 Fix resolution in frontend

* 🐛 Fix resolution issue

* 🐛 Fix resolution in frontend

* 🐛 Fix another resolution issue in frontend

*  Try to automatically add pairedItem data if possible

*  Cleanup

*  Display expression errors in editor UI

* 🐛 Fix issue that it did not display errors in production

* 🐛 Fix auto-fix of missing pairedItem data

* 🐛 Fix frontend resolution for not executed nodes

*  Fail execution on pairedItem resolve issue and display information
about itemIndex and runIndex

*  Allow that pairedItem is only set to number if runIndex is 0

*  Improve Expression Errors

*  Remove no longer needed code

*  Make errors more helpful

*  Add additional errors

* 👕 Fix lint issue

*  Add pairedItem support to core nodes

*  Improve support in Merge-Node

*  Fix issue with not correctly converted incoming pairedItem data

* 🐛 Fix frontend resolve issue

* 🐛 Fix frontend parameter name display issue

*  Improve errors

* 👕 Fix lint issue

*  Improve errors

*  Make it possible to display parameter name in error messages

*  Improve error messages

*  Fix error message

*  Improve error messages

*  Add another error message

*  Simplify
This commit is contained in:
Jan Oberhauser
2022-06-03 17:25:07 +02:00
committed by GitHub
parent 450a9aafea
commit bdb84130d6
52 changed files with 1317 additions and 152 deletions

View File

@@ -22,9 +22,12 @@ import {
IRun,
IRunData,
IRunExecutionData,
ISourceData,
ITaskData,
ITaskDataConnections,
ITaskDataConnectionsSource,
IWaitingForExecution,
IWaitingForExecutionSource,
IWorkflowExecuteAdditionalData,
LoggerProxy as Logger,
NodeApiError,
@@ -61,6 +64,7 @@ export class WorkflowExecute {
contextData: {},
nodeExecutionStack: [],
waitingExecution: {},
waitingExecutionSource: {},
},
};
}
@@ -106,6 +110,7 @@ export class WorkflowExecute {
],
],
},
source: null,
},
];
@@ -121,6 +126,7 @@ export class WorkflowExecute {
contextData: {},
nodeExecutionStack,
waitingExecution: {},
waitingExecutionSource: {},
},
};
@@ -157,10 +163,12 @@ export class WorkflowExecute {
// the data from runData
const nodeExecutionStack: IExecuteData[] = [];
const waitingExecution: IWaitingForExecution = {};
const waitingExecutionSource: IWaitingForExecutionSource = {};
for (const startNode of startNodes) {
incomingNodeConnections = workflow.connectionsByDestinationNode[startNode];
const incomingData: INodeExecutionData[][] = [];
let incomingSourceData: ITaskDataConnectionsSource | null = null;
if (incomingNodeConnections === undefined) {
// If it has no incoming data add the default empty data
@@ -171,6 +179,7 @@ export class WorkflowExecute {
]);
} else {
// Get the data of the incoming connections
incomingSourceData = { main: [] };
for (const connections of incomingNodeConnections.main) {
for (let inputIndex = 0; inputIndex < connections.length; inputIndex++) {
connection = connections[inputIndex];
@@ -178,6 +187,9 @@ export class WorkflowExecute {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
runData[connection.node][runIndex].data![connection.type][connection.index]!,
);
incomingSourceData.main.push({
previousNode: connection.node,
});
}
}
}
@@ -187,6 +199,7 @@ export class WorkflowExecute {
data: {
main: incomingData,
},
source: incomingSourceData,
};
nodeExecutionStack.push(executeData);
@@ -201,12 +214,15 @@ export class WorkflowExecute {
if (waitingExecution[destinationNode] === undefined) {
waitingExecution[destinationNode] = {};
waitingExecutionSource[destinationNode] = {};
}
if (waitingExecution[destinationNode][runIndex] === undefined) {
waitingExecution[destinationNode][runIndex] = {};
waitingExecutionSource[destinationNode][runIndex] = {};
}
if (waitingExecution[destinationNode][runIndex][connection.type] === undefined) {
waitingExecution[destinationNode][runIndex][connection.type] = [];
waitingExecutionSource[destinationNode][runIndex][connection.type] = [];
}
if (runData[connection.node] !== undefined) {
@@ -215,8 +231,14 @@ export class WorkflowExecute {
waitingExecution[destinationNode][runIndex][connection.type].push(
runData[connection.node][runIndex].data![connection.type][connection.index],
);
waitingExecutionSource[destinationNode][runIndex][connection.type].push({
previousNode: connection.node,
previousNodeOutput: connection.index || undefined,
previousNodeRun: runIndex || undefined,
} as ISourceData);
} else {
waitingExecution[destinationNode][runIndex][connection.type].push(null);
waitingExecutionSource[destinationNode][runIndex][connection.type].push(null);
}
}
}
@@ -241,6 +263,7 @@ export class WorkflowExecute {
contextData: {},
nodeExecutionStack,
waitingExecution,
waitingExecutionSource,
},
};
@@ -303,12 +326,17 @@ export class WorkflowExecute {
// Node has multiple inputs
let nodeWasWaiting = true;
if (this.runExecutionData.executionData!.waitingExecutionSource === null) {
this.runExecutionData.executionData!.waitingExecutionSource = {};
}
// Check if there is already data for the node
if (
this.runExecutionData.executionData!.waitingExecution[connectionData.node] === undefined
) {
// Node does not have data yet so create a new empty one
this.runExecutionData.executionData!.waitingExecution[connectionData.node] = {};
this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node] = {};
nodeWasWaiting = false;
}
if (
@@ -319,6 +347,10 @@ export class WorkflowExecute {
this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] = {
main: [],
};
this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node][runIndex] =
{
main: [],
};
for (
let i = 0;
i < workflow.connectionsByDestinationNode[connectionData.node].main.length;
@@ -327,6 +359,10 @@ export class WorkflowExecute {
this.runExecutionData.executionData!.waitingExecution[connectionData.node][
runIndex
].main.push(null);
this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node][
runIndex
].main.push(null);
}
}
@@ -335,10 +371,20 @@ export class WorkflowExecute {
this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[
connectionData.index
] = null;
this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node][
runIndex
].main[connectionData.index] = null;
} else {
this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[
connectionData.index
] = nodeSuccessData[outputIndex];
this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node][
runIndex
].main[connectionData.index] = {
previousNode: parentNodeName,
previousNodeOutput: outputIndex || undefined,
previousNodeRun: runIndex || undefined,
};
}
// Check if all data exists now
@@ -364,15 +410,32 @@ export class WorkflowExecute {
if (allDataFound) {
// All data exists for node to be executed
// So add it to the execution stack
this.runExecutionData.executionData!.nodeExecutionStack.push({
const executionStackItem = {
node: workflow.nodes[connectionData.node],
data: this.runExecutionData.executionData!.waitingExecution[connectionData.node][
runIndex
],
});
source:
this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node][
runIndex
],
} as IExecuteData;
if (this.runExecutionData.executionData!.waitingExecutionSource !== null) {
executionStackItem.source =
this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node][
runIndex
];
}
this.runExecutionData.executionData!.nodeExecutionStack.push(executionStackItem);
// Remove the data from waiting
delete this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex];
delete this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node][
runIndex
];
if (
Object.keys(this.runExecutionData.executionData!.waitingExecution[connectionData.node])
@@ -380,6 +443,7 @@ export class WorkflowExecute {
) {
// No more data left for the node so also delete that one
delete this.runExecutionData.executionData!.waitingExecution[connectionData.node];
delete this.runExecutionData.executionData!.waitingExecutionSource[connectionData.node];
}
return;
}
@@ -534,6 +598,15 @@ export class WorkflowExecute {
],
],
},
source: {
main: [
{
previousNode: parentNodeName,
previousNodeOutput: outputIndex || undefined,
previousNodeRun: runIndex || undefined,
},
],
},
});
}
}
@@ -571,6 +644,15 @@ export class WorkflowExecute {
data: {
main: connectionDataArray,
},
source: {
main: [
{
previousNode: parentNodeName,
previousNodeOutput: outputIndex || undefined,
previousNodeRun: runIndex || undefined,
},
],
},
});
}
}
@@ -660,6 +742,7 @@ export class WorkflowExecute {
data: {
main: executionData.data.main,
} as ITaskDataConnections,
source: [],
},
],
},
@@ -691,6 +774,29 @@ export class WorkflowExecute {
this.runExecutionData.executionData!.nodeExecutionStack.shift() as IExecuteData;
executionNode = executionData.node;
// Update the pairedItem information on items
const newTaskDataConnections: ITaskDataConnections = {};
for (const inputName of Object.keys(executionData.data)) {
newTaskDataConnections[inputName] = executionData.data[inputName].map(
(input, inputIndex) => {
if (input === null) {
return input;
}
return input.map((item, itemIndex) => {
return {
...item,
pairedItem: {
item: itemIndex,
input: inputIndex || undefined,
},
};
});
},
);
}
executionData.data = newTaskDataConnections;
Logger.debug(`Start processing node "${executionNode.name}"`, {
node: executionNode.name,
workflowId: workflow.id,
@@ -767,9 +873,6 @@ export class WorkflowExecute {
}
}
// Clone input data that nodes can not mess up data of parallel nodes which receive the same data
// TODO: Should only clone if multiple nodes get the same data or when it gets returned to frontned
// is very slow so only do if needed
startTime = new Date().getTime();
let maxTries = 1;
@@ -813,8 +916,7 @@ export class WorkflowExecute {
workflowId: workflow.id,
});
const runNodeData = await workflow.runNode(
executionData.node,
executionData.data,
executionData,
this.runExecutionData,
runIndex,
this.additionalData,
@@ -834,6 +936,30 @@ export class WorkflowExecute {
workflowId: workflow.id,
});
// Check if the output data contains pairedItem data
checkOutputData: for (const outputData of nodeSuccessData as INodeExecutionData[][]) {
if (outputData === null) {
continue;
}
for (const item of outputData) {
if (!item.pairedItem) {
// The pairedItem is missing so check if it can get automatically fixed
if (
executionData.data.main.length !== 1 ||
executionData.data.main[0]?.length !== 1
) {
// Automatically fixing is only possible if there is only one
// input and one input item
break checkOutputData;
}
item.pairedItem = {
item: 0,
};
}
}
}
if (nodeSuccessData === undefined) {
// Node did not get executed
nodeSuccessData = null;
@@ -885,6 +1011,7 @@ export class WorkflowExecute {
taskData = {
startTime,
executionTime: new Date().getTime() - startTime,
source: executionData.source === null ? [] : executionData.source.main,
};
if (executionError !== undefined) {