refactor(core): All calls to supplyData should use a distinct context type (no-changelog) (#11421)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™
2024-10-28 11:37:23 +01:00
committed by GitHub
parent 04c075a46b
commit 8f5fe05a92
70 changed files with 560 additions and 308 deletions

View File

@@ -1,6 +1,11 @@
import { pipeline } from 'stream/promises';
import { createWriteStream } from 'fs';
import type { IBinaryData, IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';
import type {
IBinaryData,
IExecuteFunctions,
INodeExecutionData,
ISupplyDataFunctions,
} from 'n8n-workflow';
import { NodeOperationError, BINARY_ENCODING } from 'n8n-workflow';
import type { TextSplitter } from '@langchain/textsplitters';
@@ -26,25 +31,12 @@ const SUPPORTED_MIME_TYPES = {
};
export class N8nBinaryLoader {
private context: IExecuteFunctions;
private optionsPrefix: string;
private binaryDataKey: string;
private textSplitter?: TextSplitter;
constructor(
context: IExecuteFunctions,
optionsPrefix = '',
binaryDataKey = '',
textSplitter?: TextSplitter,
) {
this.context = context;
this.textSplitter = textSplitter;
this.optionsPrefix = optionsPrefix;
this.binaryDataKey = binaryDataKey;
}
private context: IExecuteFunctions | ISupplyDataFunctions,
private optionsPrefix = '',
private binaryDataKey = '',
private textSplitter?: TextSplitter,
) {}
async processAll(items?: INodeExecutionData[]): Promise<Document[]> {
const docs: Document[] = [];

View File

@@ -1,4 +1,9 @@
import { type IExecuteFunctions, type INodeExecutionData, NodeOperationError } from 'n8n-workflow';
import {
type IExecuteFunctions,
type INodeExecutionData,
type ISupplyDataFunctions,
NodeOperationError,
} from 'n8n-workflow';
import type { TextSplitter } from '@langchain/textsplitters';
import type { Document } from '@langchain/core/documents';
@@ -7,17 +12,11 @@ import { TextLoader } from 'langchain/document_loaders/fs/text';
import { getMetadataFiltersValues } from './helpers';
export class N8nJsonLoader {
private context: IExecuteFunctions;
private optionsPrefix: string;
private textSplitter?: TextSplitter;
constructor(context: IExecuteFunctions, optionsPrefix = '', textSplitter?: TextSplitter) {
this.context = context;
this.textSplitter = textSplitter;
this.optionsPrefix = optionsPrefix;
}
constructor(
private context: IExecuteFunctions | ISupplyDataFunctions,
private optionsPrefix = '',
private textSplitter?: TextSplitter,
) {}
async processAll(items?: INodeExecutionData[]): Promise<Document[]> {
const docs: Document[] = [];

View File

@@ -1,6 +1,6 @@
import type { DynamicStructuredToolInput } from '@langchain/core/tools';
import { DynamicStructuredTool, DynamicTool } from '@langchain/core/tools';
import type { IExecuteFunctions, IDataObject } from 'n8n-workflow';
import type { ISupplyDataFunctions, IDataObject } from 'n8n-workflow';
import { NodeConnectionType, jsonParse, NodeOperationError } from 'n8n-workflow';
import { StructuredOutputParser } from 'langchain/output_parsers';
import type { ZodTypeAny } from 'zod';
@@ -45,12 +45,11 @@ ALL parameters marked as required must be provided`;
};
export class N8nTool extends DynamicStructuredTool {
private context: IExecuteFunctions;
constructor(context: IExecuteFunctions, fields: DynamicStructuredToolInput) {
constructor(
private context: ISupplyDataFunctions,
fields: DynamicStructuredToolInput,
) {
super(fields);
this.context = context;
}
asDynamicTool(): DynamicTool {

View File

@@ -5,7 +5,13 @@ import type { BaseMessage } from '@langchain/core/messages';
import type { Tool } from '@langchain/core/tools';
import type { BaseChatMemory } from 'langchain/memory';
import { NodeConnectionType, NodeOperationError, jsonStringify } from 'n8n-workflow';
import type { AiEvent, IDataObject, IExecuteFunctions, IWebhookFunctions } from 'n8n-workflow';
import type {
AiEvent,
IDataObject,
IExecuteFunctions,
ISupplyDataFunctions,
IWebhookFunctions,
} from 'n8n-workflow';
import { N8nTool } from './N8nTool';
@@ -20,7 +26,7 @@ function hasMethods<T>(obj: unknown, ...methodNames: Array<string | symbol>): ob
}
export function getMetadataFiltersValues(
ctx: IExecuteFunctions,
ctx: IExecuteFunctions | ISupplyDataFunctions,
itemIndex: number,
): Record<string, never> | undefined {
const options = ctx.getNodeParameter('options', itemIndex, {});
@@ -93,7 +99,7 @@ export function getPromptInputByType(options: {
}
export function getSessionId(
ctx: IExecuteFunctions | IWebhookFunctions,
ctx: ISupplyDataFunctions | IWebhookFunctions,
itemIndex: number,
selectorKey = 'sessionIdType',
autoSelect = 'fromInput',
@@ -133,13 +139,13 @@ export function getSessionId(
return sessionId;
}
export async function logAiEvent(
executeFunctions: IExecuteFunctions,
export function logAiEvent(
executeFunctions: IExecuteFunctions | ISupplyDataFunctions,
event: AiEvent,
data?: IDataObject,
) {
try {
await executeFunctions.logAiEvent(event, data ? jsonStringify(data) : undefined);
executeFunctions.logAiEvent(event, data ? jsonStringify(data) : undefined);
} catch (error) {
executeFunctions.logger.debug(`Error logging AI event: ${event}`);
}

View File

@@ -10,7 +10,7 @@ import type { Tool } from '@langchain/core/tools';
import { VectorStore } from '@langchain/core/vectorstores';
import { TextSplitter } from '@langchain/textsplitters';
import type { BaseDocumentLoader } from 'langchain/dist/document_loaders/base';
import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';
import type { IExecuteFunctions, INodeExecutionData, ISupplyDataFunctions } from 'n8n-workflow';
import { NodeOperationError, NodeConnectionType } from 'n8n-workflow';
import { logAiEvent, isToolsInstance, isBaseChatMemory, isBaseChatMessageHistory } from './helpers';
@@ -27,7 +27,7 @@ const errorsMap: { [key: string]: { message: string; description: string } } = {
export async function callMethodAsync<T>(
this: T,
parameters: {
executeFunctions: IExecuteFunctions;
executeFunctions: IExecuteFunctions | ISupplyDataFunctions;
connectionType: NodeConnectionType;
currentNodeRunIndex: number;
method: (...args: any[]) => Promise<unknown>;
@@ -113,7 +113,7 @@ export function logWrapper(
| VectorStore
| N8nBinaryLoader
| N8nJsonLoader,
executeFunctions: IExecuteFunctions,
executeFunctions: IExecuteFunctions | ISupplyDataFunctions,
) {
return new Proxy(originalInstance, {
get: (target, prop) => {
@@ -190,7 +190,7 @@ export function logWrapper(
const payload = { action: 'getMessages', response };
executeFunctions.addOutputData(connectionType, index, [[{ json: payload }]]);
void logAiEvent(executeFunctions, 'ai-messages-retrieved-from-memory', { response });
logAiEvent(executeFunctions, 'ai-messages-retrieved-from-memory', { response });
return response;
};
} else if (prop === 'addMessage' && 'addMessage' in target) {
@@ -207,7 +207,7 @@ export function logWrapper(
arguments: [message],
});
void logAiEvent(executeFunctions, 'ai-message-added-to-memory', { message });
logAiEvent(executeFunctions, 'ai-message-added-to-memory', { message });
executeFunctions.addOutputData(connectionType, index, [[{ json: payload }]]);
};
}
@@ -233,7 +233,7 @@ export function logWrapper(
arguments: [query, config],
})) as Array<Document<Record<string, any>>>;
void logAiEvent(executeFunctions, 'ai-documents-retrieved', { query });
logAiEvent(executeFunctions, 'ai-documents-retrieved', { query });
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
@@ -258,7 +258,7 @@ export function logWrapper(
arguments: [documents],
})) as number[][];
void logAiEvent(executeFunctions, 'ai-document-embedded');
logAiEvent(executeFunctions, 'ai-document-embedded');
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
@@ -278,7 +278,7 @@ export function logWrapper(
method: target[prop],
arguments: [query],
})) as number[];
void logAiEvent(executeFunctions, 'ai-query-embedded');
logAiEvent(executeFunctions, 'ai-query-embedded');
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
@@ -323,7 +323,7 @@ export function logWrapper(
arguments: [item, itemIndex],
})) as number[];
void logAiEvent(executeFunctions, 'ai-document-processed');
logAiEvent(executeFunctions, 'ai-document-processed');
executeFunctions.addOutputData(connectionType, index, [
[{ json: { response }, pairedItem: { item: itemIndex } }],
]);
@@ -349,7 +349,7 @@ export function logWrapper(
arguments: [text],
})) as string[];
void logAiEvent(executeFunctions, 'ai-text-split');
logAiEvent(executeFunctions, 'ai-text-split');
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
@@ -373,7 +373,7 @@ export function logWrapper(
arguments: [query],
})) as string;
void logAiEvent(executeFunctions, 'ai-tool-called', { query, response });
logAiEvent(executeFunctions, 'ai-tool-called', { query, response });
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
@@ -403,7 +403,7 @@ export function logWrapper(
arguments: [query, k, filter, _callbacks],
})) as Array<Document<Record<string, any>>>;
void logAiEvent(executeFunctions, 'ai-vector-store-searched', { query });
logAiEvent(executeFunctions, 'ai-vector-store-searched', { query });
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;

View File

@@ -2,7 +2,7 @@ import type { Callbacks } from '@langchain/core/callbacks/manager';
import type { BaseLanguageModel } from '@langchain/core/language_models/base';
import type { AIMessage } from '@langchain/core/messages';
import { BaseOutputParser } from '@langchain/core/output_parsers';
import type { IExecuteFunctions } from 'n8n-workflow';
import type { ISupplyDataFunctions } from 'n8n-workflow';
import { NodeConnectionType } from 'n8n-workflow';
import type { N8nStructuredOutputParser } from './N8nStructuredOutputParser';
@@ -10,23 +10,14 @@ import { NAIVE_FIX_PROMPT } from './prompt';
import { logAiEvent } from '../helpers';
export class N8nOutputFixingParser extends BaseOutputParser {
private context: IExecuteFunctions;
private model: BaseLanguageModel;
private outputParser: N8nStructuredOutputParser;
lc_namespace = ['langchain', 'output_parsers', 'fix'];
constructor(
context: IExecuteFunctions,
model: BaseLanguageModel,
outputParser: N8nStructuredOutputParser,
private context: ISupplyDataFunctions,
private model: BaseLanguageModel,
private outputParser: N8nStructuredOutputParser,
) {
super();
this.context = context;
this.model = model;
this.outputParser = outputParser;
}
getRetryChain() {
@@ -48,7 +39,7 @@ export class N8nOutputFixingParser extends BaseOutputParser {
try {
// First attempt to parse the completion
const response = await this.outputParser.parse(completion, callbacks, (e) => e);
void logAiEvent(this.context, 'ai-output-parsed', { text: completion, response });
logAiEvent(this.context, 'ai-output-parsed', { text: completion, response });
this.context.addOutputData(NodeConnectionType.AiOutputParser, index, [
[{ json: { action: 'parse', response } }],

View File

@@ -1,7 +1,7 @@
import type { Callbacks } from '@langchain/core/callbacks/manager';
import { StructuredOutputParser } from 'langchain/output_parsers';
import get from 'lodash/get';
import type { IExecuteFunctions } from 'n8n-workflow';
import type { ISupplyDataFunctions } from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import { z } from 'zod';
@@ -14,11 +14,11 @@ const STRUCTURED_OUTPUT_ARRAY_KEY = '__structured__output__array';
export class N8nStructuredOutputParser extends StructuredOutputParser<
z.ZodType<object, z.ZodTypeDef, object>
> {
context: IExecuteFunctions;
constructor(context: IExecuteFunctions, zodSchema: z.ZodSchema<object>) {
constructor(
private context: ISupplyDataFunctions,
zodSchema: z.ZodSchema<object>,
) {
super(zodSchema);
this.context = context;
}
lc_namespace = ['langchain', 'output_parsers', 'structured'];
@@ -39,7 +39,7 @@ export class N8nStructuredOutputParser extends StructuredOutputParser<
get(parsed, STRUCTURED_OUTPUT_KEY) ??
parsed) as Record<string, unknown>;
void logAiEvent(this.context, 'ai-output-parsed', { text, response: result });
logAiEvent(this.context, 'ai-output-parsed', { text, response: result });
this.context.addOutputData(NodeConnectionType.AiOutputParser, index, [
[{ json: { action: 'parse', response: result } }],
@@ -56,7 +56,7 @@ export class N8nStructuredOutputParser extends StructuredOutputParser<
},
);
void logAiEvent(this.context, 'ai-output-parsed', {
logAiEvent(this.context, 'ai-output-parsed', {
text,
response: e.message ?? e,
});
@@ -73,7 +73,7 @@ export class N8nStructuredOutputParser extends StructuredOutputParser<
static async fromZodJsonSchema(
zodSchema: z.ZodSchema<object>,
nodeVersion: number,
context: IExecuteFunctions,
context: ISupplyDataFunctions,
): Promise<N8nStructuredOutputParser> {
let returnSchema: z.ZodType<object, z.ZodTypeDef, object>;
if (nodeVersion === 1) {