🐛 Fixing execution history for integrated workflows. (#1469)

* Fixing execution history for integrated workflows.

*  Minor improvements

Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
This commit is contained in:
Omar Ajoue
2021-02-20 13:51:06 +01:00
committed by GitHub
parent 7df9694b3c
commit 6ee9501d16
4 changed files with 139 additions and 66 deletions

View File

@@ -1,4 +1,5 @@
import { import {
ActiveExecutions,
CredentialsHelper, CredentialsHelper,
Db, Db,
ExternalHooks, ExternalHooks,
@@ -282,6 +283,7 @@ export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowEx
}; };
} }
/** /**
* Returns hook functions to save workflow execution and call error workflow * Returns hook functions to save workflow execution and call error workflow
* *
@@ -321,7 +323,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
if (isManualMode && saveManualExecutions === false) { if (isManualMode && saveManualExecutions === false) {
// Data is always saved, so we remove from database // Data is always saved, so we remove from database
Db.collections.Execution!.delete(this.executionId); await Db.collections.Execution!.delete(this.executionId);
return; return;
} }
@@ -387,58 +389,9 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
} }
/** export async function getRunData(workflowData: IWorkflowBase, inputData?: INodeExecutionData[]): Promise<IWorkflowExecutionDataProcess> {
* Executes the workflow with the given ID
*
* @export
* @param {string} workflowId The id of the workflow to execute
* @param {IWorkflowExecuteAdditionalData} additionalData
* @param {INodeExecutionData[]} [inputData]
* @returns {(Promise<Array<INodeExecutionData[] | null>>)}
*/
export async function executeWorkflow(workflowInfo: IExecuteWorkflowInfo, additionalData: IWorkflowExecuteAdditionalData, inputData?: INodeExecutionData[]): Promise<Array<INodeExecutionData[] | null>> {
const mode = 'integrated'; const mode = 'integrated';
if (workflowInfo.id === undefined && workflowInfo.code === undefined) {
throw new Error(`No information about the workflow to execute found. Please provide either the "id" or "code"!`);
}
if (Db.collections!.Workflow === null) {
// The first time executeWorkflow gets called the Database has
// to get initialized first
await Db.init();
}
let workflowData: IWorkflowBase | undefined;
if (workflowInfo.id !== undefined) {
workflowData = await Db.collections!.Workflow!.findOne(workflowInfo.id);
if (workflowData === undefined) {
throw new Error(`The workflow with the id "${workflowInfo.id}" does not exist.`);
}
} else {
workflowData = workflowInfo.code;
}
const externalHooks = ExternalHooks();
await externalHooks.init();
const nodeTypes = NodeTypes();
const workflowName = workflowData ? workflowData.name : undefined;
const workflow = new Workflow({ id: workflowInfo.id, name: workflowName, nodes: workflowData!.nodes, connections: workflowData!.connections, active: workflowData!.active, nodeTypes, staticData: workflowData!.staticData });
// Does not get used so set it simply to empty string
const executionId = '';
// Get the needed credentials for the current workflow as they will differ to the ones of the
// calling workflow.
const credentials = await WorkflowCredentials(workflowData!.nodes);
// Create new additionalData to have different workflow loaded and to call
// different webooks
const additionalDataIntegrated = await getBase(credentials);
additionalDataIntegrated.hooks = getWorkflowHooksIntegrated(mode, executionId, workflowData!, { parentProcessMode: additionalData.hooks!.mode });
// Find Start-Node // Find Start-Node
const requiredNodeTypes = ['n8n-nodes-base.start']; const requiredNodeTypes = ['n8n-nodes-base.start'];
let startNode: INode | undefined; let startNode: INode | undefined;
@@ -485,16 +438,107 @@ export async function executeWorkflow(workflowInfo: IExecuteWorkflowInfo, additi
}, },
}; };
// Get the needed credentials for the current workflow as they will differ to the ones of the
// calling workflow.
const credentials = await WorkflowCredentials(workflowData!.nodes);
const runData: IWorkflowExecutionDataProcess = {
credentials,
executionMode: mode,
executionData: runExecutionData,
// @ts-ignore
workflowData,
};
return runData;
}
export async function getWorkflowData(workflowInfo: IExecuteWorkflowInfo): Promise<IWorkflowBase> {
if (workflowInfo.id === undefined && workflowInfo.code === undefined) {
throw new Error(`No information about the workflow to execute found. Please provide either the "id" or "code"!`);
}
if (Db.collections!.Workflow === null) {
// The first time executeWorkflow gets called the Database has
// to get initialized first
await Db.init();
}
let workflowData: IWorkflowBase | undefined;
if (workflowInfo.id !== undefined) {
workflowData = await Db.collections!.Workflow!.findOne(workflowInfo.id);
if (workflowData === undefined) {
throw new Error(`The workflow with the id "${workflowInfo.id}" does not exist.`);
}
} else {
workflowData = workflowInfo.code;
}
return workflowData!;
}
/**
* Executes the workflow with the given ID
*
* @export
* @param {string} workflowId The id of the workflow to execute
* @param {IWorkflowExecuteAdditionalData} additionalData
* @param {INodeExecutionData[]} [inputData]
* @returns {(Promise<Array<INodeExecutionData[] | null>>)}
*/
export async function executeWorkflow(workflowInfo: IExecuteWorkflowInfo, additionalData: IWorkflowExecuteAdditionalData, inputData?: INodeExecutionData[], parentExecutionId?: string, loadedWorkflowData?: IWorkflowBase, loadedRunData?: IWorkflowExecutionDataProcess): Promise<Array<INodeExecutionData[] | null> | IRun> {
const externalHooks = ExternalHooks();
await externalHooks.init();
const nodeTypes = NodeTypes();
const workflowData = loadedWorkflowData !== undefined ? loadedWorkflowData : await getWorkflowData(workflowInfo);
const workflowName = workflowData ? workflowData.name : undefined;
const workflow = new Workflow({ id: workflowInfo.id, name: workflowName, nodes: workflowData!.nodes, connections: workflowData!.connections, active: workflowData!.active, nodeTypes, staticData: workflowData!.staticData });
const runData = loadedRunData !== undefined ? loadedRunData : await getRunData(workflowData, inputData);
let executionId;
if (parentExecutionId !== undefined) {
executionId = parentExecutionId;
} else {
executionId = parentExecutionId !== undefined ? parentExecutionId : await ActiveExecutions.getInstance().add(runData);
}
const runExecutionData = runData.executionData as IRunExecutionData;
// Get the needed credentials for the current workflow as they will differ to the ones of the
// calling workflow.
const credentials = await WorkflowCredentials(workflowData!.nodes);
// Create new additionalData to have different workflow loaded and to call
// different webooks
const additionalDataIntegrated = await getBase(credentials);
additionalDataIntegrated.hooks = getWorkflowHooksIntegrated(runData.executionMode, executionId, workflowData!, { parentProcessMode: additionalData.hooks!.mode });
// Execute the workflow // Execute the workflow
const workflowExecute = new WorkflowExecute(additionalDataIntegrated, mode, runExecutionData); const workflowExecute = new WorkflowExecute(additionalDataIntegrated, runData.executionMode, runExecutionData);
const data = await workflowExecute.processRunExecutionData(workflow); const data = await workflowExecute.processRunExecutionData(workflow);
await externalHooks.run('workflow.postExecute', [data, workflowData]); await externalHooks.run('workflow.postExecute', [data, workflowData]);
if (data.finished === true) { if (data.finished === true) {
// Workflow did finish successfully // Workflow did finish successfully
if (parentExecutionId !== undefined) {
return data;
} else {
await ActiveExecutions.getInstance().remove(executionId, data);
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data); const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data);
return returnData!.data!.main; return returnData!.data!.main;
}
} else { } else {
// Workflow did fail // Workflow did fail
const error = new Error(data.data.resultData.error!.message); const error = new Error(data.data.resultData.error!.message);

View File

@@ -437,7 +437,7 @@ export class WorkflowRunner {
// Listen to data from the subprocess // Listen to data from the subprocess
subprocess.on('message', (message: IProcessMessage) => { subprocess.on('message', async (message: IProcessMessage) => {
if (message.type === 'end') { if (message.type === 'end') {
clearTimeout(executionTimeout); clearTimeout(executionTimeout);
this.activeExecutions.remove(executionId!, message.data.runData); this.activeExecutions.remove(executionId!, message.data.runData);
@@ -454,6 +454,11 @@ export class WorkflowRunner {
const timeoutError = { message: 'Workflow execution timed out!' } as IExecutionError; const timeoutError = { message: 'Workflow execution timed out!' } as IExecutionError;
this.processError(timeoutError, startedAt, data.executionMode, executionId); this.processError(timeoutError, startedAt, data.executionMode, executionId);
} else if (message.type === 'startExecution') {
const executionId = await this.activeExecutions.add(message.data.runData);
subprocess.send({ type: 'executionId', data: {executionId} } as IProcessMessage);
} else if (message.type === 'finishExecution') {
await this.activeExecutions.remove(message.data.executionId, message.data.result);
} }
}); });

View File

@@ -7,6 +7,7 @@ import {
IWorkflowExecutionDataProcessWithExecution, IWorkflowExecutionDataProcessWithExecution,
NodeTypes, NodeTypes,
WorkflowExecuteAdditionalData, WorkflowExecuteAdditionalData,
WorkflowHelpers,
} from './'; } from './';
import { import {
@@ -17,12 +18,15 @@ import {
import { import {
IDataObject, IDataObject,
IExecuteData, IExecuteData,
IExecuteWorkflowInfo,
IExecutionError, IExecutionError,
INodeExecutionData,
INodeType, INodeType,
INodeTypeData, INodeTypeData,
IRun, IRun,
IRunExecutionData, IRunExecutionData,
ITaskData, ITaskData,
IWorkflowExecuteAdditionalData,
IWorkflowExecuteHooks, IWorkflowExecuteHooks,
Workflow, Workflow,
WorkflowHooks, WorkflowHooks,
@@ -35,6 +39,7 @@ export class WorkflowRunnerProcess {
startedAt = new Date(); startedAt = new Date();
workflow: Workflow | undefined; workflow: Workflow | undefined;
workflowExecute: WorkflowExecute | undefined; workflowExecute: WorkflowExecute | undefined;
executionIdCallback: (executionId: string) => void | undefined;
async runWorkflow(inputData: IWorkflowExecutionDataProcessWithExecution): Promise<IRun> { async runWorkflow(inputData: IWorkflowExecutionDataProcessWithExecution): Promise<IRun> {
@@ -96,6 +101,23 @@ export class WorkflowRunnerProcess {
const additionalData = await WorkflowExecuteAdditionalData.getBase(this.data.credentials); const additionalData = await WorkflowExecuteAdditionalData.getBase(this.data.credentials);
additionalData.hooks = this.getProcessForwardHooks(); additionalData.hooks = this.getProcessForwardHooks();
const executeWorkflowFunction = additionalData.executeWorkflow;
additionalData.executeWorkflow = async (workflowInfo: IExecuteWorkflowInfo, additionalData: IWorkflowExecuteAdditionalData, inputData?: INodeExecutionData[] | undefined): Promise<Array<INodeExecutionData[] | null> | IRun> => {
const workflowData = await WorkflowExecuteAdditionalData.getWorkflowData(workflowInfo);
const runData = await WorkflowExecuteAdditionalData.getRunData(workflowData, inputData);
await sendToParentProcess('startExecution', { runData });
const executionId: string = await new Promise((resolve) => {
this.executionIdCallback = (executionId: string) => {
resolve(executionId);
};
});
const result: IRun = await executeWorkflowFunction(workflowInfo, additionalData, inputData, executionId, workflowData, runData);
await sendToParentProcess('finishExecution', { executionId, result });
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(result);
return returnData!.data!.main;
};
if (this.data.executionData !== undefined) { if (this.data.executionData !== undefined) {
this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode, this.data.executionData); this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode, this.data.executionData);
return this.workflowExecute.processRunExecutionData(this.workflow); return this.workflowExecute.processRunExecutionData(this.workflow);
@@ -257,6 +279,8 @@ process.on('message', async (message: IProcessMessage) => {
// Stop process // Stop process
process.exit(); process.exit();
} else if (message.type === 'executionId') {
workflowRunner.executionIdCallback(message.data.executionId);
} }
} catch (error) { } catch (error) {
// Catch all uncaught errors and forward them to parent process // Catch all uncaught errors and forward them to parent process

View File

@@ -735,7 +735,7 @@ export interface IWorkflowExecuteAdditionalData {
credentials: IWorkflowCredentials; credentials: IWorkflowCredentials;
credentialsHelper: ICredentialsHelper; credentialsHelper: ICredentialsHelper;
encryptionKey: string; encryptionKey: string;
executeWorkflow: (workflowInfo: IExecuteWorkflowInfo, additionalData: IWorkflowExecuteAdditionalData, inputData?: INodeExecutionData[]) => Promise<any>; // tslint:disable-line:no-any executeWorkflow: (workflowInfo: IExecuteWorkflowInfo, additionalData: IWorkflowExecuteAdditionalData, inputData?: INodeExecutionData[], parentExecutionId?: string, loadedWorkflowData?: IWorkflowBase, loadedRunData?: any) => Promise<any>; // tslint:disable-line:no-any
// hooks?: IWorkflowExecuteHooks; // hooks?: IWorkflowExecuteHooks;
hooks?: WorkflowHooks; hooks?: WorkflowHooks;
httpResponse?: express.Response; httpResponse?: express.Response;