fix(Call n8n Workflow Tool Node): Return all items from subexecution (#13393)

This commit is contained in:
Mutasem Aldmour
2025-03-03 15:33:02 +01:00
committed by GitHub
parent b7f7121cb8
commit d9e3cfe13f
5 changed files with 114 additions and 15 deletions

View File

@@ -1,7 +1,6 @@
import type { CallbackManagerForToolRun } from '@langchain/core/callbacks/manager';
import { DynamicStructuredTool, DynamicTool } from '@langchain/core/tools';
import get from 'lodash/get';
import isObject from 'lodash/isObject';
import { isArray, isObject } from 'lodash';
import type { SetField, SetNodeOptions } from 'n8n-nodes-base/dist/nodes/Set/v2/helpers/interfaces';
import * as manual from 'n8n-nodes-base/dist/nodes/Set/v2/manual.mode';
import { getCurrentWorkflowInputData } from 'n8n-nodes-base/dist/utils/workflowInputsResourceMapping/GenericFunctions';
@@ -29,6 +28,10 @@ import {
} from 'n8n-workflow';
import { z } from 'zod';
function isNodeExecutionData(data: unknown): data is INodeExecutionData[] {
return isArray(data) && Boolean(data.length) && isObject(data[0]) && 'json' in data[0];
}
/**
Main class for creating the Workflow tool
Processes the node parameters and creates AI Agent tool capable of executing n8n workflows
@@ -43,10 +46,16 @@ export class WorkflowToolService {
// Sub-workflow execution id, will be set after the sub-workflow is executed
private subExecutionId: string | undefined;
constructor(private baseContext: ISupplyDataFunctions) {
private returnAllItems: boolean = false;
constructor(
private baseContext: ISupplyDataFunctions,
options?: { returnAllItems: boolean },
) {
const subWorkflowInputs = this.baseContext.getNode().parameters
.workflowInputs as ResourceMapperValue;
this.useSchema = (subWorkflowInputs?.schema ?? []).length > 0;
this.returnAllItems = options?.returnAllItems ?? false;
}
// Creates the tool based on the provided parameters
@@ -65,7 +74,7 @@ export class WorkflowToolService {
const toolHandler = async (
query: string | IDataObject,
runManager?: CallbackManagerForToolRun,
): Promise<string> => {
): Promise<IDataObject | IDataObject[] | string> => {
const localRunIndex = runIndex++;
// We need to clone the context here to handle runIndex correctly
// Otherwise the runIndex will be shared between different executions
@@ -74,10 +83,23 @@ export class WorkflowToolService {
runIndex: localRunIndex,
inputData: [[{ json: { query } }]],
});
try {
const response = await this.runFunction(context, query, itemIndex, runManager);
const processedResponse = this.handleToolResponse(response);
let responseData: INodeExecutionData[];
if (isNodeExecutionData(response)) {
responseData = response;
} else {
const reParsedData = jsonParse<IDataObject>(processedResponse, {
fallbackValue: { response: processedResponse },
});
responseData = [{ json: reParsedData }];
}
// Once the sub-workflow is executed, add the output data to the context
// This will be used to link the sub-workflow execution in the parent workflow
let metadata: ITaskMetadata | undefined;
@@ -89,13 +111,11 @@ export class WorkflowToolService {
},
};
}
const json = jsonParse<IDataObject>(processedResponse, {
fallbackValue: { response: processedResponse },
});
void context.addOutputData(
NodeConnectionType.AiTool,
localRunIndex,
[[{ json }]],
[responseData],
metadata,
);
@@ -126,6 +146,14 @@ export class WorkflowToolService {
return response.toString();
}
if (isNodeExecutionData(response)) {
return JSON.stringify(
response.map((item) => item.json),
null,
2,
);
}
if (isObject(response)) {
return JSON.stringify(response, null, 2);
}
@@ -148,7 +176,7 @@ export class WorkflowToolService {
items: INodeExecutionData[],
workflowProxy: IWorkflowDataProxyData,
runManager?: CallbackManagerForToolRun,
): Promise<{ response: string; subExecutionId: string }> {
): Promise<{ response: string | IDataObject | INodeExecutionData[]; subExecutionId: string }> {
let receivedData: ExecuteWorkflowData;
try {
receivedData = await context.executeWorkflow(workflowInfo, items, runManager?.getChild(), {
@@ -163,7 +191,12 @@ export class WorkflowToolService {
throw new NodeOperationError(context.getNode(), error as Error);
}
const response: string | undefined = get(receivedData, 'data[0][0].json') as string | undefined;
let response: IDataObject | INodeExecutionData[] | undefined;
if (this.returnAllItems) {
response = receivedData?.data?.[0]?.length ? receivedData.data[0] : undefined;
} else {
response = receivedData?.data?.[0]?.[0]?.json;
}
if (response === undefined) {
throw new NodeOperationError(
context.getNode(),
@@ -183,7 +216,7 @@ export class WorkflowToolService {
query: string | IDataObject,
itemIndex: number,
runManager?: CallbackManagerForToolRun,
): Promise<string> {
): Promise<string | IDataObject | INodeExecutionData[]> {
const source = context.getNodeParameter('source', itemIndex) as string;
const workflowProxy = context.getWorkflowDataProxy(0);
@@ -304,7 +337,10 @@ export class WorkflowToolService {
private async createStructuredTool(
name: string,
description: string,
func: (query: string | IDataObject, runManager?: CallbackManagerForToolRun) => Promise<string>,
func: (
query: string | IDataObject,
runManager?: CallbackManagerForToolRun,
) => Promise<string | IDataObject | IDataObject[]>,
): Promise<DynamicStructuredTool | DynamicTool> {
const collectedArguments = await this.extractFromAIParameters();