fix(core): Make node execution order configurable, and backward-compatible (#6507)

* fix(core): Make node execution order configurable, and backward-compatible

*  Also add new Merge-Node behaviour

*  Fix typo

* Fix lint issue

* update labels

* rename legacy to v0

* remove the unnecessary log

* default all new workflows to use v1 execution-order

* remove the controller changes

* clone default settings to avoid it getting modified

---------

Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2023-07-05 18:47:34 +02:00
committed by GitHub
parent f0dfc3cf4e
commit d97edbcffa
18 changed files with 2156 additions and 1042 deletions

View File

@@ -143,24 +143,8 @@ export class WorkflowExecute {
return this.processRunExecutionData(workflow);
}
forceInputNodeExecution(workflow: Workflow, node: INode): boolean {
const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
// Check if the incoming nodes should be forced to execute
let forceInputNodeExecution = nodeType.description.forceInputNodeExecution;
if (forceInputNodeExecution !== undefined) {
if (typeof forceInputNodeExecution === 'string') {
forceInputNodeExecution = !!workflow.expression.getSimpleParameterValue(
node,
forceInputNodeExecution,
this.mode,
this.additionalData.timezone,
{ $version: node.typeVersion },
);
}
return forceInputNodeExecution;
}
return false;
forceInputNodeExecution(workflow: Workflow): boolean {
return workflow.settings.executionOrder !== 'v1';
}
/**
@@ -379,6 +363,7 @@ export class WorkflowExecute {
runIndex: number,
): void {
let stillDataMissing = false;
const enqueueFn = workflow.settings.executionOrder === 'v1' ? 'unshift' : 'push';
let waitingNodeIndex: number | undefined;
// Check if node has multiple inputs as then we have to wait for all input data
@@ -510,7 +495,7 @@ export class WorkflowExecute {
];
}
this.runExecutionData.executionData!.nodeExecutionStack.unshift(executionStackItem);
this.runExecutionData.executionData!.nodeExecutionStack[enqueueFn](executionStackItem);
// Remove the data from waiting
delete this.runExecutionData.executionData!.waitingExecution[connectionData.node][
@@ -554,8 +539,7 @@ export class WorkflowExecute {
// are already on the list to be processed.
// If that is not the case add it.
const node = workflow.getNode(connectionData.node);
const forceInputNodeExecution = this.forceInputNodeExecution(workflow, node!);
const forceInputNodeExecution = this.forceInputNodeExecution(workflow);
for (
let inputIndex = 0;
@@ -680,7 +664,7 @@ export class WorkflowExecute {
if (addEmptyItem) {
// Add only node if it does not have any inputs because else it will
// be added by its input node later anyway.
this.runExecutionData.executionData!.nodeExecutionStack.unshift({
this.runExecutionData.executionData!.nodeExecutionStack[enqueueFn]({
node: workflow.getNode(nodeToAdd) as INode,
data: {
main: [
@@ -744,7 +728,7 @@ export class WorkflowExecute {
};
} else {
// All data is there so add it directly to stack
this.runExecutionData.executionData!.nodeExecutionStack.unshift({
this.runExecutionData.executionData!.nodeExecutionStack[enqueueFn]({
node: workflow.nodes[connectionData.node],
data: {
main: connectionDataArray,
@@ -774,6 +758,7 @@ export class WorkflowExecute {
Logger.verbose('Workflow execution started', { workflowId: workflow.id });
const startedAt = new Date();
const forceInputNodeExecution = this.forceInputNodeExecution(workflow);
this.status = 'running';
@@ -937,8 +922,6 @@ export class WorkflowExecute {
continue;
}
const node = workflow.getNode(executionNode.name);
// Check if all the data which is needed to run the node is available
if (workflow.connectionsByDestinationNode.hasOwnProperty(executionNode.name)) {
// Check if the node has incoming connections
@@ -971,7 +954,7 @@ export class WorkflowExecute {
continue executionLoop;
}
if (this.forceInputNodeExecution(workflow, node!)) {
if (forceInputNodeExecution) {
// Check if it has the data for all the inputs
// The most nodes just have one but merge node for example has two and data
// of both inputs has to be available to be able to process the node.
@@ -1295,53 +1278,60 @@ export class WorkflowExecute {
);
}
const connectionDestinationNode = workflow.getNode(connectionData.node);
const forceInputNodeExecution = this.forceInputNodeExecution(
workflow,
connectionDestinationNode!,
);
if (
nodeSuccessData![outputIndex] &&
(nodeSuccessData![outputIndex].length !== 0 ||
(connectionData.index > 0 && forceInputNodeExecution))
) {
// Add the node only if it did execute or if connected to second "optional" input
const nodeToAdd = workflow.getNode(connectionData.node);
nodesToAdd.push({
position: nodeToAdd?.position || [0, 0],
connection: connectionData,
outputIndex: parseInt(outputIndex, 10),
});
if (workflow.settings.executionOrder === 'v1') {
const nodeToAdd = workflow.getNode(connectionData.node);
nodesToAdd.push({
position: nodeToAdd?.position || [0, 0],
connection: connectionData,
outputIndex: parseInt(outputIndex, 10),
});
} else {
this.addNodeToBeExecuted(
workflow,
connectionData,
parseInt(outputIndex, 10),
executionNode.name,
nodeSuccessData!,
runIndex,
);
}
}
}
}
// Always execute the node that is more to the top-left first
nodesToAdd.sort((a, b) => {
if (a.position[1] < b.position[1]) {
return 1;
}
if (a.position[1] > b.position[1]) {
return -1;
}
if (workflow.settings.executionOrder === 'v1') {
// Always execute the node that is more to the top-left first
nodesToAdd.sort((a, b) => {
if (a.position[1] < b.position[1]) {
return 1;
}
if (a.position[1] > b.position[1]) {
return -1;
}
if (a.position[0] > b.position[0]) {
return -1;
if (a.position[0] > b.position[0]) {
return -1;
}
return 0;
});
for (const nodeData of nodesToAdd) {
this.addNodeToBeExecuted(
workflow,
nodeData.connection,
nodeData.outputIndex,
executionNode.name,
nodeSuccessData!,
runIndex,
);
}
return 0;
});
for (const nodeData of nodesToAdd) {
this.addNodeToBeExecuted(
workflow,
nodeData.connection,
nodeData.outputIndex,
executionNode.name,
nodeSuccessData!,
runIndex,
);
}
}
}
@@ -1382,7 +1372,10 @@ export class WorkflowExecute {
);
// Check if the node is only allowed execute if all inputs received data
let requiredInputs = nodeType.description.requiredInputs;
let requiredInputs =
workflow.settings.executionOrder === 'v1'
? nodeType.description.requiredInputs
: undefined;
if (requiredInputs !== undefined) {
if (typeof requiredInputs === 'string') {
requiredInputs = workflow.expression.getSimpleParameterValue(

View File

@@ -4,15 +4,15 @@ import { WorkflowExecute } from '@/WorkflowExecute';
import * as Helpers from './helpers';
import { initLogger } from './helpers/utils';
import { predefinedWorkflowExecuteTests } from './helpers/constants';
import { legacyWorkflowExecuteTests, v1WorkflowExecuteTests } from './helpers/constants';
describe('WorkflowExecute', () => {
beforeAll(() => {
initLogger();
});
describe('run', () => {
const tests: WorkflowTestData[] = predefinedWorkflowExecuteTests;
describe('v0 execution order', () => {
const tests: WorkflowTestData[] = legacyWorkflowExecuteTests;
const executionMode = 'manual';
const nodeTypes = Helpers.NodeTypes();
@@ -25,6 +25,9 @@ describe('WorkflowExecute', () => {
connections: testData.input.workflowData.connections,
active: false,
nodeTypes,
settings: {
executionOrder: 'v0',
},
});
const waitPromise = await createDeferredPromise<IRun>();
@@ -71,6 +74,70 @@ describe('WorkflowExecute', () => {
}
});
describe('v1 execution order', () => {
const tests: WorkflowTestData[] = v1WorkflowExecuteTests;
const executionMode = 'manual';
const nodeTypes = Helpers.NodeTypes();
for (const testData of tests) {
test(testData.description, async () => {
const workflowInstance = new Workflow({
id: 'test',
nodes: testData.input.workflowData.nodes,
connections: testData.input.workflowData.connections,
active: false,
nodeTypes,
settings: {
executionOrder: 'v1',
},
});
const waitPromise = await createDeferredPromise<IRun>();
const nodeExecutionOrder: string[] = [];
const additionalData = Helpers.WorkflowExecuteAdditionalData(
waitPromise,
nodeExecutionOrder,
);
const workflowExecute = new WorkflowExecute(additionalData, executionMode);
const executionData = await workflowExecute.run(workflowInstance);
const result = await waitPromise.promise();
// Check if the data from WorkflowExecute is identical to data received
// by the webhooks
expect(executionData).toEqual(result);
// Check if the output data of the nodes is correct
for (const nodeName of Object.keys(testData.output.nodeData)) {
if (result.data.resultData.runData[nodeName] === undefined) {
throw new Error(`Data for node "${nodeName}" is missing!`);
}
const resultData = result.data.resultData.runData[nodeName].map((nodeData) => {
if (nodeData.data === undefined) {
return null;
}
return nodeData.data.main[0]!.map((entry) => entry.json);
});
// expect(resultData).toEqual(testData.output.nodeData[nodeName]);
expect(resultData).toEqual(testData.output.nodeData[nodeName]);
}
// Check if the nodes did execute in the correct order
expect(nodeExecutionOrder).toEqual(testData.output.nodeExecutionOrder);
// Check if other data has correct value
expect(result.finished).toEqual(true);
expect(result.data.executionData!.contextData).toEqual({});
expect(result.data.executionData!.nodeExecutionStack).toEqual([]);
});
}
});
//run tests on json files from specified directory, default 'workflows'
//workflows must have pinned data that would be used to test output after execution
describe('run test workflows', () => {
@@ -87,6 +154,7 @@ describe('WorkflowExecute', () => {
connections: testData.input.workflowData.connections,
active: false,
nodeTypes,
settings: testData.input.workflowData.settings,
});
const waitPromise = await createDeferredPromise<IRun>();

File diff suppressed because it is too large Load Diff