feat(core): Improve debugging of sub-workflows (#11602)

This commit is contained in:
Mutasem Aldmour
2024-11-14 23:04:43 +01:00
committed by GitHub
parent f4ca4b792f
commit fd3254d587
36 changed files with 1843 additions and 265 deletions

View File

@@ -10,6 +10,7 @@ import type {
INodeTypeDescription,
SupplyData,
INodeParameterResourceLocator,
ExecuteWorkflowData,
} from 'n8n-workflow';
import { BaseRetriever, type BaseRetrieverInput } from '@langchain/core/retrievers';
@@ -293,6 +294,8 @@ export class RetrieverWorkflow implements INodeType {
};
async supplyData(this: ISupplyDataFunctions, itemIndex: number): Promise<SupplyData> {
const workflowProxy = this.getWorkflowDataProxy(0);
class WorkflowRetriever extends BaseRetriever {
lc_namespace = ['n8n-nodes-langchain', 'retrievers', 'workflow'];
@@ -349,6 +352,9 @@ export class RetrieverWorkflow implements INodeType {
},
);
}
// same as current workflow
baseMetadata.workflowId = workflowProxy.$workflow.id;
}
const rawData: IDataObject = { query };
@@ -384,21 +390,29 @@ export class RetrieverWorkflow implements INodeType {
const items = [newItem] as INodeExecutionData[];
let receivedItems: INodeExecutionData[][];
let receivedData: ExecuteWorkflowData;
try {
receivedItems = (await this.executeFunctions.executeWorkflow(
receivedData = await this.executeFunctions.executeWorkflow(
workflowInfo,
items,
config?.getChild(),
)) as INodeExecutionData[][];
{
parentExecution: {
executionId: workflowProxy.$execution.id,
workflowId: workflowProxy.$workflow.id,
},
},
);
} catch (error) {
// Make sure a valid error gets returned that can by json-serialized else it will
// not show up in the frontend
throw new NodeOperationError(this.executeFunctions.getNode(), error as Error);
}
const receivedItems = receivedData.data?.[0] ?? [];
const returnData: Document[] = [];
for (const [index, itemData] of receivedItems[0].entries()) {
for (const [index, itemData] of receivedItems.entries()) {
const pageContent = objectToString(itemData.json);
returnData.push(
new Document({
@@ -406,6 +420,7 @@ export class RetrieverWorkflow implements INodeType {
metadata: {
...baseMetadata,
itemIndex: index,
executionId: receivedData.executionId,
},
}),
);

View File

@@ -14,8 +14,10 @@ import type {
ISupplyDataFunctions,
SupplyData,
ExecutionError,
ExecuteWorkflowData,
IDataObject,
INodeParameterResourceLocator,
ITaskMetadata,
} from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError, jsonParse } from 'n8n-workflow';
@@ -358,9 +360,14 @@ export class ToolWorkflow implements INodeType {
};
async supplyData(this: ISupplyDataFunctions, itemIndex: number): Promise<SupplyData> {
const workflowProxy = this.getWorkflowDataProxy(0);
const name = this.getNodeParameter('name', itemIndex) as string;
const description = this.getNodeParameter('description', itemIndex) as string;
let subExecutionId: string | undefined;
let subWorkflowId: string | undefined;
const useSchema = this.getNodeParameter('specifyInputSchema', itemIndex) as boolean;
let tool: DynamicTool | DynamicStructuredTool | undefined = undefined;
@@ -396,11 +403,16 @@ export class ToolWorkflow implements INodeType {
) as INodeParameterResourceLocator;
workflowInfo.id = value as string;
}
subWorkflowId = workflowInfo.id;
} else if (source === 'parameter') {
// Read workflow from parameter
const workflowJson = this.getNodeParameter('workflowJson', itemIndex) as string;
try {
workflowInfo.code = JSON.parse(workflowJson) as IWorkflowBase;
// subworkflow is same as parent workflow
subWorkflowId = workflowProxy.$workflow.id;
} catch (error) {
throw new NodeOperationError(
this.getNode(),
@@ -440,13 +452,15 @@ export class ToolWorkflow implements INodeType {
const items = [newItem] as INodeExecutionData[];
let receivedData: INodeExecutionData;
let receivedData: ExecuteWorkflowData;
try {
receivedData = (await this.executeWorkflow(
workflowInfo,
items,
runManager?.getChild(),
)) as INodeExecutionData;
receivedData = await this.executeWorkflow(workflowInfo, items, runManager?.getChild(), {
parentExecution: {
executionId: workflowProxy.$execution.id,
workflowId: workflowProxy.$workflow.id,
},
});
subExecutionId = receivedData.executionId;
} catch (error) {
// Make sure a valid error gets returned that can by json-serialized else it will
// not show up in the frontend
@@ -454,6 +468,7 @@ export class ToolWorkflow implements INodeType {
}
const response: string | undefined = get(receivedData, [
'data',
0,
0,
'json',
@@ -503,10 +518,25 @@ export class ToolWorkflow implements INodeType {
response = `There was an error: "${executionError.message}"`;
}
let metadata: ITaskMetadata | undefined;
if (subExecutionId && subWorkflowId) {
metadata = {
subExecution: {
executionId: subExecutionId,
workflowId: subWorkflowId,
},
};
}
if (executionError) {
void this.addOutputData(NodeConnectionType.AiTool, index, executionError);
void this.addOutputData(NodeConnectionType.AiTool, index, executionError, metadata);
} else {
void this.addOutputData(NodeConnectionType.AiTool, index, [[{ json: { response } }]]);
void this.addOutputData(
NodeConnectionType.AiTool,
index,
[[{ json: { response } }]],
metadata,
);
}
return response;
};

View File

@@ -10,7 +10,12 @@ import type { Tool } from '@langchain/core/tools';
import { VectorStore } from '@langchain/core/vectorstores';
import { TextSplitter } from '@langchain/textsplitters';
import type { BaseDocumentLoader } from 'langchain/dist/document_loaders/base';
import type { IExecuteFunctions, INodeExecutionData, ISupplyDataFunctions } from 'n8n-workflow';
import type {
IExecuteFunctions,
INodeExecutionData,
ISupplyDataFunctions,
ITaskMetadata,
} from 'n8n-workflow';
import { NodeOperationError, NodeConnectionType } from 'n8n-workflow';
import { logAiEvent, isToolsInstance, isBaseChatMemory, isBaseChatMessageHistory } from './helpers';
@@ -220,8 +225,24 @@ export function logWrapper(
arguments: [query, config],
})) as Array<Document<Record<string, any>>>;
const executionId: string | undefined = response[0]?.metadata?.executionId as string;
const workflowId: string | undefined = response[0]?.metadata?.workflowId as string;
const metadata: ITaskMetadata = {};
if (executionId && workflowId) {
metadata.subExecution = {
executionId,
workflowId,
};
}
logAiEvent(executeFunctions, 'ai-documents-retrieved', { query });
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
executeFunctions.addOutputData(
connectionType,
index,
[[{ json: { response } }]],
metadata,
);
return response;
};
}