From b37e5142b24f955477e33998a43ba070e4d0765b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 10 Dec 2024 16:38:24 +0100 Subject: [PATCH] refactor(core): Encapsulate manual execution flow in `WorkflowRunner` (#12135) --- packages/cli/src/workflow-runner.ts | 171 +++++++++++++++------------- 1 file changed, 90 insertions(+), 81 deletions(-) diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index 19ae201a8d..43beee1d05 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -20,6 +20,7 @@ import type { WorkflowHooks, IWorkflowExecutionDataProcess, IRunExecutionData, + IWorkflowExecuteAdditionalData, } from 'n8n-workflow'; import { ErrorReporterProxy as ErrorReporter, @@ -295,88 +296,8 @@ export class WorkflowRunner { data.executionData, ); workflowExecution = workflowExecute.processRunExecutionData(workflow); - } else if (data.triggerToStartFrom?.data && data.startNodes && !data.destinationNode) { - this.logger.debug( - `Execution ID ${executionId} had triggerToStartFrom. Starting from that trigger.`, - { executionId }, - ); - const startNodes = data.startNodes.map((data) => { - const node = workflow.getNode(data.name); - a.ok(node, `Could not find a node named "${data.name}" in the workflow.`); - return node; - }); - const runData = { [data.triggerToStartFrom.name]: [data.triggerToStartFrom.data] }; - - const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = - recreateNodeExecutionStack( - filterDisabledNodes(DirectedGraph.fromWorkflow(workflow)), - new Set(startNodes), - runData, - data.pinData ?? {}, - ); - const executionData: IRunExecutionData = { - resultData: { runData, pinData }, - executionData: { - contextData: {}, - metadata: {}, - nodeExecutionStack, - waitingExecution, - waitingExecutionSource, - }, - }; - - const workflowExecute = new WorkflowExecute(additionalData, 'manual', executionData); - workflowExecution = workflowExecute.processRunExecutionData(workflow); - } else if ( - data.runData === undefined || - data.startNodes === undefined || - data.startNodes.length === 0 - ) { - // Full Execution - // TODO: When the old partial execution logic is removed this block can - // be removed and the previous one can be merged into - // `workflowExecute.runPartialWorkflow2`. - // Partial executions then require either a destination node from which - // everything else can be derived, or a triggerToStartFrom with - // triggerData. - this.logger.debug(`Execution ID ${executionId} will run executing all nodes.`, { - executionId, - }); - // Execute all nodes - - const startNode = WorkflowHelpers.getExecutionStartNode(data, workflow); - - // Can execute without webhook so go on - const workflowExecute = new WorkflowExecute(additionalData, data.executionMode); - workflowExecution = workflowExecute.run( - workflow, - startNode, - data.destinationNode, - data.pinData, - ); } else { - // Partial Execution - this.logger.debug(`Execution ID ${executionId} is a partial execution.`, { executionId }); - // Execute only the nodes between start and destination nodes - const workflowExecute = new WorkflowExecute(additionalData, data.executionMode); - - if (data.partialExecutionVersion === '1') { - workflowExecution = workflowExecute.runPartialWorkflow2( - workflow, - data.runData, - data.pinData, - data.dirtyNodeNames, - data.destinationNode, - ); - } else { - workflowExecution = workflowExecute.runPartialWorkflow( - workflow, - data.runData, - data.startNodes, - data.destinationNode, - data.pinData, - ); - } + workflowExecution = this.runManually(data, workflow, additionalData, executionId, pinData); } this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution); @@ -539,4 +460,92 @@ export class WorkflowRunner { this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution); } + + // eslint-disable-next-line @typescript-eslint/promise-function-async + runManually( + data: IWorkflowExecutionDataProcess, + workflow: Workflow, + additionalData: IWorkflowExecuteAdditionalData, + executionId: string, + pinData?: IPinData, + ) { + if (data.triggerToStartFrom?.data && data.startNodes && !data.destinationNode) { + this.logger.debug( + `Execution ID ${executionId} had triggerToStartFrom. Starting from that trigger.`, + { executionId }, + ); + const startNodes = data.startNodes.map((data) => { + const node = workflow.getNode(data.name); + a.ok(node, `Could not find a node named "${data.name}" in the workflow.`); + return node; + }); + const runData = { [data.triggerToStartFrom.name]: [data.triggerToStartFrom.data] }; + + const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = + recreateNodeExecutionStack( + filterDisabledNodes(DirectedGraph.fromWorkflow(workflow)), + new Set(startNodes), + runData, + data.pinData ?? {}, + ); + const executionData: IRunExecutionData = { + resultData: { runData, pinData }, + executionData: { + contextData: {}, + metadata: {}, + nodeExecutionStack, + waitingExecution, + waitingExecutionSource, + }, + }; + + const workflowExecute = new WorkflowExecute(additionalData, 'manual', executionData); + return workflowExecute.processRunExecutionData(workflow); + } else if ( + data.runData === undefined || + data.startNodes === undefined || + data.startNodes.length === 0 + ) { + // Full Execution + // TODO: When the old partial execution logic is removed this block can + // be removed and the previous one can be merged into + // `workflowExecute.runPartialWorkflow2`. + // Partial executions then require either a destination node from which + // everything else can be derived, or a triggerToStartFrom with + // triggerData. + this.logger.debug(`Execution ID ${executionId} will run executing all nodes.`, { + executionId, + }); + // Execute all nodes + + const startNode = WorkflowHelpers.getExecutionStartNode(data, workflow); + + // Can execute without webhook so go on + const workflowExecute = new WorkflowExecute(additionalData, data.executionMode); + return workflowExecute.run(workflow, startNode, data.destinationNode, data.pinData); + } else { + // Partial Execution + this.logger.debug(`Execution ID ${executionId} is a partial execution.`, { executionId }); + // Execute only the nodes between start and destination nodes + const workflowExecute = new WorkflowExecute(additionalData, data.executionMode); + + if (data.partialExecutionVersion === '1') { + return workflowExecute.runPartialWorkflow2( + workflow, + data.runData, + data.pinData, + data.dirtyNodeNames, + data.destinationNode, + ); + } else { + return workflowExecute.runPartialWorkflow( + workflow, + data.runData, + data.startNodes, + data.destinationNode, + data.pinData, + ); + } + } + } }