diff --git a/packages/cli/src/manual-execution.service.ts b/packages/cli/src/manual-execution.service.ts index 78b7e1723f..e48c520a60 100644 --- a/packages/cli/src/manual-execution.service.ts +++ b/packages/cli/src/manual-execution.service.ts @@ -6,6 +6,8 @@ import { recreateNodeExecutionStack, WorkflowExecute, Logger, + isTool, + rewireGraph, } from 'n8n-core'; import type { IPinData, @@ -107,6 +109,21 @@ export class ManualExecutionService { const startNode = this.getExecutionStartNode(data, workflow); + if (data.destinationNode) { + const destinationNode = workflow.getNode(data.destinationNode); + a.ok( + destinationNode, + `Could not find a node named "${data.destinationNode}" in the workflow.`, + ); + + // Rewire graph to be able to execute the destination tool node + if (isTool(destinationNode, workflow.nodeTypes)) { + workflow = rewireGraph(destinationNode, DirectedGraph.fromWorkflow(workflow)).toWorkflow({ + ...workflow, + }); + } + } + // Can execute without webhook so go on const workflowExecute = new WorkflowExecute(additionalData, data.executionMode); diff --git a/packages/core/src/execution-engine/__tests__/workflow-execute.test.ts b/packages/core/src/execution-engine/__tests__/workflow-execute.test.ts index 3b0e1a46fe..516be4aec1 100644 --- a/packages/core/src/execution-engine/__tests__/workflow-execute.test.ts +++ b/packages/core/src/execution-engine/__tests__/workflow-execute.test.ts @@ -631,6 +631,64 @@ describe('WorkflowExecute', () => { new DirectedGraph().addNode(orphan).toWorkflow({ ...workflow }), ); }); + + // ┌───────┐ ┌───────────┐ + // │trigger├────►│agentNode │ + // └───────┘ └───────────┘ + // │ ┌──────┐ + // └─│ Tool │ + // └──────┘ + it('rewires graph for partial execution of tools', async () => { + // ARRANGE + const waitPromise = createDeferredPromise(); + const additionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise); + const workflowExecute = new WorkflowExecute(additionalData, 'manual'); + const nodeTypes = Helpers.NodeTypes(); + + const trigger = createNodeData({ name: 'trigger', type: 'n8n-nodes-base.manualTrigger' }); + const tool = createNodeData({ name: 'tool', type: 'n8n-nodes-base.toolTest' }); + const agentNode = createNodeData({ name: 'agent' }); + + const workflow = new DirectedGraph() + .addNodes(trigger, tool, agentNode) + .addConnections( + { from: trigger, to: agentNode }, + { from: tool, to: agentNode, type: NodeConnectionTypes.AiTool }, + ) + .toWorkflow({ name: '', active: false, nodeTypes }); + const pinData: IPinData = {}; + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { value: 1 } }])], + }; + const dirtyNodeNames: string[] = []; + + const processRunExecutionDataSpy = jest + .spyOn(workflowExecute, 'processRunExecutionData') + .mockImplementationOnce(jest.fn()); + + const expectedTool = { + ...tool, + rewireOutputLogTo: NodeConnectionTypes.AiTool, + }; + + const expectedGraph = new DirectedGraph() + .addNodes(trigger, expectedTool) + .addConnections({ from: trigger, to: expectedTool }) + .toWorkflow({ ...workflow }); + + // ACT + await workflowExecute.runPartialWorkflow2( + workflow, + runData, + pinData, + dirtyNodeNames, + tool.name, + ); + + // ASSERT + expect(processRunExecutionDataSpy).toHaveBeenCalledTimes(1); + expect(processRunExecutionDataSpy).toHaveBeenCalledWith(expectedGraph); + }); }); describe('checkReadyForExecution', () => { diff --git a/packages/core/src/execution-engine/partial-execution-utils/__tests__/is-tool.test.ts b/packages/core/src/execution-engine/partial-execution-utils/__tests__/is-tool.test.ts new file mode 100644 index 0000000000..e7b5341d02 --- /dev/null +++ b/packages/core/src/execution-engine/partial-execution-utils/__tests__/is-tool.test.ts @@ -0,0 +1,51 @@ +import { mock } from 'jest-mock-extended'; +import { type INode, type INodeTypes, NodeConnectionTypes } from 'n8n-workflow'; + +import { isTool } from '../is-tool'; + +const mockNode = mock({ id: '1', type: 'n8n-nodes-base.openAi', typeVersion: 1 }); +const mockNodeTypes = mock(); + +describe('isTool', () => { + it('should return true for a node with AiTool output', () => { + mockNodeTypes.getByNameAndVersion.mockReturnValue({ + description: { + outputs: [NodeConnectionTypes.AiTool], + version: 0, + defaults: { + name: '', + color: '', + }, + inputs: [NodeConnectionTypes.Main], + properties: [], + displayName: '', + name: '', + group: [], + description: '', + }, + }); + const result = isTool(mockNode, mockNodeTypes); + expect(result).toBe(true); + }); + + it('returns false for node with no AiTool output', () => { + mockNodeTypes.getByNameAndVersion.mockReturnValue({ + description: { + outputs: [NodeConnectionTypes.Main], + version: 0, + defaults: { + name: '', + color: '', + }, + inputs: [NodeConnectionTypes.Main], + properties: [], + displayName: '', + name: '', + group: [], + description: '', + }, + }); + const result = isTool(mockNode, mockNodeTypes); + expect(result).toBe(false); + }); +}); diff --git a/packages/core/src/execution-engine/partial-execution-utils/__tests__/rewire-graph.test.ts b/packages/core/src/execution-engine/partial-execution-utils/__tests__/rewire-graph.test.ts new file mode 100644 index 0000000000..839650f6c0 --- /dev/null +++ b/packages/core/src/execution-engine/partial-execution-utils/__tests__/rewire-graph.test.ts @@ -0,0 +1,118 @@ +import { NodeConnectionTypes } from 'n8n-workflow'; + +import { createNodeData } from './helpers'; +import { DirectedGraph } from '../directed-graph'; +import { rewireGraph } from '../rewire-graph'; + +describe('rewireGraph()', () => { + it('rewires a simple graph with a tool node', () => { + const tool = createNodeData({ name: 'tool', type: 'n8n-nodes-base.ai-tool' }); + const root = createNodeData({ name: 'root' }); + const trigger = createNodeData({ name: 'trigger' }); + + const graph = new DirectedGraph(); + graph.addNodes(trigger, root, tool); + graph.addConnections( + { from: trigger, to: root, type: NodeConnectionTypes.Main }, + { from: tool, to: root, type: NodeConnectionTypes.AiTool }, + ); + + const rewiredGraph = rewireGraph(tool, graph); + + const toolConnections = rewiredGraph.getDirectParentConnections(tool); + expect(toolConnections).toHaveLength(1); + expect(toolConnections[0].from.name).toBe('trigger'); + expect(toolConnections[0].type).toBe(NodeConnectionTypes.Main); + + expect(rewiredGraph.hasNode(root.name)).toBe(false); + }); + + it('rewires all incoming connections of the root node to the tool', () => { + const tool = createNodeData({ name: 'tool', type: 'n8n-nodes-base.ai-tool' }); + const root = createNodeData({ name: 'root' }); + const trigger = createNodeData({ name: 'trigger' }); + const secondNode = createNodeData({ name: 'secondNode' }); + const thirdNode = createNodeData({ name: 'thirdNode' }); + + const graph = new DirectedGraph(); + graph.addNodes(trigger, root, tool, secondNode, thirdNode); + graph.addConnections( + { from: trigger, to: secondNode, type: NodeConnectionTypes.Main }, + { from: trigger, to: thirdNode, type: NodeConnectionTypes.Main }, + { from: tool, to: root, type: NodeConnectionTypes.AiTool }, + { from: secondNode, to: root, type: NodeConnectionTypes.Main }, + { from: thirdNode, to: root, type: NodeConnectionTypes.Main }, + ); + + const rewiredGraph = rewireGraph(tool, graph); + + const toolConnections = rewiredGraph.getDirectParentConnections(tool); + expect(toolConnections).toHaveLength(2); + expect(toolConnections.map((cn) => cn.from.name).sort()).toEqual( + ['secondNode', 'thirdNode'].sort(), + ); + }); + + it('ignores non-main connections when rewiring', () => { + const tool = createNodeData({ name: 'tool', type: 'n8n-nodes-base.ai-tool' }); + const root = createNodeData({ name: 'root' }); + const parent = createNodeData({ name: 'parent' }); + const trigger = createNodeData({ name: 'trigger' }); + const child = createNodeData({ name: 'child' }); + + const graph = new DirectedGraph(); + graph.addNodes(trigger, root, tool, parent, child); + graph.addConnections( + { from: trigger, to: root, type: NodeConnectionTypes.Main }, + { from: parent, to: root, type: NodeConnectionTypes.AiLanguageModel }, + { from: child, to: root, type: NodeConnectionTypes.AiTool }, + { from: tool, to: root, type: NodeConnectionTypes.AiTool }, + ); + + const rewiredGraph = rewireGraph(tool, graph); + + const toolConnections = rewiredGraph.getDirectParentConnections(tool); + expect(toolConnections).toHaveLength(1); + expect(toolConnections[0].type).toBe(NodeConnectionTypes.Main); + }); + + it('sets rewireOutputLogTo to AiTool on the tool node', () => { + const tool = createNodeData({ name: 'tool', type: 'n8n-nodes-base.ai-tool' }); + const trigger = createNodeData({ name: 'trigger' }); + const root = createNodeData({ name: 'root' }); + + const graph = new DirectedGraph(); + graph.addNodes(trigger, root, tool); + graph.addConnections( + { from: trigger, to: root, type: NodeConnectionTypes.Main }, + { from: tool, to: root, type: NodeConnectionTypes.AiTool }, + ); + + rewireGraph(tool, graph); + + expect(tool.rewireOutputLogTo).toBe(NodeConnectionTypes.AiTool); + }); + + it('fails when the tool has no incoming connections', () => { + const tool = createNodeData({ name: 'tool', type: 'n8n-nodes-base.ai-tool' }); + const root = createNodeData({ name: 'root' }); + + const graph = new DirectedGraph(); + graph.addNodes(root, tool); + + expect(() => rewireGraph(tool, graph)).toThrow(); + }); + + it('removes the root node from the graph', () => { + const tool = createNodeData({ name: 'tool', type: 'n8n-nodes-base.ai-tool' }); + const root = createNodeData({ name: 'root' }); + + const graph = new DirectedGraph(); + graph.addNodes(root, tool); + graph.addConnections({ from: tool, to: root, type: NodeConnectionTypes.AiTool }); + + const rewiredGraph = rewireGraph(tool, graph); + + expect(rewiredGraph.hasNode(root.name)).toBe(false); + }); +}); diff --git a/packages/core/src/execution-engine/partial-execution-utils/index.ts b/packages/core/src/execution-engine/partial-execution-utils/index.ts index d363f52302..f7eb5c6eca 100644 --- a/packages/core/src/execution-engine/partial-execution-utils/index.ts +++ b/packages/core/src/execution-engine/partial-execution-utils/index.ts @@ -6,3 +6,5 @@ export { recreateNodeExecutionStack } from './recreate-node-execution-stack'; export { cleanRunData } from './clean-run-data'; export { handleCycles } from './handle-cycles'; export { filterDisabledNodes } from './filter-disabled-nodes'; +export { isTool } from './is-tool'; +export { rewireGraph } from './rewire-graph'; diff --git a/packages/core/src/execution-engine/partial-execution-utils/is-tool.ts b/packages/core/src/execution-engine/partial-execution-utils/is-tool.ts new file mode 100644 index 0000000000..2ce3ceb914 --- /dev/null +++ b/packages/core/src/execution-engine/partial-execution-utils/is-tool.ts @@ -0,0 +1,6 @@ +import { type INode, type INodeTypes, NodeConnectionTypes } from 'n8n-workflow'; + +export function isTool(node: INode, nodeTypes: INodeTypes) { + const type = nodeTypes.getByNameAndVersion(node.type, node.typeVersion); + return type.description.outputs.includes(NodeConnectionTypes.AiTool); +} diff --git a/packages/core/src/execution-engine/partial-execution-utils/rewire-graph.ts b/packages/core/src/execution-engine/partial-execution-utils/rewire-graph.ts new file mode 100644 index 0000000000..51decae9a0 --- /dev/null +++ b/packages/core/src/execution-engine/partial-execution-utils/rewire-graph.ts @@ -0,0 +1,29 @@ +import * as a from 'assert/strict'; +import { type INode, NodeConnectionTypes } from 'n8n-workflow'; + +import { type DirectedGraph } from './directed-graph'; + +export function rewireGraph(tool: INode, graph: DirectedGraph): DirectedGraph { + graph = graph.clone(); + const children = graph.getChildren(tool); + + a.ok(children.size > 0, 'Tool must be connected to a root node'); + + const rootNode = [...children][0]; + + a.ok(rootNode); + + const allIncomingConnection = graph + .getDirectParentConnections(rootNode) + .filter((cn) => cn.type === NodeConnectionTypes.Main); + + tool.rewireOutputLogTo = NodeConnectionTypes.AiTool; + + for (const cn of allIncomingConnection) { + graph.addConnection({ from: cn.from, to: tool }); + } + + graph.removeNode(rootNode); + + return graph; +} diff --git a/packages/core/src/execution-engine/workflow-execute.ts b/packages/core/src/execution-engine/workflow-execute.ts index b735103ce0..3a75ffdf4c 100644 --- a/packages/core/src/execution-engine/workflow-execute.ts +++ b/packages/core/src/execution-engine/workflow-execute.ts @@ -68,6 +68,8 @@ import { recreateNodeExecutionStack, handleCycles, filterDisabledNodes, + rewireGraph, + isTool, } from './partial-execution-utils'; import { RoutingNode } from './routing-node'; import { TriggersAndPollers } from './triggers-and-pollers'; @@ -356,41 +358,47 @@ export class WorkflowExecute { let graph = DirectedGraph.fromWorkflow(workflow); - // Edge Case 1: - // Support executing a single node that is not connected to a trigger - const destinationHasNoParents = graph.getDirectParentConnections(destination).length === 0; - if (destinationHasNoParents) { - // short cut here, only create a subgraph and the stacks - graph = findSubgraph({ - graph: filterDisabledNodes(graph), - destination, - trigger: destination, - }); - const filteredNodes = graph.getNodes(); - runData = cleanRunData(runData, graph, new Set([destination])); - const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = - recreateNodeExecutionStack(graph, new Set([destination]), runData, pinData ?? {}); + // Partial execution of nodes as tools + if (isTool(destination, workflow.nodeTypes)) { + graph = rewireGraph(destination, graph); + workflow = graph.toWorkflow({ ...workflow }); + } else { + // Edge Case 1: + // Support executing a single node that is not connected to a trigger + const destinationHasNoParents = graph.getDirectParentConnections(destination).length === 0; + if (destinationHasNoParents) { + // short cut here, only create a subgraph and the stacks + graph = findSubgraph({ + graph: filterDisabledNodes(graph), + destination, + trigger: destination, + }); + const filteredNodes = graph.getNodes(); + runData = cleanRunData(runData, graph, new Set([destination])); + const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = + recreateNodeExecutionStack(graph, new Set([destination]), runData, pinData ?? {}); - this.status = 'running'; - this.runExecutionData = { - startData: { - destinationNode: destinationNodeName, - runNodeFilter: Array.from(filteredNodes.values()).map((node) => node.name), - }, - resultData: { - runData, - pinData, - }, - executionData: { - contextData: {}, - nodeExecutionStack, - metadata: {}, - waitingExecution, - waitingExecutionSource, - }, - }; + this.status = 'running'; + this.runExecutionData = { + startData: { + destinationNode: destinationNodeName, + runNodeFilter: Array.from(filteredNodes.values()).map((node) => node.name), + }, + resultData: { + runData, + pinData, + }, + executionData: { + contextData: {}, + nodeExecutionStack, + metadata: {}, + waitingExecution, + waitingExecutionSource, + }, + }; - return this.processRunExecutionData(graph.toWorkflow({ ...workflow })); + return this.processRunExecutionData(graph.toWorkflow({ ...workflow })); + } } // 1. Find the Trigger @@ -1704,6 +1712,13 @@ export class WorkflowExecute { main: nodeSuccessData, } as ITaskDataConnections; + // Rewire output data log to the given connectionType + if (executionNode.rewireOutputLogTo) { + taskData.data = { + [executionNode.rewireOutputLogTo]: nodeSuccessData, + } as ITaskDataConnections; + } + this.runExecutionData.resultData.runData[executionNode.name].push(taskData); if (this.runExecutionData.waitTill) { diff --git a/packages/core/test/helpers/constants.ts b/packages/core/test/helpers/constants.ts index 6a7f28b1af..2a54b7d5c9 100644 --- a/packages/core/test/helpers/constants.ts +++ b/packages/core/test/helpers/constants.ts @@ -43,6 +43,25 @@ export const predefinedNodesTypes: INodeTypeData = { type: new SplitInBatches(), sourcePath: '', }, + 'n8n-nodes-base.toolTest': { + sourcePath: '', + type: { + description: { + displayName: 'Test tool', + name: 'toolTest', + group: ['transform'], + version: 1, + description: 'Test tool', + inputs: [], + defaults: { + name: 'Test Tool', + color: '#0000FF', + }, + outputs: [NodeConnectionTypes.AiTool], + properties: [], + }, + }, + }, 'n8n-nodes-base.versionTest': { sourcePath: '', type: { diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 8c6ee6917e..bbd8c1fddd 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -1135,6 +1135,7 @@ export interface INode { credentials?: INodeCredentials; webhookId?: string; extendsCredential?: string; + rewireOutputLogTo?: NodeConnectionType; } export interface IPinData {