diff --git a/packages/core/src/PartialExecutionUtils/DirectedGraph.ts b/packages/core/src/PartialExecutionUtils/DirectedGraph.ts index 2485b895b0..bd6cf81a2f 100644 --- a/packages/core/src/PartialExecutionUtils/DirectedGraph.ts +++ b/packages/core/src/PartialExecutionUtils/DirectedGraph.ts @@ -125,6 +125,32 @@ export class DirectedGraph { return directChildren; } + private getChildrenRecursive(node: INode, children: Set) { + const directChildren = this.getDirectChildren(node); + + for (const directChild of directChildren) { + // Break out if we found a cycle. + if (children.has(directChild.to)) { + continue; + } + children.add(directChild.to); + this.getChildrenRecursive(directChild.to, children); + } + + return children; + } + + /** + * Returns all nodes that are children of the node that is passed as an + * argument. + * + * If the node being passed in is a child of itself (e.g. is part of a + * cylce), the return set will contain it as well. + */ + getChildren(node: INode) { + return this.getChildrenRecursive(node, new Set()); + } + getDirectParents(node: INode) { const nodeExists = this.nodes.get(node.name) === node; a.ok(nodeExists); diff --git a/packages/core/src/PartialExecutionUtils/__tests__/DirectedGraph.test.ts b/packages/core/src/PartialExecutionUtils/__tests__/DirectedGraph.test.ts index 4049878eb2..93df23de32 100644 --- a/packages/core/src/PartialExecutionUtils/__tests__/DirectedGraph.test.ts +++ b/packages/core/src/PartialExecutionUtils/__tests__/DirectedGraph.test.ts @@ -38,4 +38,52 @@ describe('DirectedGraph', () => { graph, ); }); + + describe('getChildren', () => { + // ┌─────┐ ┌─────┐ ┌─────┐ + // │node1├───►│node2├──►│node3│ + // └─────┘ └─────┘ └─────┘ + test('returns all children', () => { + // ARRANGE + const node1 = createNodeData({ name: 'Node1' }); + const node2 = createNodeData({ name: 'Node2' }); + const node3 = createNodeData({ name: 'Node3' }); + const graph = new DirectedGraph() + .addNodes(node1, node2, node3) + .addConnections({ from: node1, to: node2 }, { from: node2, to: node3 }); + + // ACT + const children = graph.getChildren(node1); + + // ASSERT + expect(children.size).toBe(2); + expect(children).toEqual(new Set([node2, node3])); + }); + + // ┌─────┐ ┌─────┐ ┌─────┐ + // ┌─►│node1├───►│node2├──►│node3├─┐ + // │ └─────┘ └─────┘ └─────┘ │ + // │ │ + // └───────────────────────────────┘ + test('terminates when finding a cycle', () => { + // ARRANGE + const node1 = createNodeData({ name: 'Node1' }); + const node2 = createNodeData({ name: 'Node2' }); + const node3 = createNodeData({ name: 'Node3' }); + const graph = new DirectedGraph() + .addNodes(node1, node2, node3) + .addConnections( + { from: node1, to: node2 }, + { from: node2, to: node3 }, + { from: node3, to: node1 }, + ); + + // ACT + const children = graph.getChildren(node1); + + // ASSERT + expect(children.size).toBe(3); + expect(children).toEqual(new Set([node1, node2, node3])); + }); + }); }); diff --git a/packages/core/src/PartialExecutionUtils/__tests__/cleanRunData.test.ts b/packages/core/src/PartialExecutionUtils/__tests__/cleanRunData.test.ts new file mode 100644 index 0000000000..fabfae0ee3 --- /dev/null +++ b/packages/core/src/PartialExecutionUtils/__tests__/cleanRunData.test.ts @@ -0,0 +1,86 @@ +import type { IRunData } from 'n8n-workflow'; +import { cleanRunData } from '../cleanRunData'; +import { DirectedGraph } from '../DirectedGraph'; +import { createNodeData, toITaskData } from './helpers'; + +describe('cleanRunData', () => { + // ┌─────┐ ┌─────┐ ┌─────┐ + // │node1├───►│node2├──►│node3│ + // └─────┘ └─────┘ └─────┘ + test('deletes all run data of all children and the node being passed in', () => { + // ARRANGE + const node1 = createNodeData({ name: 'Node1' }); + const node2 = createNodeData({ name: 'Node2' }); + const node3 = createNodeData({ name: 'Node3' }); + const graph = new DirectedGraph() + .addNodes(node1, node2, node3) + .addConnections({ from: node1, to: node2 }, { from: node2, to: node3 }); + const runData: IRunData = { + [node1.name]: [toITaskData([{ data: { value: 1 } }])], + [node2.name]: [toITaskData([{ data: { value: 2 } }])], + [node3.name]: [toITaskData([{ data: { value: 3 } }])], + }; + + // ACT + const newRunData = cleanRunData(runData, graph, [node1]); + + // ASSERT + expect(newRunData).toEqual({}); + }); + + // ┌─────┐ ┌─────┐ ┌─────┐ + // │node1├───►│node2├──►│node3│ + // └─────┘ └─────┘ └─────┘ + test('retains the run data of parent nodes of the node being passed in', () => { + // ARRANGE + const node1 = createNodeData({ name: 'Node1' }); + const node2 = createNodeData({ name: 'Node2' }); + const node3 = createNodeData({ name: 'Node3' }); + const graph = new DirectedGraph() + .addNodes(node1, node2, node3) + .addConnections({ from: node1, to: node2 }, { from: node2, to: node3 }); + const runData: IRunData = { + [node1.name]: [toITaskData([{ data: { value: 1 } }])], + [node2.name]: [toITaskData([{ data: { value: 2 } }])], + [node3.name]: [toITaskData([{ data: { value: 3 } }])], + }; + + // ACT + const newRunData = cleanRunData(runData, graph, [node2]); + + // ASSERT + expect(newRunData).toEqual({ [node1.name]: runData[node1.name] }); + }); + + // ┌─────┐ ┌─────┐ ┌─────┐ + // ┌─►│node1├───►│node2├──►│node3├─┐ + // │ └─────┘ └─────┘ └─────┘ │ + // │ │ + // └───────────────────────────────┘ + test('terminates when finding a cycle', () => { + // ARRANGE + const node1 = createNodeData({ name: 'Node1' }); + const node2 = createNodeData({ name: 'Node2' }); + const node3 = createNodeData({ name: 'Node3' }); + const graph = new DirectedGraph() + .addNodes(node1, node2, node3) + .addConnections( + { from: node1, to: node2 }, + { from: node2, to: node3 }, + { from: node3, to: node1 }, + ); + + const runData: IRunData = { + [node1.name]: [toITaskData([{ data: { value: 1 } }])], + [node2.name]: [toITaskData([{ data: { value: 2 } }])], + [node3.name]: [toITaskData([{ data: { value: 3 } }])], + }; + + // ACT + const newRunData = cleanRunData(runData, graph, [node2]); + + // ASSERT + // TODO: Find out if this is a desirable result in milestone 2 + expect(newRunData).toEqual({}); + }); +}); diff --git a/packages/core/src/PartialExecutionUtils/cleanRunData.ts b/packages/core/src/PartialExecutionUtils/cleanRunData.ts new file mode 100644 index 0000000000..945dca1451 --- /dev/null +++ b/packages/core/src/PartialExecutionUtils/cleanRunData.ts @@ -0,0 +1,26 @@ +import type { INode, IRunData } from 'n8n-workflow'; +import type { DirectedGraph } from './DirectedGraph'; + +/** + * Returns new run data that does not contain data for any node that is a child + * of any start node. + * This does not mutate the `runData` being passed in. + */ +export function cleanRunData( + runData: IRunData, + graph: DirectedGraph, + startNodes: INode[], +): IRunData { + const newRunData: IRunData = { ...runData }; + + for (const startNode of startNodes) { + delete newRunData[startNode.name]; + const children = graph.getChildren(startNode); + + for (const child of children) { + delete newRunData[child.name]; + } + } + + return newRunData; +} diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index e0898225e9..a10d8c530c 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -58,6 +58,7 @@ import { findSubgraph, findTriggerForPartialExecution, } from './PartialExecutionUtils'; +import { cleanRunData } from './PartialExecutionUtils/cleanRunData'; export class WorkflowExecute { private status: ExecutionStatus = 'new'; @@ -347,7 +348,8 @@ export class WorkflowExecute { } // 2. Find the Subgraph - const subgraph = findSubgraph(DirectedGraph.fromWorkflow(workflow), destinationNode, trigger); + const graph = DirectedGraph.fromWorkflow(workflow); + const subgraph = findSubgraph(graph, destinationNode, trigger); const filteredNodes = subgraph.getNodes(); // 3. Find the Start Nodes @@ -362,7 +364,7 @@ export class WorkflowExecute { } // 6. Clean Run Data - // TODO: + const newRunData: IRunData = cleanRunData(runData, graph, startNodes); // 7. Recreate Execution Stack const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = @@ -376,7 +378,7 @@ export class WorkflowExecute { runNodeFilter: Array.from(filteredNodes.values()).map((node) => node.name), }, resultData: { - runData, + runData: newRunData, pinData, }, executionData: {