Run workflows in own independent subprocess

This commit is contained in:
Jan Oberhauser
2019-08-08 20:38:25 +02:00
parent abb0a52b08
commit d59a043e3f
21 changed files with 926 additions and 369 deletions

View File

@@ -1,5 +1,6 @@
import {
IConnection,
IDataObject,
IExecuteData,
IExecutionError,
INode,
@@ -16,21 +17,30 @@ import {
Workflow,
} from 'n8n-workflow';
import {
ActiveExecutions,
NodeExecuteFunctions,
} from './';
export class WorkflowExecute {
runExecutionData: IRunExecutionData;
private additionalData: IWorkflowExecuteAdditionalData;
private mode: WorkflowExecuteMode;
private activeExecutions: ActiveExecutions.ActiveExecutions;
private executionId: string | null = null;
constructor(additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode) {
constructor(additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, runExecutionData?: IRunExecutionData) {
this.additionalData = additionalData;
this.activeExecutions = ActiveExecutions.getInstance();
this.mode = mode;
this.runExecutionData = runExecutionData || {
startData: {
},
resultData: {
runData: {},
},
executionData: {
contextData: {},
nodeExecutionStack: [],
waitingExecution: {},
},
};
}
@@ -44,7 +54,7 @@ export class WorkflowExecute {
* @returns {(Promise<string>)}
* @memberof WorkflowExecute
*/
async run(workflow: Workflow, startNode?: INode, destinationNode?: string): Promise<string> {
async run(workflow: Workflow, startNode?: INode, destinationNode?: string): Promise<IRun> {
// Get the nodes to start workflow execution from
startNode = startNode || workflow.getStartNode(destinationNode);
@@ -75,7 +85,7 @@ export class WorkflowExecute {
}
];
const runExecutionData: IRunExecutionData = {
this.runExecutionData = {
startData: {
destinationNode,
runNodeFilter,
@@ -90,7 +100,7 @@ export class WorkflowExecute {
},
};
return this.runExecutionData(workflow, runExecutionData);
return this.processRunExecutionData(workflow);
}
@@ -105,8 +115,7 @@ export class WorkflowExecute {
* @returns {(Promise<string>)}
* @memberof WorkflowExecute
*/
async runPartialWorkflow(workflow: Workflow, runData: IRunData, startNodes: string[], destinationNode: string): Promise<string> {
async runPartialWorkflow(workflow: Workflow, runData: IRunData, startNodes: string[], destinationNode: string): Promise<IRun> {
let incomingNodeConnections: INodeConnections | undefined;
let connection: IConnection;
@@ -185,8 +194,7 @@ export class WorkflowExecute {
runNodeFilter = workflow.getParentNodes(destinationNode);
runNodeFilter.push(destinationNode);
const runExecutionData: IRunExecutionData = {
this.runExecutionData = {
startData: {
destinationNode,
runNodeFilter,
@@ -201,7 +209,7 @@ export class WorkflowExecute {
},
};
return await this.runExecutionData(workflow, runExecutionData);
return await this.processRunExecutionData(workflow);
}
@@ -240,7 +248,7 @@ export class WorkflowExecute {
}
addNodeToBeExecuted(workflow: Workflow, runExecutionData: IRunExecutionData, connectionData: IConnection, outputIndex: number, parentNodeName: string, nodeSuccessData: INodeExecutionData[][], runIndex: number): void {
addNodeToBeExecuted(workflow: Workflow, connectionData: IConnection, outputIndex: number, parentNodeName: string, nodeSuccessData: INodeExecutionData[][], runIndex: number): void {
let stillDataMissing = false;
// Check if node has multiple inputs as then we have to wait for all input data
@@ -250,33 +258,33 @@ export class WorkflowExecute {
let nodeWasWaiting = true;
// Check if there is already data for the node
if (runExecutionData.executionData!.waitingExecution[connectionData.node] === undefined) {
if (this.runExecutionData.executionData!.waitingExecution[connectionData.node] === undefined) {
// Node does not have data yet so create a new empty one
runExecutionData.executionData!.waitingExecution[connectionData.node] = {};
this.runExecutionData.executionData!.waitingExecution[connectionData.node] = {};
nodeWasWaiting = false;
}
if (runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] === undefined) {
if (this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] === undefined) {
// Node does not have data for runIndex yet so create also empty one and init it
runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] = {
this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] = {
main: []
};
for (let i = 0; i < workflow.connectionsByDestinationNode[connectionData.node]['main'].length; i++) {
runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main.push(null);
this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main.push(null);
}
}
// Add the new data
if (nodeSuccessData === null) {
runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[connectionData.index] = null;
this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[connectionData.index] = null;
} else {
runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[connectionData.index] = nodeSuccessData[outputIndex];
this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[connectionData.index] = nodeSuccessData[outputIndex];
}
// Check if all data exists now
let thisExecutionData: INodeExecutionData[] | null;
let allDataFound = true;
for (let i = 0; i < runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main.length; i++) {
thisExecutionData = runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[i];
for (let i = 0; i < this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main.length; i++) {
thisExecutionData = this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[i];
if (thisExecutionData === null) {
allDataFound = false;
break;
@@ -286,17 +294,17 @@ export class WorkflowExecute {
if (allDataFound === true) {
// All data exists for node to be executed
// So add it to the execution stack
runExecutionData.executionData!.nodeExecutionStack.push({
this.runExecutionData.executionData!.nodeExecutionStack.push({
node: workflow.nodes[connectionData.node],
data: runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex]
data: this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex]
});
// Remove the data from waiting
delete runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex];
delete this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex];
if (Object.keys(runExecutionData.executionData!.waitingExecution[connectionData.node]).length === 0) {
if (Object.keys(this.runExecutionData.executionData!.waitingExecution[connectionData.node]).length === 0) {
// No more data left for the node so also delete that one
delete runExecutionData.executionData!.waitingExecution[connectionData.node];
delete this.runExecutionData.executionData!.waitingExecution[connectionData.node];
}
return;
} else {
@@ -327,7 +335,7 @@ export class WorkflowExecute {
continue;
}
const executionStackNodes = runExecutionData.executionData!.nodeExecutionStack.map((stackData) => stackData.node.name);
const executionStackNodes = this.runExecutionData.executionData!.nodeExecutionStack.map((stackData) => stackData.node.name);
// Check if that node is also an output connection of the
// previously processed one
@@ -345,7 +353,7 @@ export class WorkflowExecute {
}
// Check if node got processed already
if (runExecutionData.resultData.runData[inputData.node] !== undefined) {
if (this.runExecutionData.resultData.runData[inputData.node] !== undefined) {
// Node got processed already so no need to add it
continue;
}
@@ -376,7 +384,7 @@ export class WorkflowExecute {
}
// Check if node got processed already
if (runExecutionData.resultData.runData[parentNode] !== undefined) {
if (this.runExecutionData.resultData.runData[parentNode] !== undefined) {
// Node got processed already so we can use the
// output data as input of this node
break;
@@ -393,7 +401,7 @@ export class WorkflowExecute {
if (workflow.connectionsByDestinationNode[nodeToAdd] === undefined) {
// Add only node if it does not have any inputs becuase else it will
// be added by its input node later anyway.
runExecutionData.executionData!.nodeExecutionStack.push(
this.runExecutionData.executionData!.nodeExecutionStack.push(
{
node: workflow.getNode(nodeToAdd) as INode,
data: {
@@ -428,15 +436,15 @@ export class WorkflowExecute {
if (stillDataMissing === true) {
// Additional data is needed to run node so add it to waiting
if (!runExecutionData.executionData!.waitingExecution.hasOwnProperty(connectionData.node)) {
runExecutionData.executionData!.waitingExecution[connectionData.node] = {};
if (!this.runExecutionData.executionData!.waitingExecution.hasOwnProperty(connectionData.node)) {
this.runExecutionData.executionData!.waitingExecution[connectionData.node] = {};
}
runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] = {
this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] = {
main: connectionDataArray
};
} else {
// All data is there so add it directly to stack
runExecutionData.executionData!.nodeExecutionStack.push({
this.runExecutionData.executionData!.nodeExecutionStack.push({
node: workflow.nodes[connectionData.node],
data: {
main: connectionDataArray
@@ -450,12 +458,11 @@ export class WorkflowExecute {
* Runs the given execution data.
*
* @param {Workflow} workflow
* @param {IRunExecutionData} runExecutionData
* @returns {Promise<string>}
* @memberof WorkflowExecute
*/
async runExecutionData(workflow: Workflow, runExecutionData: IRunExecutionData): Promise<string> {
const startedAt = new Date().getTime();
async processRunExecutionData(workflow: Workflow): Promise<IRun> {
const startedAt = new Date();
const workflowIssues = workflow.checkReadyForExecution();
if (workflowIssues !== null) {
@@ -471,39 +478,29 @@ export class WorkflowExecute {
let startTime: number;
let taskData: ITaskData;
if (runExecutionData.startData === undefined) {
runExecutionData.startData = {};
if (this.runExecutionData.startData === undefined) {
this.runExecutionData.startData = {};
}
this.executionId = this.activeExecutions.add(workflow, runExecutionData, this.mode);
this.executeHook('workflowExecuteBefore', [this.executionId]);
this.executeHook('workflowExecuteBefore', []);
let currentExecutionTry = '';
let lastExecutionTry = '';
// Wait for the next tick so that the executionId gets already returned.
// So it can directly be send to the editor-ui and is so aware of the
// executionId when the first push messages arrive.
process.nextTick(() => (async () => {
return (async () => {
executionLoop:
while (runExecutionData.executionData!.nodeExecutionStack.length !== 0) {
if (this.activeExecutions.shouldBeStopped(this.executionId!) === true) {
// The execution should be stopped
break;
}
while (this.runExecutionData.executionData!.nodeExecutionStack.length !== 0) {
nodeSuccessData = null;
executionError = undefined;
executionData = runExecutionData.executionData!.nodeExecutionStack.shift() as IExecuteData;
executionData = this.runExecutionData.executionData!.nodeExecutionStack.shift() as IExecuteData;
executionNode = executionData.node;
this.executeHook('nodeExecuteBefore', [this.executionId, executionNode.name]);
this.executeHook('nodeExecuteBefore', [executionNode.name]);
// Get the index of the current run
runIndex = 0;
if (runExecutionData.resultData.runData.hasOwnProperty(executionNode.name)) {
runIndex = runExecutionData.resultData.runData[executionNode.name].length;
if (this.runExecutionData.resultData.runData.hasOwnProperty(executionNode.name)) {
runIndex = this.runExecutionData.resultData.runData[executionNode.name].length;
}
currentExecutionTry = `${executionNode.name}:${runIndex}`;
@@ -512,7 +509,7 @@ export class WorkflowExecute {
throw new Error('Did stop execution because execution seems to be in endless loop.');
}
if (runExecutionData.startData!.runNodeFilter !== undefined && runExecutionData.startData!.runNodeFilter!.indexOf(executionNode.name) === -1) {
if (this.runExecutionData.startData!.runNodeFilter !== undefined && this.runExecutionData.startData!.runNodeFilter!.indexOf(executionNode.name) === -1) {
// If filter is set and node is not on filter skip it, that avoids the problem that it executes
// leafs that are parallel to a selected destinationNode. Normally it would execute them because
// they have the same parent and it executes all child nodes.
@@ -539,7 +536,7 @@ export class WorkflowExecute {
if (!executionData.data!.hasOwnProperty('main')) {
// ExecutionData does not even have the connection set up so can
// not have that data, so add it again to be executed later
runExecutionData.executionData!.nodeExecutionStack.push(executionData);
this.runExecutionData.executionData!.nodeExecutionStack.push(executionData);
lastExecutionTry = currentExecutionTry;
continue executionLoop;
}
@@ -549,7 +546,7 @@ export class WorkflowExecute {
// of both inputs has to be available to be able to process the node.
if (executionData.data!.main!.length < connectionIndex || executionData.data!.main![connectionIndex] === null) {
// Does not have the data of the connections so add back to stack
runExecutionData.executionData!.nodeExecutionStack.push(executionData);
this.runExecutionData.executionData!.nodeExecutionStack.push(executionData);
lastExecutionTry = currentExecutionTry;
continue executionLoop;
}
@@ -591,15 +588,8 @@ export class WorkflowExecute {
}
}
// Check again if the execution should be stopped else it
// could take forever to stop when each try takes a long time
if (this.activeExecutions.shouldBeStopped(this.executionId!) === true) {
// The execution should be stopped
break;
}
runExecutionData.resultData.lastNodeExecuted = executionData.node.name;
nodeSuccessData = await workflow.runNode(executionData.node, executionData.data, runExecutionData, runIndex, this.additionalData, NodeExecuteFunctions, this.mode);
this.runExecutionData.resultData.lastNodeExecuted = executionData.node.name;
nodeSuccessData = await workflow.runNode(executionData.node, executionData.data, this.runExecutionData, runIndex, this.additionalData, NodeExecuteFunctions, this.mode);
if (nodeSuccessData === null) {
// If null gets returned it means that the node did succeed
@@ -620,8 +610,8 @@ export class WorkflowExecute {
// Add the data to return to the user
// (currently does not get cloned as data does not get changed, maybe later we should do that?!?!)
if (!runExecutionData.resultData.runData.hasOwnProperty(executionNode.name)) {
runExecutionData.resultData.runData[executionNode.name] = [];
if (!this.runExecutionData.resultData.runData.hasOwnProperty(executionNode.name)) {
this.runExecutionData.resultData.runData[executionNode.name] = [];
}
taskData = {
startTime,
@@ -642,12 +632,12 @@ export class WorkflowExecute {
}
} else {
// Node execution did fail so add error and stop execution
runExecutionData.resultData.runData[executionNode.name].push(taskData);
this.runExecutionData.resultData.runData[executionNode.name].push(taskData);
// Add the execution data again so that it can get restarted
runExecutionData.executionData!.nodeExecutionStack.unshift(executionData);
this.runExecutionData.executionData!.nodeExecutionStack.unshift(executionData);
this.executeHook('nodeExecuteAfter', [this.executionId, executionNode.name, taskData]);
this.executeHook('nodeExecuteAfter', [executionNode.name, taskData]);
break;
}
@@ -658,11 +648,11 @@ export class WorkflowExecute {
'main': nodeSuccessData
} as ITaskDataConnections);
this.executeHook('nodeExecuteAfter', [this.executionId, executionNode.name, taskData]);
this.executeHook('nodeExecuteAfter', [executionNode.name, taskData]);
runExecutionData.resultData.runData[executionNode.name].push(taskData);
this.runExecutionData.resultData.runData[executionNode.name].push(taskData);
if (runExecutionData.startData && runExecutionData.startData.destinationNode && runExecutionData.startData.destinationNode === executionNode.name) {
if (this.runExecutionData.startData && this.runExecutionData.startData.destinationNode && this.runExecutionData.startData.destinationNode === executionNode.name) {
// If destination node is defined and got executed stop execution
continue;
}
@@ -686,7 +676,7 @@ export class WorkflowExecute {
return Promise.reject(new Error(`The node "${executionNode.name}" connects to not found node "${connectionData.node}"`));
}
this.addNodeToBeExecuted(workflow, runExecutionData, connectionData, parseInt(outputIndex, 10), executionNode.name, nodeSuccessData!, runIndex);
this.addNodeToBeExecuted(workflow, connectionData, parseInt(outputIndex, 10), executionNode.name, nodeSuccessData!, runIndex);
}
}
}
@@ -696,45 +686,61 @@ export class WorkflowExecute {
return Promise.resolve();
})()
.then(async () => {
const fullRunData: IRun = {
data: runExecutionData,
mode: this.mode,
startedAt: new Date(startedAt),
stoppedAt: new Date(),
};
if (executionError !== undefined) {
fullRunData.data.resultData.error = executionError;
} else {
fullRunData.finished = true;
}
this.activeExecutions.remove(this.executionId!, fullRunData);
await this.executeHook('workflowExecuteAfter', [fullRunData, this.executionId!]);
return fullRunData;
return this.processSuccessExecution(startedAt, workflow, executionError);
})
.catch(async (error) => {
const fullRunData: IRun = {
data: runExecutionData,
mode: this.mode,
startedAt: new Date(startedAt),
stoppedAt: new Date(),
};
const fullRunData = this.getFullRunData(startedAt);
fullRunData.data.resultData.error = {
message: error.message,
stack: error.stack,
};
this.activeExecutions.remove(this.executionId!, fullRunData);
// Check if static data changed
let newStaticData: IDataObject | undefined;
if (workflow.staticData.__dataChanged === true) {
// Static data of workflow changed
newStaticData = workflow.staticData;
}
await this.executeHook('workflowExecuteAfter', [fullRunData, this.executionId!]);
await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]);
return fullRunData;
}));
});
return this.executionId;
}
async processSuccessExecution(startedAt: Date, workflow: Workflow, executionError?: IExecutionError): Promise<IRun> {
const fullRunData = this.getFullRunData(startedAt);
if (executionError !== undefined) {
fullRunData.data.resultData.error = executionError;
} else {
fullRunData.finished = true;
}
// Check if static data changed
let newStaticData: IDataObject | undefined;
if (workflow.staticData.__dataChanged === true) {
// Static data of workflow changed
newStaticData = workflow.staticData;
}
await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]);
return fullRunData;
}
getFullRunData(startedAt: Date): IRun {
const fullRunData: IRun = {
data: this.runExecutionData,
mode: this.mode,
startedAt,
stoppedAt: new Date(),
};
return fullRunData;
}
}