mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-18 02:21:13 +00:00
feat(core): Implement partial execution of tool nodes (no-changelog) (#14939)
This commit is contained in:
@@ -6,6 +6,8 @@ import {
|
|||||||
recreateNodeExecutionStack,
|
recreateNodeExecutionStack,
|
||||||
WorkflowExecute,
|
WorkflowExecute,
|
||||||
Logger,
|
Logger,
|
||||||
|
isTool,
|
||||||
|
rewireGraph,
|
||||||
} from 'n8n-core';
|
} from 'n8n-core';
|
||||||
import type {
|
import type {
|
||||||
IPinData,
|
IPinData,
|
||||||
@@ -107,6 +109,21 @@ export class ManualExecutionService {
|
|||||||
|
|
||||||
const startNode = this.getExecutionStartNode(data, workflow);
|
const startNode = this.getExecutionStartNode(data, workflow);
|
||||||
|
|
||||||
|
if (data.destinationNode) {
|
||||||
|
const destinationNode = workflow.getNode(data.destinationNode);
|
||||||
|
a.ok(
|
||||||
|
destinationNode,
|
||||||
|
`Could not find a node named "${data.destinationNode}" in the workflow.`,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Rewire graph to be able to execute the destination tool node
|
||||||
|
if (isTool(destinationNode, workflow.nodeTypes)) {
|
||||||
|
workflow = rewireGraph(destinationNode, DirectedGraph.fromWorkflow(workflow)).toWorkflow({
|
||||||
|
...workflow,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Can execute without webhook so go on
|
// Can execute without webhook so go on
|
||||||
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
|
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
|
||||||
|
|
||||||
|
|||||||
@@ -631,6 +631,64 @@ describe('WorkflowExecute', () => {
|
|||||||
new DirectedGraph().addNode(orphan).toWorkflow({ ...workflow }),
|
new DirectedGraph().addNode(orphan).toWorkflow({ ...workflow }),
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// ┌───────┐ ┌───────────┐
|
||||||
|
// │trigger├────►│agentNode │
|
||||||
|
// └───────┘ └───────────┘
|
||||||
|
// │ ┌──────┐
|
||||||
|
// └─│ Tool │
|
||||||
|
// └──────┘
|
||||||
|
it('rewires graph for partial execution of tools', async () => {
|
||||||
|
// ARRANGE
|
||||||
|
const waitPromise = createDeferredPromise<IRun>();
|
||||||
|
const additionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise);
|
||||||
|
const workflowExecute = new WorkflowExecute(additionalData, 'manual');
|
||||||
|
const nodeTypes = Helpers.NodeTypes();
|
||||||
|
|
||||||
|
const trigger = createNodeData({ name: 'trigger', type: 'n8n-nodes-base.manualTrigger' });
|
||||||
|
const tool = createNodeData({ name: 'tool', type: 'n8n-nodes-base.toolTest' });
|
||||||
|
const agentNode = createNodeData({ name: 'agent' });
|
||||||
|
|
||||||
|
const workflow = new DirectedGraph()
|
||||||
|
.addNodes(trigger, tool, agentNode)
|
||||||
|
.addConnections(
|
||||||
|
{ from: trigger, to: agentNode },
|
||||||
|
{ from: tool, to: agentNode, type: NodeConnectionTypes.AiTool },
|
||||||
|
)
|
||||||
|
.toWorkflow({ name: '', active: false, nodeTypes });
|
||||||
|
const pinData: IPinData = {};
|
||||||
|
const runData: IRunData = {
|
||||||
|
[trigger.name]: [toITaskData([{ data: { value: 1 } }])],
|
||||||
|
};
|
||||||
|
const dirtyNodeNames: string[] = [];
|
||||||
|
|
||||||
|
const processRunExecutionDataSpy = jest
|
||||||
|
.spyOn(workflowExecute, 'processRunExecutionData')
|
||||||
|
.mockImplementationOnce(jest.fn());
|
||||||
|
|
||||||
|
const expectedTool = {
|
||||||
|
...tool,
|
||||||
|
rewireOutputLogTo: NodeConnectionTypes.AiTool,
|
||||||
|
};
|
||||||
|
|
||||||
|
const expectedGraph = new DirectedGraph()
|
||||||
|
.addNodes(trigger, expectedTool)
|
||||||
|
.addConnections({ from: trigger, to: expectedTool })
|
||||||
|
.toWorkflow({ ...workflow });
|
||||||
|
|
||||||
|
// ACT
|
||||||
|
await workflowExecute.runPartialWorkflow2(
|
||||||
|
workflow,
|
||||||
|
runData,
|
||||||
|
pinData,
|
||||||
|
dirtyNodeNames,
|
||||||
|
tool.name,
|
||||||
|
);
|
||||||
|
|
||||||
|
// ASSERT
|
||||||
|
expect(processRunExecutionDataSpy).toHaveBeenCalledTimes(1);
|
||||||
|
expect(processRunExecutionDataSpy).toHaveBeenCalledWith(expectedGraph);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('checkReadyForExecution', () => {
|
describe('checkReadyForExecution', () => {
|
||||||
|
|||||||
@@ -0,0 +1,51 @@
|
|||||||
|
import { mock } from 'jest-mock-extended';
|
||||||
|
import { type INode, type INodeTypes, NodeConnectionTypes } from 'n8n-workflow';
|
||||||
|
|
||||||
|
import { isTool } from '../is-tool';
|
||||||
|
|
||||||
|
const mockNode = mock<INode>({ id: '1', type: 'n8n-nodes-base.openAi', typeVersion: 1 });
|
||||||
|
const mockNodeTypes = mock<INodeTypes>();
|
||||||
|
|
||||||
|
describe('isTool', () => {
|
||||||
|
it('should return true for a node with AiTool output', () => {
|
||||||
|
mockNodeTypes.getByNameAndVersion.mockReturnValue({
|
||||||
|
description: {
|
||||||
|
outputs: [NodeConnectionTypes.AiTool],
|
||||||
|
version: 0,
|
||||||
|
defaults: {
|
||||||
|
name: '',
|
||||||
|
color: '',
|
||||||
|
},
|
||||||
|
inputs: [NodeConnectionTypes.Main],
|
||||||
|
properties: [],
|
||||||
|
displayName: '',
|
||||||
|
name: '',
|
||||||
|
group: [],
|
||||||
|
description: '',
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const result = isTool(mockNode, mockNodeTypes);
|
||||||
|
expect(result).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns false for node with no AiTool output', () => {
|
||||||
|
mockNodeTypes.getByNameAndVersion.mockReturnValue({
|
||||||
|
description: {
|
||||||
|
outputs: [NodeConnectionTypes.Main],
|
||||||
|
version: 0,
|
||||||
|
defaults: {
|
||||||
|
name: '',
|
||||||
|
color: '',
|
||||||
|
},
|
||||||
|
inputs: [NodeConnectionTypes.Main],
|
||||||
|
properties: [],
|
||||||
|
displayName: '',
|
||||||
|
name: '',
|
||||||
|
group: [],
|
||||||
|
description: '',
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const result = isTool(mockNode, mockNodeTypes);
|
||||||
|
expect(result).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -0,0 +1,118 @@
|
|||||||
|
import { NodeConnectionTypes } from 'n8n-workflow';
|
||||||
|
|
||||||
|
import { createNodeData } from './helpers';
|
||||||
|
import { DirectedGraph } from '../directed-graph';
|
||||||
|
import { rewireGraph } from '../rewire-graph';
|
||||||
|
|
||||||
|
describe('rewireGraph()', () => {
|
||||||
|
it('rewires a simple graph with a tool node', () => {
|
||||||
|
const tool = createNodeData({ name: 'tool', type: 'n8n-nodes-base.ai-tool' });
|
||||||
|
const root = createNodeData({ name: 'root' });
|
||||||
|
const trigger = createNodeData({ name: 'trigger' });
|
||||||
|
|
||||||
|
const graph = new DirectedGraph();
|
||||||
|
graph.addNodes(trigger, root, tool);
|
||||||
|
graph.addConnections(
|
||||||
|
{ from: trigger, to: root, type: NodeConnectionTypes.Main },
|
||||||
|
{ from: tool, to: root, type: NodeConnectionTypes.AiTool },
|
||||||
|
);
|
||||||
|
|
||||||
|
const rewiredGraph = rewireGraph(tool, graph);
|
||||||
|
|
||||||
|
const toolConnections = rewiredGraph.getDirectParentConnections(tool);
|
||||||
|
expect(toolConnections).toHaveLength(1);
|
||||||
|
expect(toolConnections[0].from.name).toBe('trigger');
|
||||||
|
expect(toolConnections[0].type).toBe(NodeConnectionTypes.Main);
|
||||||
|
|
||||||
|
expect(rewiredGraph.hasNode(root.name)).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('rewires all incoming connections of the root node to the tool', () => {
|
||||||
|
const tool = createNodeData({ name: 'tool', type: 'n8n-nodes-base.ai-tool' });
|
||||||
|
const root = createNodeData({ name: 'root' });
|
||||||
|
const trigger = createNodeData({ name: 'trigger' });
|
||||||
|
const secondNode = createNodeData({ name: 'secondNode' });
|
||||||
|
const thirdNode = createNodeData({ name: 'thirdNode' });
|
||||||
|
|
||||||
|
const graph = new DirectedGraph();
|
||||||
|
graph.addNodes(trigger, root, tool, secondNode, thirdNode);
|
||||||
|
graph.addConnections(
|
||||||
|
{ from: trigger, to: secondNode, type: NodeConnectionTypes.Main },
|
||||||
|
{ from: trigger, to: thirdNode, type: NodeConnectionTypes.Main },
|
||||||
|
{ from: tool, to: root, type: NodeConnectionTypes.AiTool },
|
||||||
|
{ from: secondNode, to: root, type: NodeConnectionTypes.Main },
|
||||||
|
{ from: thirdNode, to: root, type: NodeConnectionTypes.Main },
|
||||||
|
);
|
||||||
|
|
||||||
|
const rewiredGraph = rewireGraph(tool, graph);
|
||||||
|
|
||||||
|
const toolConnections = rewiredGraph.getDirectParentConnections(tool);
|
||||||
|
expect(toolConnections).toHaveLength(2);
|
||||||
|
expect(toolConnections.map((cn) => cn.from.name).sort()).toEqual(
|
||||||
|
['secondNode', 'thirdNode'].sort(),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('ignores non-main connections when rewiring', () => {
|
||||||
|
const tool = createNodeData({ name: 'tool', type: 'n8n-nodes-base.ai-tool' });
|
||||||
|
const root = createNodeData({ name: 'root' });
|
||||||
|
const parent = createNodeData({ name: 'parent' });
|
||||||
|
const trigger = createNodeData({ name: 'trigger' });
|
||||||
|
const child = createNodeData({ name: 'child' });
|
||||||
|
|
||||||
|
const graph = new DirectedGraph();
|
||||||
|
graph.addNodes(trigger, root, tool, parent, child);
|
||||||
|
graph.addConnections(
|
||||||
|
{ from: trigger, to: root, type: NodeConnectionTypes.Main },
|
||||||
|
{ from: parent, to: root, type: NodeConnectionTypes.AiLanguageModel },
|
||||||
|
{ from: child, to: root, type: NodeConnectionTypes.AiTool },
|
||||||
|
{ from: tool, to: root, type: NodeConnectionTypes.AiTool },
|
||||||
|
);
|
||||||
|
|
||||||
|
const rewiredGraph = rewireGraph(tool, graph);
|
||||||
|
|
||||||
|
const toolConnections = rewiredGraph.getDirectParentConnections(tool);
|
||||||
|
expect(toolConnections).toHaveLength(1);
|
||||||
|
expect(toolConnections[0].type).toBe(NodeConnectionTypes.Main);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('sets rewireOutputLogTo to AiTool on the tool node', () => {
|
||||||
|
const tool = createNodeData({ name: 'tool', type: 'n8n-nodes-base.ai-tool' });
|
||||||
|
const trigger = createNodeData({ name: 'trigger' });
|
||||||
|
const root = createNodeData({ name: 'root' });
|
||||||
|
|
||||||
|
const graph = new DirectedGraph();
|
||||||
|
graph.addNodes(trigger, root, tool);
|
||||||
|
graph.addConnections(
|
||||||
|
{ from: trigger, to: root, type: NodeConnectionTypes.Main },
|
||||||
|
{ from: tool, to: root, type: NodeConnectionTypes.AiTool },
|
||||||
|
);
|
||||||
|
|
||||||
|
rewireGraph(tool, graph);
|
||||||
|
|
||||||
|
expect(tool.rewireOutputLogTo).toBe(NodeConnectionTypes.AiTool);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('fails when the tool has no incoming connections', () => {
|
||||||
|
const tool = createNodeData({ name: 'tool', type: 'n8n-nodes-base.ai-tool' });
|
||||||
|
const root = createNodeData({ name: 'root' });
|
||||||
|
|
||||||
|
const graph = new DirectedGraph();
|
||||||
|
graph.addNodes(root, tool);
|
||||||
|
|
||||||
|
expect(() => rewireGraph(tool, graph)).toThrow();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('removes the root node from the graph', () => {
|
||||||
|
const tool = createNodeData({ name: 'tool', type: 'n8n-nodes-base.ai-tool' });
|
||||||
|
const root = createNodeData({ name: 'root' });
|
||||||
|
|
||||||
|
const graph = new DirectedGraph();
|
||||||
|
graph.addNodes(root, tool);
|
||||||
|
graph.addConnections({ from: tool, to: root, type: NodeConnectionTypes.AiTool });
|
||||||
|
|
||||||
|
const rewiredGraph = rewireGraph(tool, graph);
|
||||||
|
|
||||||
|
expect(rewiredGraph.hasNode(root.name)).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -6,3 +6,5 @@ export { recreateNodeExecutionStack } from './recreate-node-execution-stack';
|
|||||||
export { cleanRunData } from './clean-run-data';
|
export { cleanRunData } from './clean-run-data';
|
||||||
export { handleCycles } from './handle-cycles';
|
export { handleCycles } from './handle-cycles';
|
||||||
export { filterDisabledNodes } from './filter-disabled-nodes';
|
export { filterDisabledNodes } from './filter-disabled-nodes';
|
||||||
|
export { isTool } from './is-tool';
|
||||||
|
export { rewireGraph } from './rewire-graph';
|
||||||
|
|||||||
@@ -0,0 +1,6 @@
|
|||||||
|
import { type INode, type INodeTypes, NodeConnectionTypes } from 'n8n-workflow';
|
||||||
|
|
||||||
|
export function isTool(node: INode, nodeTypes: INodeTypes) {
|
||||||
|
const type = nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
|
||||||
|
return type.description.outputs.includes(NodeConnectionTypes.AiTool);
|
||||||
|
}
|
||||||
@@ -0,0 +1,29 @@
|
|||||||
|
import * as a from 'assert/strict';
|
||||||
|
import { type INode, NodeConnectionTypes } from 'n8n-workflow';
|
||||||
|
|
||||||
|
import { type DirectedGraph } from './directed-graph';
|
||||||
|
|
||||||
|
export function rewireGraph(tool: INode, graph: DirectedGraph): DirectedGraph {
|
||||||
|
graph = graph.clone();
|
||||||
|
const children = graph.getChildren(tool);
|
||||||
|
|
||||||
|
a.ok(children.size > 0, 'Tool must be connected to a root node');
|
||||||
|
|
||||||
|
const rootNode = [...children][0];
|
||||||
|
|
||||||
|
a.ok(rootNode);
|
||||||
|
|
||||||
|
const allIncomingConnection = graph
|
||||||
|
.getDirectParentConnections(rootNode)
|
||||||
|
.filter((cn) => cn.type === NodeConnectionTypes.Main);
|
||||||
|
|
||||||
|
tool.rewireOutputLogTo = NodeConnectionTypes.AiTool;
|
||||||
|
|
||||||
|
for (const cn of allIncomingConnection) {
|
||||||
|
graph.addConnection({ from: cn.from, to: tool });
|
||||||
|
}
|
||||||
|
|
||||||
|
graph.removeNode(rootNode);
|
||||||
|
|
||||||
|
return graph;
|
||||||
|
}
|
||||||
@@ -68,6 +68,8 @@ import {
|
|||||||
recreateNodeExecutionStack,
|
recreateNodeExecutionStack,
|
||||||
handleCycles,
|
handleCycles,
|
||||||
filterDisabledNodes,
|
filterDisabledNodes,
|
||||||
|
rewireGraph,
|
||||||
|
isTool,
|
||||||
} from './partial-execution-utils';
|
} from './partial-execution-utils';
|
||||||
import { RoutingNode } from './routing-node';
|
import { RoutingNode } from './routing-node';
|
||||||
import { TriggersAndPollers } from './triggers-and-pollers';
|
import { TriggersAndPollers } from './triggers-and-pollers';
|
||||||
@@ -356,41 +358,47 @@ export class WorkflowExecute {
|
|||||||
|
|
||||||
let graph = DirectedGraph.fromWorkflow(workflow);
|
let graph = DirectedGraph.fromWorkflow(workflow);
|
||||||
|
|
||||||
// Edge Case 1:
|
// Partial execution of nodes as tools
|
||||||
// Support executing a single node that is not connected to a trigger
|
if (isTool(destination, workflow.nodeTypes)) {
|
||||||
const destinationHasNoParents = graph.getDirectParentConnections(destination).length === 0;
|
graph = rewireGraph(destination, graph);
|
||||||
if (destinationHasNoParents) {
|
workflow = graph.toWorkflow({ ...workflow });
|
||||||
// short cut here, only create a subgraph and the stacks
|
} else {
|
||||||
graph = findSubgraph({
|
// Edge Case 1:
|
||||||
graph: filterDisabledNodes(graph),
|
// Support executing a single node that is not connected to a trigger
|
||||||
destination,
|
const destinationHasNoParents = graph.getDirectParentConnections(destination).length === 0;
|
||||||
trigger: destination,
|
if (destinationHasNoParents) {
|
||||||
});
|
// short cut here, only create a subgraph and the stacks
|
||||||
const filteredNodes = graph.getNodes();
|
graph = findSubgraph({
|
||||||
runData = cleanRunData(runData, graph, new Set([destination]));
|
graph: filterDisabledNodes(graph),
|
||||||
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
|
destination,
|
||||||
recreateNodeExecutionStack(graph, new Set([destination]), runData, pinData ?? {});
|
trigger: destination,
|
||||||
|
});
|
||||||
|
const filteredNodes = graph.getNodes();
|
||||||
|
runData = cleanRunData(runData, graph, new Set([destination]));
|
||||||
|
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
|
||||||
|
recreateNodeExecutionStack(graph, new Set([destination]), runData, pinData ?? {});
|
||||||
|
|
||||||
this.status = 'running';
|
this.status = 'running';
|
||||||
this.runExecutionData = {
|
this.runExecutionData = {
|
||||||
startData: {
|
startData: {
|
||||||
destinationNode: destinationNodeName,
|
destinationNode: destinationNodeName,
|
||||||
runNodeFilter: Array.from(filteredNodes.values()).map((node) => node.name),
|
runNodeFilter: Array.from(filteredNodes.values()).map((node) => node.name),
|
||||||
},
|
},
|
||||||
resultData: {
|
resultData: {
|
||||||
runData,
|
runData,
|
||||||
pinData,
|
pinData,
|
||||||
},
|
},
|
||||||
executionData: {
|
executionData: {
|
||||||
contextData: {},
|
contextData: {},
|
||||||
nodeExecutionStack,
|
nodeExecutionStack,
|
||||||
metadata: {},
|
metadata: {},
|
||||||
waitingExecution,
|
waitingExecution,
|
||||||
waitingExecutionSource,
|
waitingExecutionSource,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
return this.processRunExecutionData(graph.toWorkflow({ ...workflow }));
|
return this.processRunExecutionData(graph.toWorkflow({ ...workflow }));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 1. Find the Trigger
|
// 1. Find the Trigger
|
||||||
@@ -1704,6 +1712,13 @@ export class WorkflowExecute {
|
|||||||
main: nodeSuccessData,
|
main: nodeSuccessData,
|
||||||
} as ITaskDataConnections;
|
} as ITaskDataConnections;
|
||||||
|
|
||||||
|
// Rewire output data log to the given connectionType
|
||||||
|
if (executionNode.rewireOutputLogTo) {
|
||||||
|
taskData.data = {
|
||||||
|
[executionNode.rewireOutputLogTo]: nodeSuccessData,
|
||||||
|
} as ITaskDataConnections;
|
||||||
|
}
|
||||||
|
|
||||||
this.runExecutionData.resultData.runData[executionNode.name].push(taskData);
|
this.runExecutionData.resultData.runData[executionNode.name].push(taskData);
|
||||||
|
|
||||||
if (this.runExecutionData.waitTill) {
|
if (this.runExecutionData.waitTill) {
|
||||||
|
|||||||
@@ -43,6 +43,25 @@ export const predefinedNodesTypes: INodeTypeData = {
|
|||||||
type: new SplitInBatches(),
|
type: new SplitInBatches(),
|
||||||
sourcePath: '',
|
sourcePath: '',
|
||||||
},
|
},
|
||||||
|
'n8n-nodes-base.toolTest': {
|
||||||
|
sourcePath: '',
|
||||||
|
type: {
|
||||||
|
description: {
|
||||||
|
displayName: 'Test tool',
|
||||||
|
name: 'toolTest',
|
||||||
|
group: ['transform'],
|
||||||
|
version: 1,
|
||||||
|
description: 'Test tool',
|
||||||
|
inputs: [],
|
||||||
|
defaults: {
|
||||||
|
name: 'Test Tool',
|
||||||
|
color: '#0000FF',
|
||||||
|
},
|
||||||
|
outputs: [NodeConnectionTypes.AiTool],
|
||||||
|
properties: [],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
'n8n-nodes-base.versionTest': {
|
'n8n-nodes-base.versionTest': {
|
||||||
sourcePath: '',
|
sourcePath: '',
|
||||||
type: {
|
type: {
|
||||||
|
|||||||
@@ -1135,6 +1135,7 @@ export interface INode {
|
|||||||
credentials?: INodeCredentials;
|
credentials?: INodeCredentials;
|
||||||
webhookId?: string;
|
webhookId?: string;
|
||||||
extendsCredential?: string;
|
extendsCredential?: string;
|
||||||
|
rewireOutputLogTo?: NodeConnectionType;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface IPinData {
|
export interface IPinData {
|
||||||
|
|||||||
Reference in New Issue
Block a user