diff --git a/packages/core/src/execution-engine/__tests__/workflow-execute.test.ts b/packages/core/src/execution-engine/__tests__/workflow-execute.test.ts index 57b9cfb62d..75a1cc55d0 100644 --- a/packages/core/src/execution-engine/__tests__/workflow-execute.test.ts +++ b/packages/core/src/execution-engine/__tests__/workflow-execute.test.ts @@ -11,6 +11,7 @@ process.env.N8N_RUNNERS_ENABLED = 'false'; // 1 means the output has run data // ►► denotes the node that the user wants to execute to // XX denotes that the node is disabled +// DR denotes that the node is dirty // PD denotes that the node has pinned data import { mock } from 'jest-mock-extended'; @@ -295,6 +296,72 @@ describe('WorkflowExecute', () => { expect(fullRunData.data.resultData.runData).not.toHaveProperty(node1.name); }); + // + // ┌───────┐1 ┌────┐1 + // │trigger├───►set1├─┐ ┌─────┐ ►► + // └───────┘ └────┘ └─► │1 ┌───────────┐ + // DR │merge├───►destination│ + // ┌────┐1┌─► │ └───────────┘ + // │set2├─┘ └─────┘ + // └────┘ + test('deletes run data of children of dirty nodes as well', async () => { + // ARRANGE + const waitPromise = createDeferredPromise(); + const nodeExecutionOrder: string[] = []; + const additionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise, nodeExecutionOrder); + const workflowExecute = new WorkflowExecute(additionalData, 'manual'); + jest.spyOn(workflowExecute, 'processRunExecutionData').mockImplementationOnce(jest.fn()); + + const recreateNodeExecutionStackSpy = jest.spyOn( + partialExecutionUtils, + 'recreateNodeExecutionStack', + ); + + const trigger = createNodeData({ name: 'trigger', type: 'n8n-nodes-base.manualTrigger' }); + const set1 = createNodeData({ name: 'set1' }); + const set2 = createNodeData({ name: 'set2' }); + const merge = createNodeData({ name: 'merge' }); + const destination = createNodeData({ name: 'destination' }); + const workflow = new DirectedGraph() + .addNodes(trigger, set1, set2, merge, destination) + .addConnections( + { from: trigger, to: set1 }, + { from: trigger, to: set2 }, + { from: set1, to: merge, inputIndex: 0 }, + { from: set2, to: merge, inputIndex: 1 }, + { from: merge, to: destination }, + ) + .toWorkflow({ name: '', active: false, nodeTypes }); + const pinData: IPinData = {}; + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { node: 'trigger' } }])], + [set1.name]: [toITaskData([{ data: { node: 'set1' } }])], + [set2.name]: [toITaskData([{ data: { node: 'set2' } }])], + [merge.name]: [toITaskData([{ data: { node: 'merge' } }])], + [destination.name]: [toITaskData([{ data: { node: 'destination' } }])], + }; + const dirtyNodeNames = [set2.name]; + const destinationNode = destination.name; + + // ACT + await workflowExecute.runPartialWorkflow2( + workflow, + runData, + pinData, + dirtyNodeNames, + destinationNode, + ); + + // ASSERT + expect(recreateNodeExecutionStackSpy).toHaveBeenCalledTimes(1); + expect(recreateNodeExecutionStackSpy).toHaveBeenCalledWith( + expect.any(DirectedGraph), + new Set([merge, set2]), + pick(runData, [trigger.name, set1.name]), + pinData, + ); + }); + // XX ►► // ┌───────┐1 ┌─────┐1 ┌─────┐ // │trigger├──────►node1├──────►node2│ @@ -452,10 +519,23 @@ describe('WorkflowExecute', () => { ); // ASSERT + const subgraph = new DirectedGraph() + .addNodes(trigger, node1) + .addConnections({ from: trigger, to: node1 }); + expect(cleanRunDataSpy).toHaveBeenCalledTimes(2); expect(cleanRunDataSpy).toHaveBeenNthCalledWith( 1, runData, - new DirectedGraph().addNodes(trigger, node1).addConnections({ from: trigger, to: node1 }), + subgraph, + // first call with the dirty nodes, which are an empty set in this case + new Set(), + ); + expect(cleanRunDataSpy).toHaveBeenNthCalledWith( + 2, + pick(runData, [trigger.name, node1.name]), + subgraph, + // second call with start nodes, which is the destination node in this + // case new Set([node1]), ); }); diff --git a/packages/core/src/execution-engine/partial-execution-utils/clean-run-data.ts b/packages/core/src/execution-engine/partial-execution-utils/clean-run-data.ts index 46f6eea52d..c7d0551e94 100644 --- a/packages/core/src/execution-engine/partial-execution-utils/clean-run-data.ts +++ b/packages/core/src/execution-engine/partial-execution-utils/clean-run-data.ts @@ -4,28 +4,30 @@ import type { DirectedGraph } from './directed-graph'; /** * Returns new run data that does not contain data for any node that is a child - * of any start node. + * of any of the passed nodes. This is useful for cleaning run data after start + * nodes or dirty nodes. + * * This does not mutate the `runData` being passed in. */ export function cleanRunData( runData: IRunData, graph: DirectedGraph, - startNodes: Set, + nodesToClean: Set, ): IRunData { const newRunData: IRunData = { ...runData }; - for (const startNode of startNodes) { - delete newRunData[startNode.name]; + for (const nodeToClean of nodesToClean) { + delete newRunData[nodeToClean.name]; - const children = graph.getChildren(startNode); - for (const node of [startNode, ...children]) { + const children = graph.getChildren(nodeToClean); + for (const node of [nodeToClean, ...children]) { delete newRunData[node.name]; // Delete runData for subNodes const subNodeConnections = graph.getParentConnections(node); for (const subNodeConnection of subNodeConnections) { // Sub nodes never use the Main connection type, so this filters out - // the connection that goes upstream of the startNode. + // the connection that goes upstream of the node to clean. if (subNodeConnection.type === NodeConnectionType.Main) { continue; } diff --git a/packages/core/src/execution-engine/workflow-execute.ts b/packages/core/src/execution-engine/workflow-execute.ts index 25f2d049c4..cdb23c79fe 100644 --- a/packages/core/src/execution-engine/workflow-execute.ts +++ b/packages/core/src/execution-engine/workflow-execute.ts @@ -5,7 +5,6 @@ import { Container } from '@n8n/di'; import * as assert from 'assert/strict'; import { setMaxListeners } from 'events'; -import { omit } from 'lodash'; import get from 'lodash/get'; import type { ExecutionBaseError, @@ -51,6 +50,7 @@ import { ExecutionCancelledError, Node, UnexpectedError, + UserError, } from 'n8n-workflow'; import PCancelable from 'p-cancelable'; @@ -396,7 +396,7 @@ export class WorkflowExecute { // 1. Find the Trigger const trigger = findTriggerForPartialExecution(workflow, destinationNodeName); if (trigger === undefined) { - throw new ApplicationError('Connect a trigger to run this node'); + throw new UserError('Connect a trigger to run this node'); } // 2. Find the Subgraph @@ -404,7 +404,8 @@ export class WorkflowExecute { const filteredNodes = graph.getNodes(); // 3. Find the Start Nodes - runData = omit(runData, dirtyNodeNames); + const dirtyNodes = new Set(workflow.getNodes(dirtyNodeNames)); + runData = cleanRunData(runData, graph, dirtyNodes); let startNodes = findStartNodes({ graph, trigger, destination, runData, pinData }); // 4. Detect Cycles diff --git a/packages/workflow/src/Workflow.ts b/packages/workflow/src/Workflow.ts index ff7403c1ae..082f1831e9 100644 --- a/packages/workflow/src/Workflow.ts +++ b/packages/workflow/src/Workflow.ts @@ -289,6 +289,27 @@ export class Workflow { return null; } + /** + * Returns the nodes with the given names if they exist. + * If a node cannot be found it will be ignored, meaning the returned array + * of nodes can be smaller than the array of names. + */ + getNodes(nodeNames: string[]): INode[] { + const nodes: INode[] = []; + for (const name of nodeNames) { + const node = this.getNode(name); + if (!node) { + console.warn( + `Could not find a node with the name ${name} in the workflow. This was passed in as a dirty node name.`, + ); + continue; + } + nodes.push(node); + } + + return nodes; + } + /** * Returns the pinData of the node with the given name if it exists * diff --git a/packages/workflow/test/Workflow.test.ts b/packages/workflow/test/Workflow.test.ts index 1a4c0639cb..31889b8770 100644 --- a/packages/workflow/test/Workflow.test.ts +++ b/packages/workflow/test/Workflow.test.ts @@ -344,6 +344,10 @@ describe('Workflow', () => { active: false, }); + beforeEach(() => { + jest.restoreAllMocks(); + }); + describe('renameNodeInParameterValue', () => { describe('for expressions', () => { const tests = [ @@ -2339,4 +2343,65 @@ describe('Workflow', () => { expect(workflow.getStartNode()).toBeUndefined(); }); }); + + describe('getNode', () => { + test('should return the node with the given name if it exists', () => { + const workflow = SIMPLE_WORKFLOW; + const node = workflow.getNode('Start'); + expect(node).not.toBeNull(); + expect(node?.name).toBe('Start'); + expect(node?.type).toBe('test.set'); + expect(node?.id).toBe('uuid-1'); + }); + + test('should return null if the node does not exist', () => { + const nonExistentNode = SIMPLE_WORKFLOW.getNode('NonExistentNode'); + expect(nonExistentNode).toBeNull(); + }); + }); + + describe('getNodes', () => { + test('should return all requested nodes that exist', () => { + const nodes = SIMPLE_WORKFLOW.getNodes(['Start', 'Set', 'Set1']); + expect(nodes).toHaveLength(3); + expect(nodes[0].name).toBe('Start'); + expect(nodes[1].name).toBe('Set'); + expect(nodes[2].name).toBe('Set1'); + }); + + test('should return nodes in the order they were requested', () => { + const nodes = SIMPLE_WORKFLOW.getNodes(['Set1', 'Start', 'Set']); + expect(nodes).toHaveLength(3); + expect(nodes[0].name).toBe('Set1'); + expect(nodes[1].name).toBe('Start'); + expect(nodes[2].name).toBe('Set'); + }); + + test('should skip nodes that do not exist and log a warning', () => { + // Spy on console.warn + const consoleWarnSpy = jest.spyOn(console, 'warn').mockImplementation(); + + const nodes = SIMPLE_WORKFLOW.getNodes(['Start', 'NonExistentNode', 'Set1']); + expect(nodes).toHaveLength(2); + expect(nodes[0].name).toBe('Start'); + expect(nodes[1].name).toBe('Set1'); + expect(consoleWarnSpy).toHaveBeenCalledWith( + expect.stringContaining('Could not find a node with the name NonExistentNode'), + ); + }); + + test('should return an empty array if none of the requested nodes exist', () => { + // Spy on console.warn + const consoleWarnSpy = jest.spyOn(console, 'warn').mockImplementation(); + + const nodes = SIMPLE_WORKFLOW.getNodes(['NonExistentNode1', 'NonExistentNode2']); + expect(nodes).toHaveLength(0); + expect(consoleWarnSpy).toHaveBeenCalledTimes(2); + }); + + test('should handle an empty array of node names', () => { + const nodes = SIMPLE_WORKFLOW.getNodes([]); + expect(nodes).toHaveLength(0); + }); + }); });