mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-17 10:02:05 +00:00
fix(core): Clean run data for dirty nodes properly, including their children (#13821)
This commit is contained in:
@@ -11,6 +11,7 @@ process.env.N8N_RUNNERS_ENABLED = 'false';
|
|||||||
// 1 means the output has run data
|
// 1 means the output has run data
|
||||||
// ►► denotes the node that the user wants to execute to
|
// ►► denotes the node that the user wants to execute to
|
||||||
// XX denotes that the node is disabled
|
// XX denotes that the node is disabled
|
||||||
|
// DR denotes that the node is dirty
|
||||||
// PD denotes that the node has pinned data
|
// PD denotes that the node has pinned data
|
||||||
|
|
||||||
import { mock } from 'jest-mock-extended';
|
import { mock } from 'jest-mock-extended';
|
||||||
@@ -295,6 +296,72 @@ describe('WorkflowExecute', () => {
|
|||||||
expect(fullRunData.data.resultData.runData).not.toHaveProperty(node1.name);
|
expect(fullRunData.data.resultData.runData).not.toHaveProperty(node1.name);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
//
|
||||||
|
// ┌───────┐1 ┌────┐1
|
||||||
|
// │trigger├───►set1├─┐ ┌─────┐ ►►
|
||||||
|
// └───────┘ └────┘ └─► │1 ┌───────────┐
|
||||||
|
// DR │merge├───►destination│
|
||||||
|
// ┌────┐1┌─► │ └───────────┘
|
||||||
|
// │set2├─┘ └─────┘
|
||||||
|
// └────┘
|
||||||
|
test('deletes run data of children of dirty nodes as well', async () => {
|
||||||
|
// ARRANGE
|
||||||
|
const waitPromise = createDeferredPromise<IRun>();
|
||||||
|
const nodeExecutionOrder: string[] = [];
|
||||||
|
const additionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise, nodeExecutionOrder);
|
||||||
|
const workflowExecute = new WorkflowExecute(additionalData, 'manual');
|
||||||
|
jest.spyOn(workflowExecute, 'processRunExecutionData').mockImplementationOnce(jest.fn());
|
||||||
|
|
||||||
|
const recreateNodeExecutionStackSpy = jest.spyOn(
|
||||||
|
partialExecutionUtils,
|
||||||
|
'recreateNodeExecutionStack',
|
||||||
|
);
|
||||||
|
|
||||||
|
const trigger = createNodeData({ name: 'trigger', type: 'n8n-nodes-base.manualTrigger' });
|
||||||
|
const set1 = createNodeData({ name: 'set1' });
|
||||||
|
const set2 = createNodeData({ name: 'set2' });
|
||||||
|
const merge = createNodeData({ name: 'merge' });
|
||||||
|
const destination = createNodeData({ name: 'destination' });
|
||||||
|
const workflow = new DirectedGraph()
|
||||||
|
.addNodes(trigger, set1, set2, merge, destination)
|
||||||
|
.addConnections(
|
||||||
|
{ from: trigger, to: set1 },
|
||||||
|
{ from: trigger, to: set2 },
|
||||||
|
{ from: set1, to: merge, inputIndex: 0 },
|
||||||
|
{ from: set2, to: merge, inputIndex: 1 },
|
||||||
|
{ from: merge, to: destination },
|
||||||
|
)
|
||||||
|
.toWorkflow({ name: '', active: false, nodeTypes });
|
||||||
|
const pinData: IPinData = {};
|
||||||
|
const runData: IRunData = {
|
||||||
|
[trigger.name]: [toITaskData([{ data: { node: 'trigger' } }])],
|
||||||
|
[set1.name]: [toITaskData([{ data: { node: 'set1' } }])],
|
||||||
|
[set2.name]: [toITaskData([{ data: { node: 'set2' } }])],
|
||||||
|
[merge.name]: [toITaskData([{ data: { node: 'merge' } }])],
|
||||||
|
[destination.name]: [toITaskData([{ data: { node: 'destination' } }])],
|
||||||
|
};
|
||||||
|
const dirtyNodeNames = [set2.name];
|
||||||
|
const destinationNode = destination.name;
|
||||||
|
|
||||||
|
// ACT
|
||||||
|
await workflowExecute.runPartialWorkflow2(
|
||||||
|
workflow,
|
||||||
|
runData,
|
||||||
|
pinData,
|
||||||
|
dirtyNodeNames,
|
||||||
|
destinationNode,
|
||||||
|
);
|
||||||
|
|
||||||
|
// ASSERT
|
||||||
|
expect(recreateNodeExecutionStackSpy).toHaveBeenCalledTimes(1);
|
||||||
|
expect(recreateNodeExecutionStackSpy).toHaveBeenCalledWith(
|
||||||
|
expect.any(DirectedGraph),
|
||||||
|
new Set([merge, set2]),
|
||||||
|
pick(runData, [trigger.name, set1.name]),
|
||||||
|
pinData,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
// XX ►►
|
// XX ►►
|
||||||
// ┌───────┐1 ┌─────┐1 ┌─────┐
|
// ┌───────┐1 ┌─────┐1 ┌─────┐
|
||||||
// │trigger├──────►node1├──────►node2│
|
// │trigger├──────►node1├──────►node2│
|
||||||
@@ -452,10 +519,23 @@ describe('WorkflowExecute', () => {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// ASSERT
|
// ASSERT
|
||||||
|
const subgraph = new DirectedGraph()
|
||||||
|
.addNodes(trigger, node1)
|
||||||
|
.addConnections({ from: trigger, to: node1 });
|
||||||
|
expect(cleanRunDataSpy).toHaveBeenCalledTimes(2);
|
||||||
expect(cleanRunDataSpy).toHaveBeenNthCalledWith(
|
expect(cleanRunDataSpy).toHaveBeenNthCalledWith(
|
||||||
1,
|
1,
|
||||||
runData,
|
runData,
|
||||||
new DirectedGraph().addNodes(trigger, node1).addConnections({ from: trigger, to: node1 }),
|
subgraph,
|
||||||
|
// first call with the dirty nodes, which are an empty set in this case
|
||||||
|
new Set(),
|
||||||
|
);
|
||||||
|
expect(cleanRunDataSpy).toHaveBeenNthCalledWith(
|
||||||
|
2,
|
||||||
|
pick(runData, [trigger.name, node1.name]),
|
||||||
|
subgraph,
|
||||||
|
// second call with start nodes, which is the destination node in this
|
||||||
|
// case
|
||||||
new Set([node1]),
|
new Set([node1]),
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -4,28 +4,30 @@ import type { DirectedGraph } from './directed-graph';
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns new run data that does not contain data for any node that is a child
|
* Returns new run data that does not contain data for any node that is a child
|
||||||
* of any start node.
|
* of any of the passed nodes. This is useful for cleaning run data after start
|
||||||
|
* nodes or dirty nodes.
|
||||||
|
*
|
||||||
* This does not mutate the `runData` being passed in.
|
* This does not mutate the `runData` being passed in.
|
||||||
*/
|
*/
|
||||||
export function cleanRunData(
|
export function cleanRunData(
|
||||||
runData: IRunData,
|
runData: IRunData,
|
||||||
graph: DirectedGraph,
|
graph: DirectedGraph,
|
||||||
startNodes: Set<INode>,
|
nodesToClean: Set<INode>,
|
||||||
): IRunData {
|
): IRunData {
|
||||||
const newRunData: IRunData = { ...runData };
|
const newRunData: IRunData = { ...runData };
|
||||||
|
|
||||||
for (const startNode of startNodes) {
|
for (const nodeToClean of nodesToClean) {
|
||||||
delete newRunData[startNode.name];
|
delete newRunData[nodeToClean.name];
|
||||||
|
|
||||||
const children = graph.getChildren(startNode);
|
const children = graph.getChildren(nodeToClean);
|
||||||
for (const node of [startNode, ...children]) {
|
for (const node of [nodeToClean, ...children]) {
|
||||||
delete newRunData[node.name];
|
delete newRunData[node.name];
|
||||||
|
|
||||||
// Delete runData for subNodes
|
// Delete runData for subNodes
|
||||||
const subNodeConnections = graph.getParentConnections(node);
|
const subNodeConnections = graph.getParentConnections(node);
|
||||||
for (const subNodeConnection of subNodeConnections) {
|
for (const subNodeConnection of subNodeConnections) {
|
||||||
// Sub nodes never use the Main connection type, so this filters out
|
// Sub nodes never use the Main connection type, so this filters out
|
||||||
// the connection that goes upstream of the startNode.
|
// the connection that goes upstream of the node to clean.
|
||||||
if (subNodeConnection.type === NodeConnectionType.Main) {
|
if (subNodeConnection.type === NodeConnectionType.Main) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,7 +5,6 @@
|
|||||||
import { Container } from '@n8n/di';
|
import { Container } from '@n8n/di';
|
||||||
import * as assert from 'assert/strict';
|
import * as assert from 'assert/strict';
|
||||||
import { setMaxListeners } from 'events';
|
import { setMaxListeners } from 'events';
|
||||||
import { omit } from 'lodash';
|
|
||||||
import get from 'lodash/get';
|
import get from 'lodash/get';
|
||||||
import type {
|
import type {
|
||||||
ExecutionBaseError,
|
ExecutionBaseError,
|
||||||
@@ -51,6 +50,7 @@ import {
|
|||||||
ExecutionCancelledError,
|
ExecutionCancelledError,
|
||||||
Node,
|
Node,
|
||||||
UnexpectedError,
|
UnexpectedError,
|
||||||
|
UserError,
|
||||||
} from 'n8n-workflow';
|
} from 'n8n-workflow';
|
||||||
import PCancelable from 'p-cancelable';
|
import PCancelable from 'p-cancelable';
|
||||||
|
|
||||||
@@ -396,7 +396,7 @@ export class WorkflowExecute {
|
|||||||
// 1. Find the Trigger
|
// 1. Find the Trigger
|
||||||
const trigger = findTriggerForPartialExecution(workflow, destinationNodeName);
|
const trigger = findTriggerForPartialExecution(workflow, destinationNodeName);
|
||||||
if (trigger === undefined) {
|
if (trigger === undefined) {
|
||||||
throw new ApplicationError('Connect a trigger to run this node');
|
throw new UserError('Connect a trigger to run this node');
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. Find the Subgraph
|
// 2. Find the Subgraph
|
||||||
@@ -404,7 +404,8 @@ export class WorkflowExecute {
|
|||||||
const filteredNodes = graph.getNodes();
|
const filteredNodes = graph.getNodes();
|
||||||
|
|
||||||
// 3. Find the Start Nodes
|
// 3. Find the Start Nodes
|
||||||
runData = omit(runData, dirtyNodeNames);
|
const dirtyNodes = new Set(workflow.getNodes(dirtyNodeNames));
|
||||||
|
runData = cleanRunData(runData, graph, dirtyNodes);
|
||||||
let startNodes = findStartNodes({ graph, trigger, destination, runData, pinData });
|
let startNodes = findStartNodes({ graph, trigger, destination, runData, pinData });
|
||||||
|
|
||||||
// 4. Detect Cycles
|
// 4. Detect Cycles
|
||||||
|
|||||||
@@ -289,6 +289,27 @@ export class Workflow {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the nodes with the given names if they exist.
|
||||||
|
* If a node cannot be found it will be ignored, meaning the returned array
|
||||||
|
* of nodes can be smaller than the array of names.
|
||||||
|
*/
|
||||||
|
getNodes(nodeNames: string[]): INode[] {
|
||||||
|
const nodes: INode[] = [];
|
||||||
|
for (const name of nodeNames) {
|
||||||
|
const node = this.getNode(name);
|
||||||
|
if (!node) {
|
||||||
|
console.warn(
|
||||||
|
`Could not find a node with the name ${name} in the workflow. This was passed in as a dirty node name.`,
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
nodes.push(node);
|
||||||
|
}
|
||||||
|
|
||||||
|
return nodes;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the pinData of the node with the given name if it exists
|
* Returns the pinData of the node with the given name if it exists
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -344,6 +344,10 @@ describe('Workflow', () => {
|
|||||||
active: false,
|
active: false,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
jest.restoreAllMocks();
|
||||||
|
});
|
||||||
|
|
||||||
describe('renameNodeInParameterValue', () => {
|
describe('renameNodeInParameterValue', () => {
|
||||||
describe('for expressions', () => {
|
describe('for expressions', () => {
|
||||||
const tests = [
|
const tests = [
|
||||||
@@ -2339,4 +2343,65 @@ describe('Workflow', () => {
|
|||||||
expect(workflow.getStartNode()).toBeUndefined();
|
expect(workflow.getStartNode()).toBeUndefined();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('getNode', () => {
|
||||||
|
test('should return the node with the given name if it exists', () => {
|
||||||
|
const workflow = SIMPLE_WORKFLOW;
|
||||||
|
const node = workflow.getNode('Start');
|
||||||
|
expect(node).not.toBeNull();
|
||||||
|
expect(node?.name).toBe('Start');
|
||||||
|
expect(node?.type).toBe('test.set');
|
||||||
|
expect(node?.id).toBe('uuid-1');
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should return null if the node does not exist', () => {
|
||||||
|
const nonExistentNode = SIMPLE_WORKFLOW.getNode('NonExistentNode');
|
||||||
|
expect(nonExistentNode).toBeNull();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('getNodes', () => {
|
||||||
|
test('should return all requested nodes that exist', () => {
|
||||||
|
const nodes = SIMPLE_WORKFLOW.getNodes(['Start', 'Set', 'Set1']);
|
||||||
|
expect(nodes).toHaveLength(3);
|
||||||
|
expect(nodes[0].name).toBe('Start');
|
||||||
|
expect(nodes[1].name).toBe('Set');
|
||||||
|
expect(nodes[2].name).toBe('Set1');
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should return nodes in the order they were requested', () => {
|
||||||
|
const nodes = SIMPLE_WORKFLOW.getNodes(['Set1', 'Start', 'Set']);
|
||||||
|
expect(nodes).toHaveLength(3);
|
||||||
|
expect(nodes[0].name).toBe('Set1');
|
||||||
|
expect(nodes[1].name).toBe('Start');
|
||||||
|
expect(nodes[2].name).toBe('Set');
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should skip nodes that do not exist and log a warning', () => {
|
||||||
|
// Spy on console.warn
|
||||||
|
const consoleWarnSpy = jest.spyOn(console, 'warn').mockImplementation();
|
||||||
|
|
||||||
|
const nodes = SIMPLE_WORKFLOW.getNodes(['Start', 'NonExistentNode', 'Set1']);
|
||||||
|
expect(nodes).toHaveLength(2);
|
||||||
|
expect(nodes[0].name).toBe('Start');
|
||||||
|
expect(nodes[1].name).toBe('Set1');
|
||||||
|
expect(consoleWarnSpy).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining('Could not find a node with the name NonExistentNode'),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should return an empty array if none of the requested nodes exist', () => {
|
||||||
|
// Spy on console.warn
|
||||||
|
const consoleWarnSpy = jest.spyOn(console, 'warn').mockImplementation();
|
||||||
|
|
||||||
|
const nodes = SIMPLE_WORKFLOW.getNodes(['NonExistentNode1', 'NonExistentNode2']);
|
||||||
|
expect(nodes).toHaveLength(0);
|
||||||
|
expect(consoleWarnSpy).toHaveBeenCalledTimes(2);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('should handle an empty array of node names', () => {
|
||||||
|
const nodes = SIMPLE_WORKFLOW.getNodes([]);
|
||||||
|
expect(nodes).toHaveLength(0);
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user