diff --git a/packages/cli/commands/execute.ts b/packages/cli/commands/execute.ts index aaf0064cf7..7c69435625 100644 --- a/packages/cli/commands/execute.ts +++ b/packages/cli/commands/execute.ts @@ -2,24 +2,20 @@ import Vorpal = require('vorpal'); import { Args } from 'vorpal'; import { promises as fs } from 'fs'; import { - CredentialTypes, + ActiveExecutions, Db, + GenericHelpers, IWorkflowBase, + IWorkflowExecutionDataProcess, LoadNodesAndCredentials, NodeTypes, - GenericHelpers, + WorkflowCredentials, WorkflowHelpers, - WorkflowExecuteAdditionalData, + WorkflowRunner, } from "../src"; import { - ActiveExecutions, UserSettings, - WorkflowExecute, } from "n8n-core"; -import { - INode, - Workflow, -} from "n8n-workflow"; module.exports = (vorpal: Vorpal) => { @@ -99,22 +95,16 @@ module.exports = (vorpal: Vorpal) => { // Add the found types to an instance other parts of the application can use const nodeTypes = NodeTypes(); await nodeTypes.init(loadNodesAndCredentials.nodeTypes); - const credentialTypes = CredentialTypes(); - await credentialTypes.init(loadNodesAndCredentials.credentialTypes); if (!WorkflowHelpers.isWorkflowIdValid(workflowId)) { workflowId = undefined; } - const workflowInstance = new Workflow(workflowId, workflowData!.nodes, workflowData!.connections, true, nodeTypes, workflowData!.staticData); - // Check if the workflow contains the required "Start" node // "requiredNodeTypes" are also defined in editor-ui/views/NodeView.vue const requiredNodeTypes = ['n8n-nodes-base.start']; let startNodeFound = false; - let node: INode; - for (const nodeName of Object.keys(workflowInstance.nodes)) { - node = workflowInstance.nodes[nodeName]; + for (const node of workflowData!.nodes) { if (requiredNodeTypes.includes(node.type)) { startNodeFound = true; } @@ -127,12 +117,17 @@ module.exports = (vorpal: Vorpal) => { return Promise.resolve(); } - const mode = 'cli'; - const additionalData = await WorkflowExecuteAdditionalData.get(mode, workflowData!, workflowInstance); - const workflowExecute = new WorkflowExecute(additionalData, mode); - try { - const executionId = await workflowExecute.run(workflowInstance); + const credentials = await WorkflowCredentials(workflowData!.nodes); + + const runData: IWorkflowExecutionDataProcess = { + credentials, + executionMode: 'cli', + workflowData: workflowData!, + }; + + const workflowRunner = new WorkflowRunner(); + const executionId = await workflowRunner.run(runData); const activeExecutions = ActiveExecutions.getInstance(); const data = await activeExecutions.getPostExecutePromise(executionId); diff --git a/packages/core/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts similarity index 65% rename from packages/core/src/ActiveExecutions.ts rename to packages/cli/src/ActiveExecutions.ts index 124978d9d3..4d4d70f67e 100644 --- a/packages/core/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -1,43 +1,42 @@ import { IRun, - IRunExecutionData, - Workflow, - WorkflowExecuteMode, } from 'n8n-workflow'; import { createDeferredPromise, - IExecutingWorkflowData, IExecutionsCurrentSummary, +} from 'n8n-core'; + +import { + IExecutingWorkflowData, + IWorkflowExecutionDataProcess, } from '.'; +import { ChildProcess } from 'child_process'; + export class ActiveExecutions { private nextId = 1; private activeExecutions: { [index: string]: IExecutingWorkflowData; } = {}; - private stopExecutions: string[] = []; - /** * Add a new active execution * - * @param {Workflow} workflow - * @param {IRunExecutionData} runExecutionData - * @param {WorkflowExecuteMode} mode + * @param {ChildProcess} process + * @param {IWorkflowExecutionDataProcess} executionData * @returns {string} * @memberof ActiveExecutions */ - add(workflow: Workflow, runExecutionData: IRunExecutionData, mode: WorkflowExecuteMode): string { + add(process: ChildProcess, executionData: IWorkflowExecutionDataProcess): string { const executionId = this.nextId++; this.activeExecutions[executionId] = { - runExecutionData, + executionData, + process, startedAt: new Date(), - mode, - workflow, postExecutePromises: [], }; @@ -53,7 +52,7 @@ export class ActiveExecutions { * @returns {void} * @memberof ActiveExecutions */ - remove(executionId: string, fullRunData: IRun): void { + remove(executionId: string, fullRunData?: IRun): void { if (this.activeExecutions[executionId] === undefined) { return; } @@ -65,12 +64,6 @@ export class ActiveExecutions { // Remove from the list of active executions delete this.activeExecutions[executionId]; - - const stopExecutionIndex = this.stopExecutions.indexOf(executionId); - if (stopExecutionIndex !== -1) { - // If it was on the stop-execution list remove it - this.stopExecutions.splice(stopExecutionIndex, 1); - } } @@ -87,16 +80,20 @@ export class ActiveExecutions { return; } - if (!this.stopExecutions.includes(executionId)) { - // Add the execution to the stop list if it is not already on it - this.stopExecutions.push(executionId); - } + // In case something goes wrong make sure that promise gets first + // returned that it gets then also resolved correctly. + setTimeout(() => { + if (this.activeExecutions[executionId].process.connected) { + this.activeExecutions[executionId].process.send({ + type: 'stopExecution' + }); + } + }, 1); return this.getPostExecutePromise(executionId); } - /** * Returns a promise which will resolve with the data of the execution * with the given id @@ -105,9 +102,9 @@ export class ActiveExecutions { * @returns {Promise} * @memberof ActiveExecutions */ - async getPostExecutePromise(executionId: string): Promise { + async getPostExecutePromise(executionId: string): Promise { // Create the promise which will be resolved when the execution finished - const waitPromise = await createDeferredPromise(); + const waitPromise = await createDeferredPromise(); if (this.activeExecutions[executionId] === undefined) { throw new Error(`There is no active execution with id "${executionId}".`); @@ -119,20 +116,6 @@ export class ActiveExecutions { } - - /** - * Returns if the execution should be stopped - * - * @param {string} executionId The execution id to check - * @returns {boolean} - * @memberof ActiveExecutions - */ - shouldBeStopped(executionId: string): boolean { - return this.stopExecutions.includes(executionId); - } - - - /** * Returns all the currently active executions * @@ -142,15 +125,15 @@ export class ActiveExecutions { getActiveExecutions(): IExecutionsCurrentSummary[] { const returnData: IExecutionsCurrentSummary[] = []; - let executionData; + let data; for (const id of Object.keys(this.activeExecutions)) { - executionData = this.activeExecutions[id]; + data = this.activeExecutions[id]; returnData.push( { id, - startedAt: executionData.startedAt, - mode: executionData.mode, - workflowId: executionData.workflow.id!, + startedAt: data.startedAt, + mode: data.executionData.executionMode, + workflowId: data.executionData.workflowData.id! as string, } ); } diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index 512349e82e..750b4091a4 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -4,18 +4,28 @@ import { NodeTypes, IResponseCallbackData, IWorkflowDb, + IWorkflowExecutionDataProcess, ResponseHelper, WebhookHelpers, + WorkflowCredentials, WorkflowHelpers, + WorkflowRunner, WorkflowExecuteAdditionalData, } from './'; import { ActiveWorkflows, ActiveWebhooks, + NodeExecuteFunctions, + WorkflowExecute, } from 'n8n-core'; import { + IExecuteData, + IGetExecuteTriggerFunctions, + INode, + INodeExecutionData, + IRunExecutionData, IWebhookData, IWorkflowExecuteAdditionalData as IWorkflowExecuteAdditionalDataWorkflow, WebhookHttpMethod, @@ -209,6 +219,57 @@ export class ActiveWorkflowRunner { } + /** + * Return trigger function which gets the global functions from n8n-core + * and overwrites the emit to be able to start it in subprocess + * + * @param {IWorkflowDb} workflowData + * @param {IWorkflowExecuteAdditionalDataWorkflow} additionalData + * @param {WorkflowExecuteMode} mode + * @returns {IGetExecuteTriggerFunctions} + * @memberof ActiveWorkflowRunner + */ + getExecuteTriggerFunctions(workflowData: IWorkflowDb, additionalData: IWorkflowExecuteAdditionalDataWorkflow, mode: WorkflowExecuteMode): IGetExecuteTriggerFunctions{ + return ((workflow: Workflow, node: INode) => { + const returnFunctions = NodeExecuteFunctions.getExecuteTriggerFunctions(workflow, node, additionalData, mode); + returnFunctions.emit = (data: INodeExecutionData[][]): void => { + + const nodeExecutionStack: IExecuteData[] = [ + { + node, + data: { + main: data, + } + } + ]; + + const executionData: IRunExecutionData = { + startData: {}, + resultData: { + runData: {}, + }, + executionData: { + contextData: {}, + nodeExecutionStack, + waitingExecution: {}, + }, + }; + + // Start the workflow + const runData: IWorkflowExecutionDataProcess = { + credentials: additionalData.credentials, + executionMode: mode, + executionData, + workflowData, + }; + + const workflowRunner = new WorkflowRunner(); + workflowRunner.run(runData); + }; + return returnFunctions; + }); + } + /** * Makes a workflow active * @@ -240,12 +301,13 @@ export class ActiveWorkflowRunner { } const mode = 'trigger'; - const additionalData = await WorkflowExecuteAdditionalData.get(mode, workflowData, workflowInstance); + const credentials = await WorkflowCredentials(workflowData.nodes); + const additionalData = await WorkflowExecuteAdditionalData.getBase(mode, credentials); + const getTriggerFunctions = this.getExecuteTriggerFunctions(workflowData, additionalData, mode); // Add the workflows which have webhooks defined await this.addWorkflowWebhooks(workflowInstance, additionalData, mode); - - await this.activeWorkflows.add(workflowId, workflowInstance, additionalData); + await this.activeWorkflows.add(workflowId, workflowInstance, additionalData, getTriggerFunctions); if (this.activationErrors[workflowId] !== undefined) { // If there were any activation errors delete them @@ -265,6 +327,8 @@ export class ActiveWorkflowRunner { throw error; } + // If for example webhooks get created it sometimes has to save the + // id of them in the static data. So make sure that data gets persisted. await WorkflowHelpers.saveStaticData(workflowInstance!); } diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index f3ba3b43df..3f5ed18d27 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -6,15 +6,22 @@ import { IExecutionError, INode, IRun, + IRunData, IRunExecutionData, ITaskData, + IWorkflowCredentials, IWorkflowSettings, WorkflowExecuteMode, } from 'n8n-workflow'; +import { + IDeferredPromise, +} from 'n8n-core'; + + import { ObjectID, Repository } from "typeorm"; - +import { ChildProcess } from 'child_process'; import { Url } from 'url'; import { Request } from 'express'; @@ -171,6 +178,13 @@ export interface IExecutionDeleteFilter { ids?: string[]; } +export interface IExecutingWorkflowData { + executionData: IWorkflowExecutionDataProcess; + process: ChildProcess; + startedAt: Date; + postExecutePromises: Array>; +} + export interface IN8nConfig { database: IN8nConfigDatabase; endpoints: IN8nConfigEndpoints; @@ -282,6 +296,14 @@ export interface IResponseCallbackData { } +export interface ITransferNodeTypes { + [key: string]: { + className: string; + sourcePath: string; + }; +} + + export interface IWorkflowErrorData { [key: string]: IDataObject | string | number | IExecutionError; execution: { @@ -295,3 +317,25 @@ export interface IWorkflowErrorData { name: string; }; } + +export interface IProcessMessageDataHook { + hook: string; + parameters: any[]; // tslint:disable-line:no-any +} + +export interface IWorkflowExecutionDataProcess { + credentials: IWorkflowCredentials; + destinationNode?: string; + executionMode: WorkflowExecuteMode; + executionData?: IRunExecutionData; + runData?: IRunData; + retryOf?: number | string | ObjectID; + sessionId?: string; + startNodes?: string[]; + workflowData: IWorkflowBase; +} + +export interface IWorkflowExecutionDataProcessWithExecution extends IWorkflowExecutionDataProcess { + executionId: string; + nodeTypeData: ITransferNodeTypes; +} diff --git a/packages/cli/src/LoadNodesAndCredentials.ts b/packages/cli/src/LoadNodesAndCredentials.ts index d3534786c2..95f7f3eae6 100644 --- a/packages/cli/src/LoadNodesAndCredentials.ts +++ b/packages/cli/src/LoadNodesAndCredentials.ts @@ -5,6 +5,7 @@ import { import { ICredentialType, INodeType, + INodeTypeData, } from 'n8n-workflow'; import * as config from '../config'; @@ -25,9 +26,7 @@ const fsStatAsync = promisify(fsStat); class LoadNodesAndCredentialsClass { - nodeTypes: { - [key: string]: INodeType - } = {}; + nodeTypes: INodeTypeData = {}; credentialTypes: { [key: string]: ICredentialType @@ -37,7 +36,7 @@ class LoadNodesAndCredentialsClass { nodeModulesPath = ''; - async init(directory?: string) { + async init() { // Get the path to the node-modules folder to be later able // to load the credentials and nodes const checkPaths = [ @@ -172,12 +171,15 @@ class LoadNodesAndCredentialsClass { tempNode.description.icon = 'file:' + path.join(path.dirname(filePath), tempNode.description.icon.substr(5)); } - // Check if the node should be skipped + // Check if the node should be skiped if (this.excludeNodes !== undefined && this.excludeNodes.includes(fullNodeName)) { return; } - this.nodeTypes[fullNodeName] = tempNode; + this.nodeTypes[fullNodeName] = { + type: tempNode, + sourcePath: filePath, + }; } diff --git a/packages/cli/src/NodeTypes.ts b/packages/cli/src/NodeTypes.ts index e321cce6c5..aaae63effd 100644 --- a/packages/cli/src/NodeTypes.ts +++ b/packages/cli/src/NodeTypes.ts @@ -1,26 +1,25 @@ import { INodeType, INodeTypes, + INodeTypeData, } from 'n8n-workflow'; class NodeTypesClass implements INodeTypes { - nodeTypes: { - [key: string]: INodeType - } = {}; + nodeTypes: INodeTypeData = {}; - async init(nodeTypes: {[key: string]: INodeType }): Promise { + async init(nodeTypes: INodeTypeData): Promise { this.nodeTypes = nodeTypes; } getAll(): INodeType[] { - return Object.values(this.nodeTypes); + return Object.values(this.nodeTypes).map((data) => data.type); } getByName(nodeType: string): INodeType | undefined { - return this.nodeTypes[nodeType]; + return this.nodeTypes[nodeType].type; } } diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index bed3fb4c58..2e737eafbc 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -4,15 +4,16 @@ import * as history from 'connect-history-api-fallback'; import * as requestPromise from 'request-promise-native'; import { - IActivationError, + ActiveExecutions, ActiveWorkflowRunner, + CredentialTypes, + Db, + IActivationError, ICustomRequest, ICredentialsDb, ICredentialsDecryptedDb, ICredentialsDecryptedResponse, ICredentialsResponse, - CredentialTypes, - Db, IExecutionDeleteFilter, IExecutionFlatted, IExecutionFlattedDb, @@ -25,22 +26,23 @@ import { IWorkflowBase, IWorkflowShortResponse, IWorkflowResponse, + IWorkflowExecutionDataProcess, NodeTypes, Push, ResponseHelper, TestWebhooks, + WorkflowCredentials, WebhookHelpers, WorkflowExecuteAdditionalData, WorkflowHelpers, + WorkflowRunner, GenericHelpers, } from './'; import { - ActiveExecutions, Credentials, LoadNodeParameterOptions, UserSettings, - WorkflowExecute, } from 'n8n-core'; import { @@ -127,7 +129,7 @@ class App { throw new Error('Basic auth is activated but no password got defined. Please set one!'); } - const authIgnoreRegex = new RegExp(`^\/(rest|${this.endpointWebhook}|${this.endpointWebhookTest})\/.*$`) + const authIgnoreRegex = new RegExp(`^\/(rest|${this.endpointWebhook}|${this.endpointWebhookTest})\/.*$`); this.app.use((req: express.Request, res: express.Response, next: express.NextFunction) => { if (req.url.match(authIgnoreRegex)) { return next(); @@ -386,45 +388,45 @@ class App { const runData: IRunData | undefined = req.body.runData; const startNodes: string[] | undefined = req.body.startNodes; const destinationNode: string | undefined = req.body.destinationNode; - const nodeTypes = NodeTypes(); const executionMode = 'manual'; const sessionId = GenericHelpers.getSessionId(req); - // Do not supply the saved static data! Tests always run with initially empty static data. - // The reason is that it contains information like webhook-ids. If a workflow is currently - // active it would see its id and would so not create an own test-webhook. Additionally would - // it also delete the webhook at the service in the end. So that the active workflow would end - // up without still being active but not receiving and webhook requests anymore as it does - // not exist anymore. - const workflowInstance = new Workflow(workflowData.id, workflowData.nodes, workflowData.connections, false, nodeTypes, undefined, workflowData.settings); - - const additionalData = await WorkflowExecuteAdditionalData.get(executionMode, workflowData, workflowInstance, sessionId); - - const workflowExecute = new WorkflowExecute(additionalData, executionMode); - - let executionId: string; - - if (runData === undefined || startNodes === undefined || startNodes.length === 0 || destinationNode === undefined) { - // Execute all nodes - - if (WorkflowHelpers.isWorkflowIdValid(workflowData.id) === true) { - // Webhooks can only be tested with saved workflows - const needsWebhook = await this.testWebhooks.needsWebhookData(workflowData, workflowInstance, additionalData, executionMode, sessionId, destinationNode); - if (needsWebhook === true) { - return { - waitingForWebhook: true, - }; - } + // Check if workflow is saved as webhooks can only be tested with saved workflows. + // If that is the case check if any webhooks calls are present we have to wait for and + // if that is the case wait till we receive it. + if (WorkflowHelpers.isWorkflowIdValid(workflowData.id) === true && (runData === undefined || startNodes === undefined || startNodes.length === 0 || destinationNode === undefined)) { + // Webhooks can only be tested with saved workflows + const credentials = await WorkflowCredentials(workflowData.nodes); + const additionalData = await WorkflowExecuteAdditionalData.getBase(executionMode, credentials); + const nodeTypes = NodeTypes(); + const workflowInstance = new Workflow(workflowData.id, workflowData.nodes, workflowData.connections, false, nodeTypes, undefined, workflowData.settings); + const needsWebhook = await this.testWebhooks.needsWebhookData(workflowData, workflowInstance, additionalData, executionMode, sessionId, destinationNode); + if (needsWebhook === true) { + return { + waitingForWebhook: true, + }; } - - // Can execute without webhook so go on - executionId = await workflowExecute.run(workflowInstance, undefined, destinationNode); - } else { - // Execute only the nodes between start and destination nodes - executionId = await workflowExecute.runPartialWorkflow(workflowInstance, runData, startNodes, destinationNode); } + // For manual testing always set to not active + workflowData.active = false; + + const credentials = await WorkflowCredentials(workflowData.nodes); + + // Start the workflow + const data: IWorkflowExecutionDataProcess = { + credentials, + destinationNode, + executionMode, + runData, + sessionId, + startNodes, + workflowData, + }; + const workflowRunner = new WorkflowRunner(); + const executionId = await workflowRunner.run(data); + return { executionId, }; @@ -444,12 +446,11 @@ class App { const nodeTypes = NodeTypes(); const executionMode = 'manual'; - const sessionId = GenericHelpers.getSessionId(req); - const loadDataInstance = new LoadNodeParameterOptions(nodeType, nodeTypes, credentials); const workflowData = loadDataInstance.getWorkflowData() as IWorkflowBase; - const additionalData = await WorkflowExecuteAdditionalData.get(executionMode, workflowData, loadDataInstance.workflow, sessionId); + const workflowCredentials = await WorkflowCredentials(workflowData.nodes); + const additionalData = await WorkflowExecuteAdditionalData.getBase(executionMode, workflowCredentials); return loadDataInstance.getOptions(methodName, additionalData); })); @@ -843,13 +844,22 @@ class App { const executionMode = 'retry'; - const nodeTypes = NodeTypes(); - const workflowInstance = new Workflow(req.params.id, fullExecutionData.workflowData.nodes, fullExecutionData.workflowData.connections, false, nodeTypes, fullExecutionData.workflowData.staticData, fullExecutionData.workflowData.settings); + const credentials = await WorkflowCredentials(fullExecutionData.workflowData.nodes); - const additionalData = await WorkflowExecuteAdditionalData.get(executionMode, fullExecutionData.workflowData, workflowInstance, undefined, req.params.id); - const workflowExecute = new WorkflowExecute(additionalData, executionMode); + fullExecutionData.workflowData.active = false; - return workflowExecute.runExecutionData(workflowInstance, fullExecutionData.data); + // Start the workflow + const data: IWorkflowExecutionDataProcess = { + credentials, + executionMode, + executionData: fullExecutionData.data, + retryOf: req.params.id, + workflowData: fullExecutionData.workflowData, + }; + const workflowRunner = new WorkflowRunner(); + const executionId = await workflowRunner.run(data); + + return executionId; })); diff --git a/packages/cli/src/WebhookHelpers.ts b/packages/cli/src/WebhookHelpers.ts index 082abf3267..a4faa368ff 100644 --- a/packages/cli/src/WebhookHelpers.ts +++ b/packages/cli/src/WebhookHelpers.ts @@ -1,19 +1,21 @@ import * as express from 'express'; import { + ActiveExecutions, GenericHelpers, IExecutionDb, IResponseCallbackData, IWorkflowDb, + IWorkflowExecutionDataProcess, ResponseHelper, + WorkflowRunner, + WorkflowCredentials, WorkflowExecuteAdditionalData, } from './'; import { BINARY_ENCODING, - ActiveExecutions, NodeExecuteFunctions, - WorkflowExecute, } from 'n8n-core'; import { @@ -124,8 +126,8 @@ export function getWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflo } // Prepare everything that is needed to run the workflow - const additionalData = await WorkflowExecuteAdditionalData.get(executionMode, workflowData, webhookData.workflow, sessionId); - const workflowExecute = new WorkflowExecute(additionalData, executionMode); + const credentials = await WorkflowCredentials(workflowData.nodes); + const additionalData = await WorkflowExecuteAdditionalData.getBase(executionMode, credentials); // Add the Response and Request so that this data can be accessed in the node additionalData.httpRequest = req; @@ -207,8 +209,17 @@ export function getWorkflowWebhooks(workflow: Workflow, additionalData: IWorkflo }, }; + const runData: IWorkflowExecutionDataProcess = { + credentials, + executionMode, + executionData: runExecutionData, + sessionId, + workflowData, + }; + // Start now to run the workflow - const executionId = await workflowExecute.runExecutionData(webhookData.workflow, runExecutionData); + const workflowRunner = new WorkflowRunner(); + const executionId = await workflowRunner.run(runData); // Get a promise which resolves when the workflow did execute and send then response const executePromise = activeExecutions.getPostExecutePromise(executionId) as Promise; diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index 4b6793ade8..e3e942f3df 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -3,14 +3,11 @@ import { IExecutionDb, IExecutionFlattedDb, IPushDataExecutionFinished, - IPushDataExecutionStarted, - IPushDataNodeExecuteAfter, - IPushDataNodeExecuteBefore, IWorkflowBase, + IWorkflowExecutionDataProcess, Push, ResponseHelper, WebhookHelpers, - WorkflowCredentials, WorkflowHelpers, } from './'; @@ -19,11 +16,13 @@ import { } from "n8n-core"; import { + IDataObject, IRun, ITaskData, + IWorkflowCredentials, IWorkflowExecuteAdditionalData, + IWorkflowExecuteHooks, WorkflowExecuteMode, - Workflow, } from 'n8n-workflow'; import * as config from '../config'; @@ -68,7 +67,7 @@ function executeErrorWorkflow(workflowData: IWorkflowBase, fullRunData: IRun, mo * @param {string} executionIdActive The id of the finished execution * @param {string} [executionIdDb] The database id of finished execution */ -function pushExecutionFinished(fullRunData: IRun, executionIdActive: string, executionIdDb?: string) { +export function pushExecutionFinished(fullRunData: IRun, executionIdActive: string, executionIdDb?: string) { // Clone the object except the runData. That one is not supposed // to be send. Because that data got send piece by piece after // each node which finished executing @@ -94,70 +93,79 @@ function pushExecutionFinished(fullRunData: IRun, executionIdActive: string, exe } -const hooks = (mode: WorkflowExecuteMode, workflowData: IWorkflowBase, workflowInstance: Workflow, sessionId?: string, retryOf?: string) => { +/** + * Returns the workflow execution hooks + * + * @param {WorkflowExecuteMode} mode + * @param {IWorkflowBase} workflowData + * @param {string} executionId + * @param {string} [sessionId] + * @param {string} [retryOf] + * @returns {IWorkflowExecuteHooks} + */ +const hooks = (mode: WorkflowExecuteMode, workflowData: IWorkflowBase, executionId: string, sessionId?: string, retryOf?: string): IWorkflowExecuteHooks => { return { nodeExecuteBefore: [ - async (executionId: string, nodeName: string): Promise => { + async (nodeName: string): Promise => { + // Push data to session which started workflow before each + // node which starts rendering if (sessionId === undefined) { - // Only push data to the session which started it return; } - const sendData: IPushDataNodeExecuteBefore = { + pushInstance.send('nodeExecuteBefore', { executionId, nodeName, - }; - - pushInstance.send('nodeExecuteBefore', sendData, sessionId); + }, sessionId); }, ], nodeExecuteAfter: [ - async (executionId: string, nodeName: string, data: ITaskData): Promise => { + async (nodeName: string, data: ITaskData): Promise => { + // Push data to session which started workflow after each rendered node if (sessionId === undefined) { return; } - const sendData: IPushDataNodeExecuteAfter = { + pushInstance.send('nodeExecuteAfter', { executionId, nodeName, data, - }; - - pushInstance.send('nodeExecuteAfter', sendData, sessionId); + }, sessionId); }, ], workflowExecuteBefore: [ - async (executionId: string): Promise => { + async (): Promise => { // Push data to editor-ui once workflow finished - const sendData: IPushDataExecutionStarted = { + pushInstance.send('executionStarted', { executionId, mode, startedAt: new Date(), retryOf, workflowId: workflowData.id as string, workflowName: workflowData.name, - }; - - pushInstance.send('executionStarted', sendData); + }); } ], workflowExecuteAfter: [ - async (fullRunData: IRun, executionId: string): Promise => { + async (fullRunData: IRun, newStaticData: IDataObject): Promise => { try { - const workflowSavePromise = WorkflowHelpers.saveStaticData(workflowInstance); + if (WorkflowHelpers.isWorkflowIdValid(workflowData.id as string) === true) { + // Workflow is saved so update in database + try { + await WorkflowHelpers.saveStaticDataById(workflowData.id as string, newStaticData); + } catch (e) { + // TODO: Add proper logging! + console.error(`There was a problem saving the workflow with id "${workflowData.id}" to save changed staticData: ${e.message}`); + } + } let saveManualExecutions = config.get('executions.saveDataManualExecutions') as boolean; - if (workflowInstance.settings !== undefined && workflowInstance.settings.saveManualExecutions !== undefined) { + if (workflowData.settings !== undefined && workflowData.settings.saveManualExecutions !== undefined) { // Apply to workflow override - saveManualExecutions = workflowInstance.settings.saveManualExecutions as boolean; + saveManualExecutions = workflowData.settings.saveManualExecutions as boolean; } if (mode === 'manual' && saveManualExecutions === false) { - if (workflowSavePromise !== undefined) { - // If workflow had to be saved wait till it is done - await workflowSavePromise; - } - pushExecutionFinished(fullRunData, executionId); executeErrorWorkflow(workflowData, fullRunData, mode); return; @@ -166,9 +174,9 @@ const hooks = (mode: WorkflowExecuteMode, workflowData: IWorkflowBase, workflowI // Check config to know if execution should be saved or not let saveDataErrorExecution = config.get('executions.saveDataOnError') as string; let saveDataSuccessExecution = config.get('executions.saveDataOnSuccess') as string; - if (workflowInstance.settings !== undefined) { - saveDataErrorExecution = (workflowInstance.settings.saveDataErrorExecution as string) || saveDataErrorExecution; - saveDataSuccessExecution = (workflowInstance.settings.saveDataSuccessExecution as string) || saveDataSuccessExecution; + if (workflowData.settings !== undefined) { + saveDataErrorExecution = (workflowData.settings.saveDataErrorExecution as string) || saveDataErrorExecution; + saveDataSuccessExecution = (workflowData.settings.saveDataSuccessExecution as string) || saveDataSuccessExecution; } const workflowDidSucceed = !fullRunData.data.resultData.error; @@ -208,11 +216,6 @@ const hooks = (mode: WorkflowExecuteMode, workflowData: IWorkflowBase, workflowI await Db.collections.Execution!.update(retryOf, { retrySuccessId: executionResult.id }); } - if (workflowSavePromise !== undefined) { - // If workflow had to be saved wait till it is done - await workflowSavePromise; - } - pushExecutionFinished(fullRunData, executionId, executionResult.id as string); executeErrorWorkflow(workflowData, fullRunData, mode, executionResult ? executionResult.id as string : undefined); } catch (error) { @@ -225,7 +228,15 @@ const hooks = (mode: WorkflowExecuteMode, workflowData: IWorkflowBase, workflowI }; -export async function get(mode: WorkflowExecuteMode, workflowData: IWorkflowBase, workflowInstance: Workflow, sessionId?: string, retryOf?: string): Promise { +/** + * Returns the base additional data without webhooks + * + * @export + * @param {WorkflowExecuteMode} mode + * @param {IWorkflowCredentials} credentials + * @returns {Promise} + */ +export async function getBase(mode: WorkflowExecuteMode, credentials: IWorkflowCredentials): Promise { const urlBaseWebhook = WebhookHelpers.getWebhookBaseUrl(); const timezone = config.get('generic.timezone') as string; @@ -238,11 +249,23 @@ export async function get(mode: WorkflowExecuteMode, workflowData: IWorkflowBase } return { - credentials: await WorkflowCredentials(workflowData.nodes), - hooks: hooks(mode, workflowData, workflowInstance, sessionId, retryOf), + credentials, encryptionKey, timezone, webhookBaseUrl, webhookTestBaseUrl, }; } + + +/** + * Returns the workflow hooks + * + * @export + * @param {IWorkflowExecutionDataProcess} data + * @param {string} executionId + * @returns {IWorkflowExecuteHooks} + */ +export function getHookMethods(data: IWorkflowExecutionDataProcess, executionId: string): IWorkflowExecuteHooks { + return hooks(data.executionMode, data.workflowData, executionId, data.sessionId, data.retryOf as string | undefined); +} diff --git a/packages/cli/src/WorkflowHelpers.ts b/packages/cli/src/WorkflowHelpers.ts index f4d8925a89..84967ba0d9 100644 --- a/packages/cli/src/WorkflowHelpers.ts +++ b/packages/cli/src/WorkflowHelpers.ts @@ -1,15 +1,14 @@ import { Db, + IWorkflowExecutionDataProcess, IWorkflowErrorData, NodeTypes, - WorkflowExecuteAdditionalData, + WorkflowCredentials, + WorkflowRunner, } from './'; import { - WorkflowExecute, -} from 'n8n-core'; - -import { + IDataObject, IExecuteData, INode, IRunExecutionData, @@ -80,10 +79,7 @@ export async function executeErrorWorkflow(workflowId: string, workflowErrorData return; } - const additionalData = await WorkflowExecuteAdditionalData.get(executionMode, workflowData, workflowInstance); - // Can execute without webhook so go on - const workflowExecute = new WorkflowExecute(additionalData, executionMode); // Initialize the data of the webhook node const nodeExecutionStack: IExecuteData[] = []; @@ -115,9 +111,17 @@ export async function executeErrorWorkflow(workflowId: string, workflowErrorData }, }; - // Start now to run the workflow - await workflowExecute.runExecutionData(workflowInstance, runExecutionData); + const credentials = await WorkflowCredentials(workflowData.nodes); + const runData: IWorkflowExecutionDataProcess = { + credentials, + executionMode, + executionData: runExecutionData, + workflowData, + }; + + const workflowRunner = new WorkflowRunner(); + await workflowRunner.run(runData); } catch (error) { console.error(`ERROR: Calling Error Workflow for "${workflowErrorData.workflow.id}": ${error.message}`); } @@ -138,10 +142,7 @@ export async function saveStaticData(workflow: Workflow): Promise { if (isWorkflowIdValid(workflow.id) === true) { // Workflow is saved so update in database try { - await Db.collections.Workflow! - .update(workflow.id!, { - staticData: workflow.staticData, - }); + await saveStaticDataById(workflow.id!, workflow.staticData); workflow.staticData.__dataChanged = false; } catch (e) { // TODO: Add proper logging! @@ -150,3 +151,20 @@ export async function saveStaticData(workflow: Workflow): Promise { } } } + + + +/** + * Saves the given static data on workflow + * + * @export + * @param {(string | number)} workflowId The id of the workflow to save data on + * @param {IDataObject} newStaticData The static data to save + * @returns {Promise} + */ +export async function saveStaticDataById(workflowId: string | number, newStaticData: IDataObject): Promise { + await Db.collections.Workflow! + .update(workflowId, { + staticData: newStaticData, + }); +} diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts new file mode 100644 index 0000000000..d842c4dd89 --- /dev/null +++ b/packages/cli/src/WorkflowRunner.ts @@ -0,0 +1,191 @@ + +import { + ActiveExecutions, + IProcessMessageDataHook, + ITransferNodeTypes, + IWorkflowExecutionDataProcess, + IWorkflowExecutionDataProcessWithExecution, + NodeTypes, + Push, + WorkflowExecuteAdditionalData, +} from './'; + +import { + IProcessMessage, +} from 'n8n-core'; + +import { + IExecutionError, + INode, + IRun, + IWorkflowExecuteHooks, + WorkflowExecuteMode, +} from 'n8n-workflow'; + +import { fork } from 'child_process'; + + +export class WorkflowRunner { + activeExecutions: ActiveExecutions.ActiveExecutions; + push: Push.Push; + + + constructor() { + this.push = Push.getInstance(); + this.activeExecutions = ActiveExecutions.getInstance(); + } + + + /** + * Returns the data of the node types that are needed + * to execute the given nodes + * + * @param {INode[]} nodes + * @returns {ITransferNodeTypes} + * @memberof WorkflowRunner + */ + getNodeTypeData(nodes: INode[]): ITransferNodeTypes { + const nodeTypes = NodeTypes(); + + // Check which node-types have to be loaded + const neededNodeTypes: string[] = []; + for (const node of nodes) { + if (!neededNodeTypes.includes(node.type)) { + neededNodeTypes.push(node.type); + } + } + + // Get all the data of the needed node types that they + // can be loaded again in the process + const returnData: ITransferNodeTypes = {}; + for (const nodeTypeName of neededNodeTypes) { + if (nodeTypes.nodeTypes[nodeTypeName] === undefined) { + throw new Error(`The NodeType "${nodeTypeName}" could not be found!`); + } + + returnData[nodeTypeName] = { + className: nodeTypes.nodeTypes[nodeTypeName].type.constructor.name, + sourcePath: nodeTypes.nodeTypes[nodeTypeName].sourcePath, + }; + } + + return returnData; + } + + + /** + * The process did send a hook message so execute the appropiate hook + * + * @param {IWorkflowExecuteHooks} hookFunctions + * @param {IProcessMessageDataHook} hookData + * @memberof WorkflowRunner + */ + processHookMessage(hookFunctions: IWorkflowExecuteHooks, hookData: IProcessMessageDataHook) { + if (hookFunctions[hookData.hook] !== undefined && Array.isArray(hookFunctions[hookData.hook])) { + + for (const hookFunction of hookFunctions[hookData.hook]!) { + // TODO: Not sure if that is 100% correct or something is still missing like to wait + hookFunction.apply(this, hookData.parameters) + .catch((error: Error) => { + // Catch all errors here because when "executeHook" gets called + // we have the most time no "await" and so the errors would so + // not be uncaught by anything. + + // TODO: Add proper logging + console.error(`There was a problem executing hook: "${hookData.hook}"`); + console.error('Parameters:'); + console.error(hookData.parameters); + console.error('Error:'); + console.error(error); + }); + } + } + } + + + /** + * The process did error + * + * @param {IExecutionError} error + * @param {Date} startedAt + * @param {WorkflowExecuteMode} executionMode + * @param {string} executionId + * @memberof WorkflowRunner + */ + processError(error: IExecutionError, startedAt: Date, executionMode: WorkflowExecuteMode, executionId: string) { + const fullRunData: IRun = { + data: { + resultData: { + error, + runData: {}, + }, + }, + finished: false, + mode: executionMode, + startedAt, + stoppedAt: new Date(), + }; + + // Remove from active execution with empty data. That will + // set the execution to failed. + this.activeExecutions.remove(executionId, fullRunData); + + // Also send to Editor UI + WorkflowExecuteAdditionalData.pushExecutionFinished(fullRunData, executionId); + } + + + /** + * Run the workflow in subprocess + * + * @param {IWorkflowExecutionDataProcess} data + * @returns {Promise} + * @memberof WorkflowRunner + */ + async run(data: IWorkflowExecutionDataProcess): Promise { + const startedAt = new Date(); + const subprocess = fork('./dist/src/WorkflowRunnerProcess.js'); + + // Register the active execution + const executionId = this.activeExecutions.add(subprocess, data); + + const nodeTypeData = this.getNodeTypeData(data.workflowData.nodes); + + (data as unknown as IWorkflowExecutionDataProcessWithExecution).executionId = executionId; + (data as unknown as IWorkflowExecutionDataProcessWithExecution).nodeTypeData = nodeTypeData; + + const hookFunctions = WorkflowExecuteAdditionalData.getHookMethods(data, executionId); + + // Send all data to subprocess it needs to run the workflow + subprocess.send({ type: 'startWorkflow', data } as IProcessMessage); + + // Listen to data from the subprocess + subprocess.on('message', (message: IProcessMessage) => { + if (message.type === 'end') { + this.activeExecutions.remove(executionId!, message.data.runData); + } else if (message.type === 'processError') { + + const executionError = message.data.executionError as IExecutionError; + + this.processError(executionError, startedAt, data.executionMode, executionId); + + } else if (message.type === 'processHook') { + this.processHookMessage(hookFunctions, message.data as IProcessMessageDataHook); + } + }); + + // Also get informed when the processes does exit especially when it did crash + subprocess.on('exit', (code, signal) => { + if (code !== 0) { + // Process did exit with error code, so something went wrong. + const executionError = { + message: 'Workflow execution process did crash for an unknown reason!', + } as IExecutionError; + + this.processError(executionError, startedAt, data.executionMode, executionId); + } + }); + + return executionId; + } +} diff --git a/packages/cli/src/WorkflowRunnerProcess.ts b/packages/cli/src/WorkflowRunnerProcess.ts new file mode 100644 index 0000000000..d5d6c3f4be --- /dev/null +++ b/packages/cli/src/WorkflowRunnerProcess.ts @@ -0,0 +1,208 @@ + +import { + IProcessMessageDataHook, + IWorkflowExecutionDataProcessWithExecution, + NodeTypes, + WorkflowExecuteAdditionalData, +} from './'; + +import { + IProcessMessage, + WorkflowExecute, +} from 'n8n-core'; + +import { + IDataObject, + IExecutionError, + INodeType, + INodeTypeData, + IRun, + ITaskData, + IWorkflowExecuteHooks, + Workflow, +} from 'n8n-workflow'; +import { ChildProcess } from 'child_process'; + +export class WorkflowRunnerProcess { + data: IWorkflowExecutionDataProcessWithExecution | undefined; + startedAt = new Date(); + workflow: Workflow | undefined; + workflowExecute: WorkflowExecute | undefined; + + async runWorkflow(inputData: IWorkflowExecutionDataProcessWithExecution): Promise { + this.data = inputData; + let className: string; + let tempNode: INodeType; + let filePath: string; + + this.startedAt = new Date(); + + const nodeTypesData: INodeTypeData = {}; + for (const nodeTypeName of Object.keys(this.data.nodeTypeData)) { + className = this.data.nodeTypeData[nodeTypeName].className; + + filePath = this.data.nodeTypeData[nodeTypeName].sourcePath; + const tempModule = require(filePath); + + try { + tempNode = new tempModule[className]() as INodeType; + } catch (error) { + throw new Error(`Error loading node "${nodeTypeName}" from: "${filePath}"`); + } + + nodeTypesData[nodeTypeName] = { + type: tempNode, + sourcePath: filePath, + }; + } + + const nodeTypes = NodeTypes(); + await nodeTypes.init(nodeTypesData); + + this.workflow = new Workflow(this.data.workflowData.id as string | undefined, this.data.workflowData!.nodes, this.data.workflowData!.connections, this.data.workflowData!.active, nodeTypes, this.data.workflowData!.staticData); + const additionalData = await WorkflowExecuteAdditionalData.getBase(this.data.executionMode, this.data.credentials); + additionalData.hooks = this.getProcessForwardHooks(); + + + if (this.data.executionData !== undefined) { + this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode, this.data.executionData); + return this.workflowExecute.processRunExecutionData(this.workflow); + } else if (this.data.runData === undefined || this.data.startNodes === undefined || this.data.startNodes.length === 0 || this.data.destinationNode === undefined) { + // Execute all nodes + + // Can execute without webhook so go on + this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode); + return this.workflowExecute.run(this.workflow, undefined, this.data.destinationNode); + } else { + // Execute only the nodes between start and destination nodes + this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode); + return this.workflowExecute.runPartialWorkflow(this.workflow, this.data.runData, this.data.startNodes, this.data.destinationNode); + } + } + + + sendHookToParentProcess(hook: string, parameters: any[]) { // tslint:disable-line:no-any + (process as unknown as ChildProcess).send({ + type: 'processHook', + data: { + hook, + parameters, + } as IProcessMessageDataHook, + } as IProcessMessage); + } + + /** + * Create a wrapper for hooks which simply forwards the data to + * the parent process where they then can be executed with access + * to database and to PushService + * + * @param {ChildProcess} process + * @returns + */ + getProcessForwardHooks(): IWorkflowExecuteHooks { + return { + nodeExecuteBefore: [ + async (nodeName: string): Promise => { + this.sendHookToParentProcess('nodeExecuteBefore', [nodeName]); + }, + ], + nodeExecuteAfter: [ + async (nodeName: string, data: ITaskData): Promise => { + this.sendHookToParentProcess('nodeExecuteAfter', [nodeName, data]); + }, + ], + workflowExecuteBefore: [ + async (): Promise => { + this.sendHookToParentProcess('workflowExecuteBefore', []); + } + ], + workflowExecuteAfter: [ + async (fullRunData: IRun, newStaticData?: IDataObject): Promise => { + this.sendHookToParentProcess('workflowExecuteAfter', [fullRunData, newStaticData]); + }, + ] + }; + } + +} + + + +/** + * Sends data to parent process + * + * @param {string} type The type of data to send + * @param {*} data The data + */ +function sendToParentProcess(type: string, data: any): void { // tslint:disable-line:no-any + process.send!({ + type, + data, + }); +} + + +const workflowRunner = new WorkflowRunnerProcess(); + + +// Listen to messages from parent process which send the data of +// the worflow to process +process.on('message', async (message: IProcessMessage) => { + try { + if (message.type === 'startWorkflow') { + const runData = await workflowRunner.runWorkflow(message.data); + + sendToParentProcess('end', { + runData, + }); + + // Once the workflow got executed make sure the process gets killed again + process.exit(); + } else if (message.type === 'stopExecution') { + // The workflow execution should be stopped + let fullRunData: IRun; + + if (workflowRunner.workflowExecute !== undefined) { + // Workflow started already executing + + fullRunData = workflowRunner.workflowExecute.getFullRunData(workflowRunner.startedAt); + + // If there is any data send it to parent process + await workflowRunner.workflowExecute.processSuccessExecution(workflowRunner.startedAt, workflowRunner.workflow!); + } else { + // Workflow did not get started yet + fullRunData = { + data: { + resultData: { + runData: {}, + }, + }, + finished: true, + mode: workflowRunner.data!.executionMode, + startedAt: workflowRunner.startedAt, + stoppedAt: new Date(), + }; + + workflowRunner.sendHookToParentProcess('workflowExecuteAfter', [fullRunData]); + } + + sendToParentProcess('end', { + fullRunData, + }); + + // Stop process + process.exit(); + } + } catch (error) { + // Catch all uncaught errors and forward them to parent process + const executionError = { + message: error.message, + stack: error.stack, + } as IExecutionError; + + sendToParentProcess('processError', { + executionError, + }); + process.exit(); + } +}); \ No newline at end of file diff --git a/packages/cli/src/index.ts b/packages/cli/src/index.ts index 26acf451a2..0b7ae6ad0a 100644 --- a/packages/cli/src/index.ts +++ b/packages/cli/src/index.ts @@ -3,8 +3,9 @@ export * from './Interfaces'; export * from './LoadNodesAndCredentials'; export * from './NodeTypes'; export * from './WorkflowCredentials'; +export * from './WorkflowRunner'; - +import * as ActiveExecutions from './ActiveExecutions'; import * as ActiveWorkflowRunner from './ActiveWorkflowRunner'; import * as Db from './Db'; import * as GenericHelpers from './GenericHelpers'; @@ -16,6 +17,7 @@ import * as WebhookHelpers from './WebhookHelpers'; import * as WorkflowExecuteAdditionalData from './WorkflowExecuteAdditionalData'; import * as WorkflowHelpers from './WorkflowHelpers'; export { + ActiveExecutions, ActiveWorkflowRunner, Db, GenericHelpers, diff --git a/packages/core/src/ActiveWorkflows.ts b/packages/core/src/ActiveWorkflows.ts index 4c654eb489..0a7b3df268 100644 --- a/packages/core/src/ActiveWorkflows.ts +++ b/packages/core/src/ActiveWorkflows.ts @@ -1,13 +1,10 @@ import { + IGetExecuteTriggerFunctions, ITriggerResponse, IWorkflowExecuteAdditionalData, Workflow, } from 'n8n-workflow'; -import { - NodeExecuteFunctions, -} from './'; - export interface WorkflowData { workflow: Workflow; @@ -65,7 +62,7 @@ export class ActiveWorkflows { * @returns {Promise} * @memberof ActiveWorkflows */ - async add(id: string, workflow: Workflow, additionalData: IWorkflowExecuteAdditionalData): Promise { + async add(id: string, workflow: Workflow, additionalData: IWorkflowExecuteAdditionalData, getTriggerFunctions: IGetExecuteTriggerFunctions): Promise { console.log('ADD ID (active): ' + id); this.workflowData[id] = { @@ -75,7 +72,7 @@ export class ActiveWorkflows { let triggerResponse: ITriggerResponse | undefined; for (const triggerNode of triggerNodes) { - triggerResponse = await workflow.runTrigger(triggerNode, NodeExecuteFunctions, additionalData, 'trigger'); + triggerResponse = await workflow.runTrigger(triggerNode, getTriggerFunctions, additionalData, 'trigger'); if (triggerResponse !== undefined) { // If a response was given save it this.workflowData[id].triggerResponse = triggerResponse; diff --git a/packages/core/src/Interfaces.ts b/packages/core/src/Interfaces.ts index fead342e48..22fc156130 100644 --- a/packages/core/src/Interfaces.ts +++ b/packages/core/src/Interfaces.ts @@ -8,18 +8,12 @@ import { ILoadOptionsFunctions as ILoadOptionsFunctionsBase, INodeExecutionData, INodeType, - IRun, - IRunExecutionData, ITriggerFunctions as ITriggerFunctionsBase, IWebhookFunctions as IWebhookFunctionsBase, IWorkflowSettings as IWorkflowSettingsWorkflow, - Workflow, WorkflowExecuteMode, } from 'n8n-workflow'; -import { - IDeferredPromise -} from '.'; import * as request from 'request'; import * as requestPromise from 'request-promise-native'; @@ -29,6 +23,12 @@ interface Constructable { } +export interface IProcessMessage { + data?: any; // tslint:disable-line:no-any + type: string; +} + + export interface IExecuteFunctions extends IExecuteFunctionsBase { helpers: { prepareBinaryData(binaryData: Buffer, filePath?: string, mimeType?: string): Promise; @@ -45,13 +45,6 @@ export interface IExecuteSingleFunctions extends IExecuteSingleFunctionsBase { }; } -export interface IExecutingWorkflowData { - runExecutionData: IRunExecutionData; - startedAt: Date; - mode: WorkflowExecuteMode; - workflow: Workflow; - postExecutePromises: Array>; -} export interface IExecutionsCurrentSummary { id: string; diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index c92e5a40a9..306c32ff83 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -314,33 +314,12 @@ export function getWebhookDescription(name: string, workflow: Workflow, node: IN * @param {WorkflowExecuteMode} mode * @returns {ITriggerFunctions} */ +// TODO: Check if I can get rid of: additionalData, and so then maybe also at ActiveWorkflowRunner.add export function getExecuteTriggerFunctions(workflow: Workflow, node: INode, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode): ITriggerFunctions { return ((workflow: Workflow, node: INode) => { return { emit: (data: INodeExecutionData[][]): void => { - const workflowExecute = new WorkflowExecute(additionalData, mode); - const nodeExecutionStack: IExecuteData[] = [ - { - node, - data: { - main: data, - } - } - ]; - - const runExecutionData: IRunExecutionData = { - startData: {}, - resultData: { - runData: {}, - }, - executionData: { - contextData: {}, - nodeExecutionStack, - waitingExecution: {}, - }, - }; - - workflowExecute.runExecutionData(workflow, runExecutionData); + throw new Error('Overwrite NodeExecuteFunctions.getExecuteTriggerFunctions.emit function!'); }, getCredentials(type: string): ICredentialDataDecryptedObject | undefined { return getCredentials(workflow, node, type, additionalData); diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index 6aed1b24f9..6fade08396 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -1,5 +1,6 @@ import { IConnection, + IDataObject, IExecuteData, IExecutionError, INode, @@ -16,21 +17,30 @@ import { Workflow, } from 'n8n-workflow'; import { - ActiveExecutions, NodeExecuteFunctions, } from './'; export class WorkflowExecute { + runExecutionData: IRunExecutionData; private additionalData: IWorkflowExecuteAdditionalData; private mode: WorkflowExecuteMode; - private activeExecutions: ActiveExecutions.ActiveExecutions; - private executionId: string | null = null; - constructor(additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode) { + constructor(additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, runExecutionData?: IRunExecutionData) { this.additionalData = additionalData; - this.activeExecutions = ActiveExecutions.getInstance(); this.mode = mode; + this.runExecutionData = runExecutionData || { + startData: { + }, + resultData: { + runData: {}, + }, + executionData: { + contextData: {}, + nodeExecutionStack: [], + waitingExecution: {}, + }, + }; } @@ -44,7 +54,7 @@ export class WorkflowExecute { * @returns {(Promise)} * @memberof WorkflowExecute */ - async run(workflow: Workflow, startNode?: INode, destinationNode?: string): Promise { + async run(workflow: Workflow, startNode?: INode, destinationNode?: string): Promise { // Get the nodes to start workflow execution from startNode = startNode || workflow.getStartNode(destinationNode); @@ -75,7 +85,7 @@ export class WorkflowExecute { } ]; - const runExecutionData: IRunExecutionData = { + this.runExecutionData = { startData: { destinationNode, runNodeFilter, @@ -90,7 +100,7 @@ export class WorkflowExecute { }, }; - return this.runExecutionData(workflow, runExecutionData); + return this.processRunExecutionData(workflow); } @@ -105,8 +115,7 @@ export class WorkflowExecute { * @returns {(Promise)} * @memberof WorkflowExecute */ - async runPartialWorkflow(workflow: Workflow, runData: IRunData, startNodes: string[], destinationNode: string): Promise { - + async runPartialWorkflow(workflow: Workflow, runData: IRunData, startNodes: string[], destinationNode: string): Promise { let incomingNodeConnections: INodeConnections | undefined; let connection: IConnection; @@ -185,8 +194,7 @@ export class WorkflowExecute { runNodeFilter = workflow.getParentNodes(destinationNode); runNodeFilter.push(destinationNode); - - const runExecutionData: IRunExecutionData = { + this.runExecutionData = { startData: { destinationNode, runNodeFilter, @@ -201,7 +209,7 @@ export class WorkflowExecute { }, }; - return await this.runExecutionData(workflow, runExecutionData); + return await this.processRunExecutionData(workflow); } @@ -240,7 +248,7 @@ export class WorkflowExecute { } - addNodeToBeExecuted(workflow: Workflow, runExecutionData: IRunExecutionData, connectionData: IConnection, outputIndex: number, parentNodeName: string, nodeSuccessData: INodeExecutionData[][], runIndex: number): void { + addNodeToBeExecuted(workflow: Workflow, connectionData: IConnection, outputIndex: number, parentNodeName: string, nodeSuccessData: INodeExecutionData[][], runIndex: number): void { let stillDataMissing = false; // Check if node has multiple inputs as then we have to wait for all input data @@ -250,33 +258,33 @@ export class WorkflowExecute { let nodeWasWaiting = true; // Check if there is already data for the node - if (runExecutionData.executionData!.waitingExecution[connectionData.node] === undefined) { + if (this.runExecutionData.executionData!.waitingExecution[connectionData.node] === undefined) { // Node does not have data yet so create a new empty one - runExecutionData.executionData!.waitingExecution[connectionData.node] = {}; + this.runExecutionData.executionData!.waitingExecution[connectionData.node] = {}; nodeWasWaiting = false; } - if (runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] === undefined) { + if (this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] === undefined) { // Node does not have data for runIndex yet so create also empty one and init it - runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] = { + this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] = { main: [] }; for (let i = 0; i < workflow.connectionsByDestinationNode[connectionData.node]['main'].length; i++) { - runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main.push(null); + this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main.push(null); } } // Add the new data if (nodeSuccessData === null) { - runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[connectionData.index] = null; + this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[connectionData.index] = null; } else { - runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[connectionData.index] = nodeSuccessData[outputIndex]; + this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[connectionData.index] = nodeSuccessData[outputIndex]; } // Check if all data exists now let thisExecutionData: INodeExecutionData[] | null; let allDataFound = true; - for (let i = 0; i < runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main.length; i++) { - thisExecutionData = runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[i]; + for (let i = 0; i < this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main.length; i++) { + thisExecutionData = this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex].main[i]; if (thisExecutionData === null) { allDataFound = false; break; @@ -286,17 +294,17 @@ export class WorkflowExecute { if (allDataFound === true) { // All data exists for node to be executed // So add it to the execution stack - runExecutionData.executionData!.nodeExecutionStack.push({ + this.runExecutionData.executionData!.nodeExecutionStack.push({ node: workflow.nodes[connectionData.node], - data: runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] + data: this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] }); // Remove the data from waiting - delete runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex]; + delete this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex]; - if (Object.keys(runExecutionData.executionData!.waitingExecution[connectionData.node]).length === 0) { + if (Object.keys(this.runExecutionData.executionData!.waitingExecution[connectionData.node]).length === 0) { // No more data left for the node so also delete that one - delete runExecutionData.executionData!.waitingExecution[connectionData.node]; + delete this.runExecutionData.executionData!.waitingExecution[connectionData.node]; } return; } else { @@ -327,7 +335,7 @@ export class WorkflowExecute { continue; } - const executionStackNodes = runExecutionData.executionData!.nodeExecutionStack.map((stackData) => stackData.node.name); + const executionStackNodes = this.runExecutionData.executionData!.nodeExecutionStack.map((stackData) => stackData.node.name); // Check if that node is also an output connection of the // previously processed one @@ -345,7 +353,7 @@ export class WorkflowExecute { } // Check if node got processed already - if (runExecutionData.resultData.runData[inputData.node] !== undefined) { + if (this.runExecutionData.resultData.runData[inputData.node] !== undefined) { // Node got processed already so no need to add it continue; } @@ -376,7 +384,7 @@ export class WorkflowExecute { } // Check if node got processed already - if (runExecutionData.resultData.runData[parentNode] !== undefined) { + if (this.runExecutionData.resultData.runData[parentNode] !== undefined) { // Node got processed already so we can use the // output data as input of this node break; @@ -393,7 +401,7 @@ export class WorkflowExecute { if (workflow.connectionsByDestinationNode[nodeToAdd] === undefined) { // Add only node if it does not have any inputs becuase else it will // be added by its input node later anyway. - runExecutionData.executionData!.nodeExecutionStack.push( + this.runExecutionData.executionData!.nodeExecutionStack.push( { node: workflow.getNode(nodeToAdd) as INode, data: { @@ -428,15 +436,15 @@ export class WorkflowExecute { if (stillDataMissing === true) { // Additional data is needed to run node so add it to waiting - if (!runExecutionData.executionData!.waitingExecution.hasOwnProperty(connectionData.node)) { - runExecutionData.executionData!.waitingExecution[connectionData.node] = {}; + if (!this.runExecutionData.executionData!.waitingExecution.hasOwnProperty(connectionData.node)) { + this.runExecutionData.executionData!.waitingExecution[connectionData.node] = {}; } - runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] = { + this.runExecutionData.executionData!.waitingExecution[connectionData.node][runIndex] = { main: connectionDataArray }; } else { // All data is there so add it directly to stack - runExecutionData.executionData!.nodeExecutionStack.push({ + this.runExecutionData.executionData!.nodeExecutionStack.push({ node: workflow.nodes[connectionData.node], data: { main: connectionDataArray @@ -450,12 +458,11 @@ export class WorkflowExecute { * Runs the given execution data. * * @param {Workflow} workflow - * @param {IRunExecutionData} runExecutionData * @returns {Promise} * @memberof WorkflowExecute */ - async runExecutionData(workflow: Workflow, runExecutionData: IRunExecutionData): Promise { - const startedAt = new Date().getTime(); + async processRunExecutionData(workflow: Workflow): Promise { + const startedAt = new Date(); const workflowIssues = workflow.checkReadyForExecution(); if (workflowIssues !== null) { @@ -471,39 +478,29 @@ export class WorkflowExecute { let startTime: number; let taskData: ITaskData; - if (runExecutionData.startData === undefined) { - runExecutionData.startData = {}; + if (this.runExecutionData.startData === undefined) { + this.runExecutionData.startData = {}; } - this.executionId = this.activeExecutions.add(workflow, runExecutionData, this.mode); - - this.executeHook('workflowExecuteBefore', [this.executionId]); + this.executeHook('workflowExecuteBefore', []); let currentExecutionTry = ''; let lastExecutionTry = ''; - // Wait for the next tick so that the executionId gets already returned. - // So it can directly be send to the editor-ui and is so aware of the - // executionId when the first push messages arrive. - process.nextTick(() => (async () => { + return (async () => { executionLoop: - while (runExecutionData.executionData!.nodeExecutionStack.length !== 0) { - if (this.activeExecutions.shouldBeStopped(this.executionId!) === true) { - // The execution should be stopped - break; - } - + while (this.runExecutionData.executionData!.nodeExecutionStack.length !== 0) { nodeSuccessData = null; executionError = undefined; - executionData = runExecutionData.executionData!.nodeExecutionStack.shift() as IExecuteData; + executionData = this.runExecutionData.executionData!.nodeExecutionStack.shift() as IExecuteData; executionNode = executionData.node; - this.executeHook('nodeExecuteBefore', [this.executionId, executionNode.name]); + this.executeHook('nodeExecuteBefore', [executionNode.name]); // Get the index of the current run runIndex = 0; - if (runExecutionData.resultData.runData.hasOwnProperty(executionNode.name)) { - runIndex = runExecutionData.resultData.runData[executionNode.name].length; + if (this.runExecutionData.resultData.runData.hasOwnProperty(executionNode.name)) { + runIndex = this.runExecutionData.resultData.runData[executionNode.name].length; } currentExecutionTry = `${executionNode.name}:${runIndex}`; @@ -512,7 +509,7 @@ export class WorkflowExecute { throw new Error('Did stop execution because execution seems to be in endless loop.'); } - if (runExecutionData.startData!.runNodeFilter !== undefined && runExecutionData.startData!.runNodeFilter!.indexOf(executionNode.name) === -1) { + if (this.runExecutionData.startData!.runNodeFilter !== undefined && this.runExecutionData.startData!.runNodeFilter!.indexOf(executionNode.name) === -1) { // If filter is set and node is not on filter skip it, that avoids the problem that it executes // leafs that are parallel to a selected destinationNode. Normally it would execute them because // they have the same parent and it executes all child nodes. @@ -539,7 +536,7 @@ export class WorkflowExecute { if (!executionData.data!.hasOwnProperty('main')) { // ExecutionData does not even have the connection set up so can // not have that data, so add it again to be executed later - runExecutionData.executionData!.nodeExecutionStack.push(executionData); + this.runExecutionData.executionData!.nodeExecutionStack.push(executionData); lastExecutionTry = currentExecutionTry; continue executionLoop; } @@ -549,7 +546,7 @@ export class WorkflowExecute { // of both inputs has to be available to be able to process the node. if (executionData.data!.main!.length < connectionIndex || executionData.data!.main![connectionIndex] === null) { // Does not have the data of the connections so add back to stack - runExecutionData.executionData!.nodeExecutionStack.push(executionData); + this.runExecutionData.executionData!.nodeExecutionStack.push(executionData); lastExecutionTry = currentExecutionTry; continue executionLoop; } @@ -591,15 +588,8 @@ export class WorkflowExecute { } } - // Check again if the execution should be stopped else it - // could take forever to stop when each try takes a long time - if (this.activeExecutions.shouldBeStopped(this.executionId!) === true) { - // The execution should be stopped - break; - } - - runExecutionData.resultData.lastNodeExecuted = executionData.node.name; - nodeSuccessData = await workflow.runNode(executionData.node, executionData.data, runExecutionData, runIndex, this.additionalData, NodeExecuteFunctions, this.mode); + this.runExecutionData.resultData.lastNodeExecuted = executionData.node.name; + nodeSuccessData = await workflow.runNode(executionData.node, executionData.data, this.runExecutionData, runIndex, this.additionalData, NodeExecuteFunctions, this.mode); if (nodeSuccessData === null) { // If null gets returned it means that the node did succeed @@ -620,8 +610,8 @@ export class WorkflowExecute { // Add the data to return to the user // (currently does not get cloned as data does not get changed, maybe later we should do that?!?!) - if (!runExecutionData.resultData.runData.hasOwnProperty(executionNode.name)) { - runExecutionData.resultData.runData[executionNode.name] = []; + if (!this.runExecutionData.resultData.runData.hasOwnProperty(executionNode.name)) { + this.runExecutionData.resultData.runData[executionNode.name] = []; } taskData = { startTime, @@ -642,12 +632,12 @@ export class WorkflowExecute { } } else { // Node execution did fail so add error and stop execution - runExecutionData.resultData.runData[executionNode.name].push(taskData); + this.runExecutionData.resultData.runData[executionNode.name].push(taskData); // Add the execution data again so that it can get restarted - runExecutionData.executionData!.nodeExecutionStack.unshift(executionData); + this.runExecutionData.executionData!.nodeExecutionStack.unshift(executionData); - this.executeHook('nodeExecuteAfter', [this.executionId, executionNode.name, taskData]); + this.executeHook('nodeExecuteAfter', [executionNode.name, taskData]); break; } @@ -658,11 +648,11 @@ export class WorkflowExecute { 'main': nodeSuccessData } as ITaskDataConnections); - this.executeHook('nodeExecuteAfter', [this.executionId, executionNode.name, taskData]); + this.executeHook('nodeExecuteAfter', [executionNode.name, taskData]); - runExecutionData.resultData.runData[executionNode.name].push(taskData); + this.runExecutionData.resultData.runData[executionNode.name].push(taskData); - if (runExecutionData.startData && runExecutionData.startData.destinationNode && runExecutionData.startData.destinationNode === executionNode.name) { + if (this.runExecutionData.startData && this.runExecutionData.startData.destinationNode && this.runExecutionData.startData.destinationNode === executionNode.name) { // If destination node is defined and got executed stop execution continue; } @@ -686,7 +676,7 @@ export class WorkflowExecute { return Promise.reject(new Error(`The node "${executionNode.name}" connects to not found node "${connectionData.node}"`)); } - this.addNodeToBeExecuted(workflow, runExecutionData, connectionData, parseInt(outputIndex, 10), executionNode.name, nodeSuccessData!, runIndex); + this.addNodeToBeExecuted(workflow, connectionData, parseInt(outputIndex, 10), executionNode.name, nodeSuccessData!, runIndex); } } } @@ -696,45 +686,61 @@ export class WorkflowExecute { return Promise.resolve(); })() .then(async () => { - const fullRunData: IRun = { - data: runExecutionData, - mode: this.mode, - startedAt: new Date(startedAt), - stoppedAt: new Date(), - }; - - if (executionError !== undefined) { - fullRunData.data.resultData.error = executionError; - } else { - fullRunData.finished = true; - } - - this.activeExecutions.remove(this.executionId!, fullRunData); - - await this.executeHook('workflowExecuteAfter', [fullRunData, this.executionId!]); - - return fullRunData; + return this.processSuccessExecution(startedAt, workflow, executionError); }) .catch(async (error) => { - const fullRunData: IRun = { - data: runExecutionData, - mode: this.mode, - startedAt: new Date(startedAt), - stoppedAt: new Date(), - }; + const fullRunData = this.getFullRunData(startedAt); fullRunData.data.resultData.error = { message: error.message, stack: error.stack, }; - this.activeExecutions.remove(this.executionId!, fullRunData); + // Check if static data changed + let newStaticData: IDataObject | undefined; + if (workflow.staticData.__dataChanged === true) { + // Static data of workflow changed + newStaticData = workflow.staticData; + } - await this.executeHook('workflowExecuteAfter', [fullRunData, this.executionId!]); + await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]); return fullRunData; - })); + }); - return this.executionId; } + + + async processSuccessExecution(startedAt: Date, workflow: Workflow, executionError?: IExecutionError): Promise { + const fullRunData = this.getFullRunData(startedAt); + + if (executionError !== undefined) { + fullRunData.data.resultData.error = executionError; + } else { + fullRunData.finished = true; + } + + // Check if static data changed + let newStaticData: IDataObject | undefined; + if (workflow.staticData.__dataChanged === true) { + // Static data of workflow changed + newStaticData = workflow.staticData; + } + + await this.executeHook('workflowExecuteAfter', [fullRunData, newStaticData]); + + return fullRunData; + } + + getFullRunData(startedAt: Date): IRun { + const fullRunData: IRun = { + data: this.runExecutionData, + mode: this.mode, + startedAt, + stoppedAt: new Date(), + }; + + return fullRunData; + } + } diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index a5663f2fcb..55ab02a529 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -14,11 +14,9 @@ export * from './LoadNodeParameterOptions'; export * from './NodeExecuteFunctions'; export * from './WorkflowExecute'; -import * as ActiveExecutions from './ActiveExecutions'; import * as NodeExecuteFunctions from './NodeExecuteFunctions'; import * as UserSettings from './UserSettings'; export { - ActiveExecutions, NodeExecuteFunctions, UserSettings, }; diff --git a/packages/editor-ui/src/components/mixins/pushConnection.ts b/packages/editor-ui/src/components/mixins/pushConnection.ts index 52b095d944..bdf7014cf3 100644 --- a/packages/editor-ui/src/components/mixins/pushConnection.ts +++ b/packages/editor-ui/src/components/mixins/pushConnection.ts @@ -160,11 +160,17 @@ export const pushConnection = mixins( const runDataExecuted = pushData.data; + if (runDataExecuted.finished !== true) { // There was a problem with executing the workflow + let errorMessage = 'There was a problem executing the workflow!'; + if (runDataExecuted.data.resultData.error && runDataExecuted.data.resultData.error.message) { + errorMessage = `There was a problem executing the workflow:
"${runDataExecuted.data.resultData.error.message}"`; + } + this.$showMessage({ title: 'Problem executing workflow', - message: 'There was a problem executing the workflow!', + message: errorMessage, type: 'error', }); } else { diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index fbd807675c..b4323c6d80 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -106,6 +106,31 @@ export interface IDataObject { } +export interface IGetExecuteTriggerFunctions { + (workflow: Workflow, node: INode, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode): ITriggerFunctions; +} + + +export interface IGetExecuteFunctions { + (workflow: Workflow, runExecutionData: IRunExecutionData, runIndex: number, connectionInputData: INodeExecutionData[], inputData: ITaskDataConnections, node: INode, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode): IExecuteFunctions; +} + + +export interface IGetExecuteSingleFunctions { + (workflow: Workflow, runExecutionData: IRunExecutionData, runIndex: number, connectionInputData: INodeExecutionData[], inputData: ITaskDataConnections, node: INode, itemIndex: number, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode): IExecuteSingleFunctions; +} + + +export interface IGetExecuteHookFunctions { + (workflow: Workflow, node: INode, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, isTest?: boolean, webhookData?: IWebhookData): IHookFunctions; +} + + +export interface IGetExecuteWebhookFunctions { + (workflow: Workflow, node: INode, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, webhookData: IWebhookData): IWebhookFunctions; +} + + export interface IExecuteData { data: ITaskDataConnections; node: INode; @@ -250,11 +275,11 @@ export interface INodeExecutionData { export interface INodeExecuteFunctions { - getExecuteTriggerFunctions(workflow: Workflow, node: INode, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode): ITriggerFunctions; - getExecuteFunctions(workflow: Workflow, runExecutionData: IRunExecutionData, runIndex: number, connectionInputData: INodeExecutionData[], inputData: ITaskDataConnections, node: INode, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode): IExecuteFunctions; - getExecuteSingleFunctions(workflow: Workflow, runExecutionData: IRunExecutionData, runIndex: number, connectionInputData: INodeExecutionData[], inputData: ITaskDataConnections, node: INode, itemIndex: number, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode): IExecuteSingleFunctions; - getExecuteHookFunctions(workflow: Workflow, node: INode, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, isTest?: boolean, webhookData?: IWebhookData): IHookFunctions; - getExecuteWebhookFunctions(workflow: Workflow, node: INode, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, webhookData: IWebhookData): IWebhookFunctions; + getExecuteTriggerFunctions: IGetExecuteTriggerFunctions; + getExecuteFunctions: IGetExecuteFunctions; + getExecuteSingleFunctions: IGetExecuteSingleFunctions; + getExecuteHookFunctions: IGetExecuteHookFunctions; + getExecuteWebhookFunctions: IGetExecuteWebhookFunctions; } @@ -452,17 +477,21 @@ export interface IWebhookResonseData { export type WebhookResponseData = 'allEntries' | 'firstEntryJson' | 'firstEntryBinary'; export type WebhookResponseMode = 'onReceived' | 'lastNode'; -export interface INodeTypesObject { - [key: string]: INodeType; -} - export interface INodeTypes { - init(nodeTypes?: INodeTypesObject): Promise; + nodeTypes: INodeTypeData; + init(nodeTypes?: INodeTypeData): Promise; getAll(): INodeType[]; getByName(nodeType: string): INodeType | undefined; } +export interface INodeTypeData { + [key: string]: { + type: INodeType; + sourcePath: string; + }; +} + export interface IRun { data: IRunExecutionData; finished?: boolean; @@ -537,19 +566,17 @@ export interface IWorkflowCredentials { } export interface IWorkflowExecuteHooks { - afterExecute? (data: IRun, waitingExecutionData: IWaitingForExecution): Promise; + [key: string]: Array<((...args: any[]) => Promise)> | undefined; // tslint:disable-line:no-any + nodeExecuteAfter?: Array<((nodeName: string, data: ITaskData) => Promise)>; + nodeExecuteBefore?: Array<((nodeName: string) => Promise)>; + workflowExecuteAfter?: Array<((data: IRun, newStaticData: IDataObject) => Promise)>; + workflowExecuteBefore?: Array<(() => Promise)>; } export interface IWorkflowExecuteAdditionalData { credentials: IWorkflowCredentials; encryptionKey: string; - hooks?: { - [key: string]: Array<((...args: any[]) => Promise)> | undefined; // tslint:disable-line:no-any - nodeExecuteAfter?: Array<((executionId: string, nodeName: string, data: ITaskData) => Promise)>; - nodeExecuteBefore?: Array<((nodeName: string, executionId: string) => Promise)>; - workflowExecuteAfter?: Array<((data: IRun, executionId: string) => Promise)>; - workflowExecuteBefore?: Array<((executionId: string) => Promise)>; - }; + hooks?: IWorkflowExecuteHooks; httpResponse?: express.Response; httpRequest?: express.Request; timezone: string; diff --git a/packages/workflow/src/Workflow.ts b/packages/workflow/src/Workflow.ts index bd3c7ad467..7bc674a714 100644 --- a/packages/workflow/src/Workflow.ts +++ b/packages/workflow/src/Workflow.ts @@ -1,6 +1,7 @@ import { IConnections, + IGetExecuteTriggerFunctions, INode, NodeHelpers, INodes, @@ -954,14 +955,14 @@ export class Workflow { * when the node has data. * * @param {INode} node - * @param {INodeExecuteFunctions} nodeExecuteFunctions + * @param {IGetExecuteTriggerFunctions} getTriggerFunctions * @param {IWorkflowExecuteAdditionalData} additionalData * @param {WorkflowExecuteMode} mode * @returns {(Promise)} * @memberof Workflow */ - async runTrigger(node: INode, nodeExecuteFunctions: INodeExecuteFunctions, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode): Promise { - const thisArgs = nodeExecuteFunctions.getExecuteTriggerFunctions(this, node, additionalData, mode); + async runTrigger(node: INode, getTriggerFunctions: IGetExecuteTriggerFunctions, additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode): Promise { + const triggerFunctions = getTriggerFunctions(this, node, additionalData, mode); const nodeType = this.nodeTypes.getByName(node.type); @@ -976,11 +977,11 @@ export class Workflow { if (mode === 'manual') { // In manual mode we do not just start the trigger function we also // want to be able to get informed as soon as the first data got emitted - const triggerReponse = await nodeType.trigger!.call(thisArgs); + const triggerReponse = await nodeType.trigger!.call(triggerFunctions); // Add the manual trigger response which resolves when the first time data got emitted triggerReponse!.manualTriggerResponse = new Promise((resolve) => { - thisArgs.emit = ((resolve) => (data: INodeExecutionData[][]) => { + triggerFunctions.emit = ((resolve) => (data: INodeExecutionData[][]) => { resolve(data); })(resolve); }); @@ -988,7 +989,7 @@ export class Workflow { return triggerReponse; } else { // In all other modes simply start the trigger - return nodeType.trigger!.call(thisArgs); + return nodeType.trigger!.call(triggerFunctions); } } @@ -1089,7 +1090,7 @@ export class Workflow { } else if (nodeType.trigger) { if (mode === 'manual') { // In manual mode start the trigger - const triggerResponse = await this.runTrigger(node, nodeExecuteFunctions, additionalData, mode); + const triggerResponse = await this.runTrigger(node, nodeExecuteFunctions.getExecuteTriggerFunctions, additionalData, mode); if (triggerResponse === undefined) { return null;