refactor(core): Shovel around more of AI code (no-changelog) (#12218)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2024-12-16 13:46:19 +01:00
committed by GitHub
parent a8e7a05856
commit 2ce1644d01
138 changed files with 590 additions and 529 deletions

View File

@@ -1,13 +1,6 @@
import { DynamicStructuredTool } from '@langchain/core/tools';
import type {
IExecuteFunctions,
INode,
INodeParameters,
INodeType,
ISupplyDataFunctions,
ITaskDataConnections,
} from 'n8n-workflow';
import { jsonParse, NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import type { IDataObject, INode, INodeType } from 'n8n-workflow';
import { jsonParse, NodeOperationError } from 'n8n-workflow';
import { z } from 'zod';
type AllowedTypes = 'string' | 'number' | 'boolean' | 'json';
@@ -21,7 +14,7 @@ interface FromAIArgument {
type ParserOptions = {
node: INode;
nodeType: INodeType;
contextFactory: (runIndex: number, inputData: ITaskDataConnections) => ISupplyDataFunctions;
handleToolInvocation: (toolArgs: IDataObject) => Promise<unknown>;
};
/**
@@ -31,8 +24,6 @@ type ParserOptions = {
* generating Zod schemas, and creating LangChain tools.
*/
class AIParametersParser {
private runIndex = 0;
/**
* Constructs an instance of AIParametersParser.
*/
@@ -291,39 +282,19 @@ class AIParametersParser {
}
/**
* Generates a description for a node based on the provided parameters.
* @param node The node type.
* @param nodeParameters The parameters of the node.
* @returns A string description for the node.
* Retrieves and validates the Zod schema for the tool.
*
* This method:
* 1. Collects all $fromAI arguments from node parameters
* 2. Validates parameter keys against naming rules
* 3. Checks for duplicate keys and ensures consistency
* 4. Generates a Zod schema from the validated arguments
*
* @throws {NodeOperationError} When parameter keys are invalid or when duplicate keys have inconsistent definitions
* @returns {z.ZodObject} A Zod schema object representing the structure and validation rules for the node parameters
*/
private getDescription(node: INodeType, nodeParameters: INodeParameters): string {
const manualDescription = nodeParameters.toolDescription as string;
if (nodeParameters.descriptionType === 'auto') {
const resource = nodeParameters.resource as string;
const operation = nodeParameters.operation as string;
let description = node.description.description;
if (resource) {
description += `\n Resource: ${resource}`;
}
if (operation) {
description += `\n Operation: ${operation}`;
}
return description.trim();
}
if (nodeParameters.descriptionType === 'manual') {
return manualDescription ?? node.description.description;
}
return node.description.description;
}
/**
* Creates a DynamicStructuredTool from a node.
* @returns A DynamicStructuredTool instance.
*/
public createTool(): DynamicStructuredTool {
const { node, nodeType } = this.options;
private getSchema() {
const { node } = this.options;
const collectedArguments: FromAIArgument[] = [];
this.traverseNodeParameters(node.parameters, collectedArguments);
@@ -381,43 +352,56 @@ class AIParametersParser {
return acc;
}, {});
const schema = z.object(schemaObj).required();
const description = this.getDescription(nodeType, node.parameters);
return z.object(schemaObj).required();
}
/**
* Generates a description for a node based on the provided parameters.
* @param node The node type.
* @param nodeParameters The parameters of the node.
* @returns A string description for the node.
*/
private getDescription(): string {
const { node, nodeType } = this.options;
const manualDescription = node.parameters.toolDescription as string;
if (node.parameters.descriptionType === 'auto') {
const resource = node.parameters.resource as string;
const operation = node.parameters.operation as string;
let description = nodeType.description.description;
if (resource) {
description += `\n Resource: ${resource}`;
}
if (operation) {
description += `\n Operation: ${operation}`;
}
return description.trim();
}
if (node.parameters.descriptionType === 'manual') {
return manualDescription ?? nodeType.description.description;
}
return nodeType.description.description;
}
/**
* Creates a DynamicStructuredTool from a node.
* @returns A DynamicStructuredTool instance.
*/
public createTool(): DynamicStructuredTool {
const { node, nodeType } = this.options;
const schema = this.getSchema();
const description = this.getDescription();
const nodeName = node.name.replace(/ /g, '_');
const name = nodeName || nodeType.description.name;
const tool = new DynamicStructuredTool({
return new DynamicStructuredTool({
name,
description,
schema,
func: async (toolArgs: z.infer<typeof schema>) => {
const currentRunIndex = this.runIndex++;
const context = this.options.contextFactory(currentRunIndex, {});
context.addInputData(NodeConnectionType.AiTool, [[{ json: toolArgs }]]);
try {
// Execute the node with the proxied context
const result = await nodeType.execute?.call(context as IExecuteFunctions);
// Process and map the results
const mappedResults = result?.[0]?.flatMap((item) => item.json);
// Add output data to the context
context.addOutputData(NodeConnectionType.AiTool, currentRunIndex, [
[{ json: { response: mappedResults } }],
]);
// Return the stringified results
return JSON.stringify(mappedResults);
} catch (error) {
const nodeError = new NodeOperationError(this.options.node, error as Error);
context.addOutputData(NodeConnectionType.AiTool, currentRunIndex, nodeError);
return 'Error during node execution: ' + nodeError.description;
}
},
func: async (toolArgs: z.infer<typeof schema>) =>
await this.options.handleToolInvocation(toolArgs),
});
return tool;
}
}

View File

@@ -76,6 +76,7 @@ import type {
WebhookType,
SchedulingFunctions,
SupplyData,
AINodeConnectionType,
} from 'n8n-workflow';
import {
NodeConnectionType,
@@ -2018,7 +2019,7 @@ export async function getInputConnectionData(
executeData: IExecuteData,
mode: WorkflowExecuteMode,
closeFunctions: CloseFunction[],
connectionType: NodeConnectionType,
connectionType: AINodeConnectionType,
itemIndex: number,
abortSignal?: AbortSignal,
): Promise<unknown> {
@@ -2098,10 +2099,41 @@ export async function getInputConnectionData(
if (!connectedNodeType.supplyData) {
if (connectedNodeType.description.outputs.includes(NodeConnectionType.AiTool)) {
/**
* This keeps track of how many times this specific AI tool node has been invoked.
* It is incremented on every invocation of the tool to keep the output of each invocation separate from each other.
*/
let toolRunIndex = 0;
const supplyData = createNodeAsTool({
node: connectedNode,
nodeType: connectedNodeType,
contextFactory,
handleToolInvocation: async (toolArgs) => {
const runIndex = toolRunIndex++;
const context = contextFactory(runIndex, {});
context.addInputData(NodeConnectionType.AiTool, [[{ json: toolArgs }]]);
try {
// Execute the sub-node with the proxied context
const result = await connectedNodeType.execute?.call(
context as unknown as IExecuteFunctions,
);
// Process and map the results
const mappedResults = result?.[0]?.flatMap((item) => item.json);
// Add output data to the context
context.addOutputData(NodeConnectionType.AiTool, runIndex, [
[{ json: { response: mappedResults } }],
]);
// Return the stringified results
return JSON.stringify(mappedResults);
} catch (error) {
const nodeError = new NodeOperationError(connectedNode, error as Error);
context.addOutputData(NodeConnectionType.AiTool, runIndex, nodeError);
return 'Error during node execution: ' + nodeError.description;
}
},
});
nodes.push(supplyData);
} else {

View File

@@ -1691,7 +1691,7 @@ export class WorkflowExecute {
}
}
if (nodeSuccessData === null && !this.runExecutionData.waitTill!) {
if (nodeSuccessData === null && !this.runExecutionData.waitTill) {
// If null gets returned it means that the node did succeed
// but did not have any data. So the branch should end
// (meaning the nodes afterwards should not be processed)
@@ -1818,7 +1818,7 @@ export class WorkflowExecute {
this.runExecutionData.resultData.runData[executionNode.name].push(taskData);
if (this.runExecutionData.waitTill!) {
if (this.runExecutionData.waitTill) {
await this.executeHook('nodeExecuteAfter', [
executionNode.name,
taskData,
@@ -2199,7 +2199,7 @@ export class WorkflowExecute {
if (executionError.message?.includes('canceled')) {
fullRunData.status = 'canceled';
}
} else if (this.runExecutionData.waitTill!) {
} else if (this.runExecutionData.waitTill) {
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
Logger.debug(`Workflow execution will wait until ${this.runExecutionData.waitTill}`, {
workflowId: workflow.id,

View File

@@ -1,4 +1,5 @@
import type {
AINodeConnectionType,
CallbackManager,
CloseFunction,
IExecuteData,
@@ -149,7 +150,7 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti
}
async getInputConnectionData(
connectionType: NodeConnectionType,
connectionType: AINodeConnectionType,
itemIndex: number,
): Promise<unknown> {
return await getInputConnectionData.call(

View File

@@ -1,5 +1,6 @@
import get from 'lodash/get';
import type {
AINodeConnectionType,
CloseFunction,
ExecutionBaseError,
IExecuteData,
@@ -12,10 +13,11 @@ import type {
ITaskDataConnections,
ITaskMetadata,
IWorkflowExecuteAdditionalData,
NodeConnectionType,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
import { ApplicationError, NodeConnectionType, createDeferredPromise } from 'n8n-workflow';
import { createDeferredPromise } from 'n8n-workflow';
// eslint-disable-next-line import/no-cycle
import {
@@ -107,7 +109,7 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData
}
async getInputConnectionData(
connectionType: NodeConnectionType,
connectionType: AINodeConnectionType,
itemIndex: number,
): Promise<unknown> {
return await getInputConnectionData.call(
@@ -137,7 +139,7 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData
/** @deprecated create a context object with inputData for every runIndex */
addInputData(
connectionType: NodeConnectionType,
connectionType: AINodeConnectionType,
data: INodeExecutionData[][],
): { index: number } {
const nodeName = this.node.name;
@@ -166,9 +168,9 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData
/** @deprecated Switch to WorkflowExecute to store output on runExecutionData.resultData.runData */
addOutputData(
connectionType: NodeConnectionType,
connectionType: AINodeConnectionType,
currentNodeRunIndex: number,
data: INodeExecutionData[][],
data: INodeExecutionData[][] | ExecutionBaseError,
metadata?: ITaskMetadata,
): void {
const nodeName = this.node.name;
@@ -192,17 +194,11 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData
async addExecutionDataFunctions(
type: 'input' | 'output',
data: INodeExecutionData[][] | ExecutionBaseError,
connectionType: NodeConnectionType,
connectionType: AINodeConnectionType,
sourceNodeName: string,
currentNodeRunIndex: number,
metadata?: ITaskMetadata,
): Promise<void> {
if (connectionType === NodeConnectionType.Main) {
throw new ApplicationError('Setting type is not supported for main connection', {
extra: { type },
});
}
const {
additionalData,
runExecutionData,
@@ -258,23 +254,16 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData
}
runExecutionData.resultData.runData[nodeName][currentNodeRunIndex] = taskData;
if (additionalData.sendDataToUI) {
additionalData.sendDataToUI('nodeExecuteBefore', {
executionId: additionalData.executionId,
nodeName,
});
}
await additionalData.hooks?.executeHookFunctions('nodeExecuteBefore', [nodeName]);
} else {
// Outputs
taskData.executionTime = new Date().getTime() - taskData.startTime;
if (additionalData.sendDataToUI) {
additionalData.sendDataToUI('nodeExecuteAfter', {
executionId: additionalData.executionId,
nodeName,
data: taskData,
});
}
await additionalData.hooks?.executeHookFunctions('nodeExecuteAfter', [
nodeName,
taskData,
this.runExecutionData,
]);
if (get(runExecutionData, 'executionData.metadata', undefined) === undefined) {
runExecutionData.executionData!.metadata = {};

View File

@@ -1,5 +1,6 @@
import type { Request, Response } from 'express';
import type {
AINodeConnectionType,
CloseFunction,
ICredentialDataDecryptedObject,
IDataObject,
@@ -11,7 +12,6 @@ import type {
IWebhookData,
IWebhookFunctions,
IWorkflowExecuteAdditionalData,
NodeConnectionType,
WebhookType,
Workflow,
WorkflowExecuteMode,
@@ -139,7 +139,7 @@ export class WebhookContext extends NodeExecutionContext implements IWebhookFunc
}
async getInputConnectionData(
connectionType: NodeConnectionType,
connectionType: AINodeConnectionType,
itemIndex: number,
): Promise<unknown> {
// To be able to use expressions like "$json.sessionId" set the