diff --git a/packages/cli/src/__tests__/workflow-helpers.test.ts b/packages/cli/src/__tests__/manual-execution.service.test.ts similarity index 69% rename from packages/cli/src/__tests__/workflow-helpers.test.ts rename to packages/cli/src/__tests__/manual-execution.service.test.ts index e24cfa1f68..383a8dc87c 100644 --- a/packages/cli/src/__tests__/workflow-helpers.test.ts +++ b/packages/cli/src/__tests__/manual-execution.service.test.ts @@ -1,8 +1,11 @@ +import { mock } from 'jest-mock-extended'; import type { Workflow, IWorkflowExecutionDataProcess } from 'n8n-workflow'; -import { getExecutionStartNode } from '@/workflow-helpers'; +import { ManualExecutionService } from '@/manual-execution.service'; + +describe('ManualExecutionService', () => { + const manualExecutionService = new ManualExecutionService(mock()); -describe('WorkflowHelpers', () => { describe('getExecutionStartNode', () => { it('Should return undefined', () => { const data = { @@ -16,9 +19,10 @@ describe('WorkflowHelpers', () => { }; }, } as unknown as Workflow; - const executionStartNode = getExecutionStartNode(data, workflow); + const executionStartNode = manualExecutionService.getExecutionStartNode(data, workflow); expect(executionStartNode).toBeUndefined(); }); + it('Should return startNode', () => { const data = { pinData: { @@ -37,7 +41,7 @@ describe('WorkflowHelpers', () => { return undefined; }, } as unknown as Workflow; - const executionStartNode = getExecutionStartNode(data, workflow); + const executionStartNode = manualExecutionService.getExecutionStartNode(data, workflow); expect(executionStartNode).toEqual({ name: 'node2', }); diff --git a/packages/cli/src/manual-execution.service.ts b/packages/cli/src/manual-execution.service.ts new file mode 100644 index 0000000000..65174d20b5 --- /dev/null +++ b/packages/cli/src/manual-execution.service.ts @@ -0,0 +1,124 @@ +import * as a from 'assert/strict'; +import { + DirectedGraph, + filterDisabledNodes, + recreateNodeExecutionStack, + WorkflowExecute, +} from 'n8n-core'; +import type { + IPinData, + IRun, + IRunExecutionData, + IWorkflowExecuteAdditionalData, + IWorkflowExecutionDataProcess, + Workflow, +} from 'n8n-workflow'; +import type PCancelable from 'p-cancelable'; +import { Service } from 'typedi'; + +import { Logger } from '@/logging/logger.service'; + +@Service() +export class ManualExecutionService { + constructor(private readonly logger: Logger) {} + + getExecutionStartNode(data: IWorkflowExecutionDataProcess, workflow: Workflow) { + let startNode; + if ( + data.startNodes?.length === 1 && + Object.keys(data.pinData ?? {}).includes(data.startNodes[0].name) + ) { + startNode = workflow.getNode(data.startNodes[0].name) ?? undefined; + } + + return startNode; + } + + // eslint-disable-next-line @typescript-eslint/promise-function-async + runManually( + data: IWorkflowExecutionDataProcess, + workflow: Workflow, + additionalData: IWorkflowExecuteAdditionalData, + executionId: string, + pinData?: IPinData, + ): PCancelable { + if (data.triggerToStartFrom?.data && data.startNodes && !data.destinationNode) { + this.logger.debug( + `Execution ID ${executionId} had triggerToStartFrom. Starting from that trigger.`, + { executionId }, + ); + const startNodes = data.startNodes.map((startNode) => { + const node = workflow.getNode(startNode.name); + a.ok(node, `Could not find a node named "${startNode.name}" in the workflow.`); + return node; + }); + const runData = { [data.triggerToStartFrom.name]: [data.triggerToStartFrom.data] }; + + const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = + recreateNodeExecutionStack( + filterDisabledNodes(DirectedGraph.fromWorkflow(workflow)), + new Set(startNodes), + runData, + data.pinData ?? {}, + ); + const executionData: IRunExecutionData = { + resultData: { runData, pinData }, + executionData: { + contextData: {}, + metadata: {}, + nodeExecutionStack, + waitingExecution, + waitingExecutionSource, + }, + }; + + const workflowExecute = new WorkflowExecute(additionalData, 'manual', executionData); + return workflowExecute.processRunExecutionData(workflow); + } else if ( + data.runData === undefined || + data.startNodes === undefined || + data.startNodes.length === 0 + ) { + // Full Execution + // TODO: When the old partial execution logic is removed this block can + // be removed and the previous one can be merged into + // `workflowExecute.runPartialWorkflow2`. + // Partial executions then require either a destination node from which + // everything else can be derived, or a triggerToStartFrom with + // triggerData. + this.logger.debug(`Execution ID ${executionId} will run executing all nodes.`, { + executionId, + }); + // Execute all nodes + + const startNode = this.getExecutionStartNode(data, workflow); + + // Can execute without webhook so go on + const workflowExecute = new WorkflowExecute(additionalData, data.executionMode); + return workflowExecute.run(workflow, startNode, data.destinationNode, data.pinData); + } else { + // Partial Execution + this.logger.debug(`Execution ID ${executionId} is a partial execution.`, { executionId }); + // Execute only the nodes between start and destination nodes + const workflowExecute = new WorkflowExecute(additionalData, data.executionMode); + + if (data.partialExecutionVersion === '1') { + return workflowExecute.runPartialWorkflow2( + workflow, + data.runData, + data.pinData, + data.dirtyNodeNames, + data.destinationNode, + ); + } else { + return workflowExecute.runPartialWorkflow( + workflow, + data.runData, + data.startNodes, + data.destinationNode, + data.pinData, + ); + } + } + } +} diff --git a/packages/cli/src/workflow-helpers.ts b/packages/cli/src/workflow-helpers.ts index 7cbce6b8a2..addae4e290 100644 --- a/packages/cli/src/workflow-helpers.ts +++ b/packages/cli/src/workflow-helpers.ts @@ -7,9 +7,7 @@ import type { NodeApiError, WorkflowExecuteMode, WorkflowOperationError, - Workflow, NodeOperationError, - IWorkflowExecutionDataProcess, } from 'n8n-workflow'; import { Container } from 'typedi'; import { v4 as uuid } from 'uuid'; @@ -223,18 +221,6 @@ export async function replaceInvalidCredentials(workflow: WorkflowEntity): Promi return workflow; } -export function getExecutionStartNode(data: IWorkflowExecutionDataProcess, workflow: Workflow) { - let startNode; - if ( - data.startNodes?.length === 1 && - Object.keys(data.pinData ?? {}).includes(data.startNodes[0].name) - ) { - startNode = workflow.getNode(data.startNodes[0].name) ?? undefined; - } - - return startNode; -} - export async function getVariables(): Promise { const variables = await Container.get(VariablesService).getAllCached(); return Object.freeze( diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index 7dc65e341c..973d512e62 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -2,15 +2,7 @@ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-shadow */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ -import * as a from 'assert/strict'; -import { - DirectedGraph, - ErrorReporter, - InstanceSettings, - WorkflowExecute, - filterDisabledNodes, - recreateNodeExecutionStack, -} from 'n8n-core'; +import { ErrorReporter, InstanceSettings, WorkflowExecute } from 'n8n-core'; import type { ExecutionError, IDeferredPromise, @@ -20,8 +12,6 @@ import type { WorkflowExecuteMode, WorkflowHooks, IWorkflowExecutionDataProcess, - IRunExecutionData, - IWorkflowExecuteAdditionalData, } from 'n8n-workflow'; import { ExecutionCancelledError, Workflow } from 'n8n-workflow'; import PCancelable from 'p-cancelable'; @@ -37,12 +27,12 @@ import type { ScalingService } from '@/scaling/scaling.service'; import type { Job, JobData } from '@/scaling/scaling.types'; import { PermissionChecker } from '@/user-management/permission-checker'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; -import * as WorkflowHelpers from '@/workflow-helpers'; import { generateFailedExecutionFromError } from '@/workflow-helpers'; import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service'; import { ExecutionNotFoundError } from './errors/execution-not-found-error'; import { EventService } from './events/event.service'; +import { ManualExecutionService } from './manual-execution.service'; @Service() export class WorkflowRunner { @@ -61,6 +51,7 @@ export class WorkflowRunner { private readonly permissionChecker: PermissionChecker, private readonly eventService: EventService, private readonly instanceSettings: InstanceSettings, + private readonly manualExecutionService: ManualExecutionService, ) {} /** The process did error */ @@ -295,7 +286,13 @@ export class WorkflowRunner { ); workflowExecution = workflowExecute.processRunExecutionData(workflow); } else { - workflowExecution = this.runManually(data, workflow, additionalData, executionId, pinData); + workflowExecution = this.manualExecutionService.runManually( + data, + workflow, + additionalData, + executionId, + pinData, + ); } this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution); @@ -458,92 +455,4 @@ export class WorkflowRunner { this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution); } - - // eslint-disable-next-line @typescript-eslint/promise-function-async - runManually( - data: IWorkflowExecutionDataProcess, - workflow: Workflow, - additionalData: IWorkflowExecuteAdditionalData, - executionId: string, - pinData?: IPinData, - ) { - if (data.triggerToStartFrom?.data && data.startNodes && !data.destinationNode) { - this.logger.debug( - `Execution ID ${executionId} had triggerToStartFrom. Starting from that trigger.`, - { executionId }, - ); - const startNodes = data.startNodes.map((data) => { - const node = workflow.getNode(data.name); - a.ok(node, `Could not find a node named "${data.name}" in the workflow.`); - return node; - }); - const runData = { [data.triggerToStartFrom.name]: [data.triggerToStartFrom.data] }; - - const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = - recreateNodeExecutionStack( - filterDisabledNodes(DirectedGraph.fromWorkflow(workflow)), - new Set(startNodes), - runData, - data.pinData ?? {}, - ); - const executionData: IRunExecutionData = { - resultData: { runData, pinData }, - executionData: { - contextData: {}, - metadata: {}, - nodeExecutionStack, - waitingExecution, - waitingExecutionSource, - }, - }; - - const workflowExecute = new WorkflowExecute(additionalData, 'manual', executionData); - return workflowExecute.processRunExecutionData(workflow); - } else if ( - data.runData === undefined || - data.startNodes === undefined || - data.startNodes.length === 0 - ) { - // Full Execution - // TODO: When the old partial execution logic is removed this block can - // be removed and the previous one can be merged into - // `workflowExecute.runPartialWorkflow2`. - // Partial executions then require either a destination node from which - // everything else can be derived, or a triggerToStartFrom with - // triggerData. - this.logger.debug(`Execution ID ${executionId} will run executing all nodes.`, { - executionId, - }); - // Execute all nodes - - const startNode = WorkflowHelpers.getExecutionStartNode(data, workflow); - - // Can execute without webhook so go on - const workflowExecute = new WorkflowExecute(additionalData, data.executionMode); - return workflowExecute.run(workflow, startNode, data.destinationNode, data.pinData); - } else { - // Partial Execution - this.logger.debug(`Execution ID ${executionId} is a partial execution.`, { executionId }); - // Execute only the nodes between start and destination nodes - const workflowExecute = new WorkflowExecute(additionalData, data.executionMode); - - if (data.partialExecutionVersion === '1') { - return workflowExecute.runPartialWorkflow2( - workflow, - data.runData, - data.pinData, - data.dirtyNodeNames, - data.destinationNode, - ); - } else { - return workflowExecute.runPartialWorkflow( - workflow, - data.runData, - data.startNodes, - data.destinationNode, - data.pinData, - ); - } - } - } }