feat(Structured Output Parser Node): Refactor Output Parsers and Improve Error Handling (#11148)

Author: oleg
Date: 2024-10-22 10:46:58 +02:00 (committed by GitHub)
parent 4dde772814
commit 45274f2e7f
20 changed files with 1061 additions and 383 deletions

View File

@@ -1,12 +1,12 @@
import { NodeConnectionType, NodeOperationError, jsonStringify } from 'n8n-workflow';
import type { AiEvent, IDataObject, IExecuteFunctions, IWebhookFunctions } from 'n8n-workflow';
import type { BaseChatMessageHistory } from '@langchain/core/chat_history';
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import type { BaseOutputParser } from '@langchain/core/output_parsers';
import type { BaseLLM } from '@langchain/core/language_models/llms';
import type { BaseMessage } from '@langchain/core/messages';
import type { Tool } from '@langchain/core/tools';
import type { BaseLLM } from '@langchain/core/language_models/llms';
import type { BaseChatMemory } from 'langchain/memory';
import type { BaseChatMessageHistory } from '@langchain/core/chat_history';
import { NodeConnectionType, NodeOperationError, jsonStringify } from 'n8n-workflow';
import type { AiEvent, IDataObject, IExecuteFunctions, IWebhookFunctions } from 'n8n-workflow';
import { N8nTool } from './N8nTool';
function hasMethods<T>(obj: unknown, ...methodNames: Array<string | symbol>): obj is T {
@@ -66,21 +66,6 @@ export function isToolsInstance(model: unknown): model is Tool {
return namespace.includes('tools');
}
export async function getOptionalOutputParsers(
ctx: IExecuteFunctions,
): Promise<Array<BaseOutputParser<unknown>>> {
let outputParsers: BaseOutputParser[] = [];
if (ctx.getNodeParameter('hasOutputParser', 0, true) === true) {
outputParsers = (await ctx.getInputConnectionData(
NodeConnectionType.AiOutputParser,
0,
)) as BaseOutputParser[];
}
return outputParsers;
}
export function getPromptInputByType(options: {
ctx: IExecuteFunctions;
i: number;

View File

@@ -1,24 +1,21 @@
import { NodeOperationError, NodeConnectionType } from 'n8n-workflow';
import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';
import type { Tool } from '@langchain/core/tools';
import type { BaseMessage } from '@langchain/core/messages';
import type { InputValues, MemoryVariables, OutputValues } from '@langchain/core/memory';
import type { BaseChatMessageHistory } from '@langchain/core/chat_history';
import type { BaseCallbackConfig, Callbacks } from '@langchain/core/callbacks/manager';
import { Embeddings } from '@langchain/core/embeddings';
import { VectorStore } from '@langchain/core/vectorstores';
import type { Document } from '@langchain/core/documents';
import { TextSplitter } from '@langchain/textsplitters';
import type { BaseChatMemory } from '@langchain/community/memory/chat_memory';
import type { BaseCallbackConfig, Callbacks } from '@langchain/core/callbacks/manager';
import type { BaseChatMessageHistory } from '@langchain/core/chat_history';
import type { Document } from '@langchain/core/documents';
import { Embeddings } from '@langchain/core/embeddings';
import type { InputValues, MemoryVariables, OutputValues } from '@langchain/core/memory';
import type { BaseMessage } from '@langchain/core/messages';
import { BaseRetriever } from '@langchain/core/retrievers';
import { BaseOutputParser, OutputParserException } from '@langchain/core/output_parsers';
import { isObject } from 'lodash';
import type { Tool } from '@langchain/core/tools';
import { VectorStore } from '@langchain/core/vectorstores';
import { TextSplitter } from '@langchain/textsplitters';
import type { BaseDocumentLoader } from 'langchain/dist/document_loaders/base';
import { N8nJsonLoader } from './N8nJsonLoader';
import { N8nBinaryLoader } from './N8nBinaryLoader';
import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';
import { NodeOperationError, NodeConnectionType } from 'n8n-workflow';
import { logAiEvent, isToolsInstance, isBaseChatMemory, isBaseChatMessageHistory } from './helpers';
import { N8nBinaryLoader } from './N8nBinaryLoader';
import { N8nJsonLoader } from './N8nJsonLoader';
const errorsMap: { [key: string]: { message: string; description: string } } = {
'You exceeded your current quota, please check your plan and billing details.': {
@@ -40,10 +37,6 @@ export async function callMethodAsync<T>(
try {
return await parameters.method.call(this, ...parameters.arguments);
} catch (e) {
// Langchain checks for OutputParserException to run retry chain
// for auto-fixing the output so skip wrapping in this case
if (e instanceof OutputParserException) throw e;
// Propagate errors from sub-nodes
if (e.functionality === 'configuration-node') throw e;
const connectedNode = parameters.executeFunctions.getNode();
@@ -63,7 +56,9 @@ export async function callMethodAsync<T>(
error,
);
if (error.message) {
error.description = error.message;
if (!error.description) {
error.description = error.message;
}
throw error;
}
throw new NodeOperationError(
@@ -109,7 +104,6 @@ export function logWrapper(
| Tool
| BaseChatMemory
| BaseChatMessageHistory
| BaseOutputParser
| BaseRetriever
| Embeddings
| Document[]
@@ -219,44 +213,6 @@ export function logWrapper(
}
}
// ========== BaseOutputParser ==========
if (originalInstance instanceof BaseOutputParser) {
if (prop === 'parse' && 'parse' in target) {
return async (text: string | Record<string, unknown>): Promise<unknown> => {
connectionType = NodeConnectionType.AiOutputParser;
const stringifiedText = isObject(text) ? JSON.stringify(text) : text;
const { index } = executeFunctions.addInputData(connectionType, [
[{ json: { action: 'parse', text: stringifiedText } }],
]);
try {
const response = (await callMethodAsync.call(target, {
executeFunctions,
connectionType,
currentNodeRunIndex: index,
method: target[prop],
arguments: [stringifiedText],
})) as object;
void logAiEvent(executeFunctions, 'ai-output-parsed', { text, response });
executeFunctions.addOutputData(connectionType, index, [
[{ json: { action: 'parse', response } }],
]);
return response;
} catch (error) {
void logAiEvent(executeFunctions, 'ai-output-parsed', {
text,
response: error.message ?? error,
});
executeFunctions.addOutputData(connectionType, index, [
[{ json: { action: 'parse', response: error.message ?? error } }],
]);
throw error;
}
};
}
}
// ========== BaseRetriever ==========
if (originalInstance instanceof BaseRetriever) {
if (prop === 'getRelevantDocuments' && 'getRelevantDocuments' in target) {

View File

@@ -0,0 +1,55 @@
import { BaseOutputParser, OutputParserException } from '@langchain/core/output_parsers';
export class N8nItemListOutputParser extends BaseOutputParser<string[]> {
lc_namespace = ['n8n-nodes-langchain', 'output_parsers', 'list_items'];
private numberOfItems: number = 3;
private separator: string;
constructor(options: { numberOfItems?: number; separator?: string }) {
super();
if (options.numberOfItems && options.numberOfItems > 0) {
this.numberOfItems = options.numberOfItems;
}
this.separator = options.separator ?? '\\n';
if (this.separator === '\\n') {
this.separator = '\n';
}
}
async parse(text: string): Promise<string[]> {
const response = text
.split(this.separator)
.map((item) => item.trim())
.filter((item) => item);
if (this.numberOfItems && response.length < this.numberOfItems) {
// Only error if too few items were returned; if there are too many, we can autofix it by truncating
throw new OutputParserException(
`Wrong number of items returned. Expected ${this.numberOfItems} items but got ${response.length} items instead.`,
);
}
return response.slice(0, this.numberOfItems);
}
getFormatInstructions(): string {
const instructions = `Your response should be a list of ${
this.numberOfItems ? this.numberOfItems + ' ' : ''
}items separated by`;
const numberOfExamples = this.numberOfItems;
const examples: string[] = [];
for (let i = 1; i <= numberOfExamples; i++) {
examples.push(`item${i}`);
}
return `${instructions} "${this.separator}" (for example: "${examples.join(this.separator)}")`;
}
getSchema() {
return;
}
}
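
For reference, a minimal usage sketch of the new list parser (not part of the diff; direct construction is illustrative, since in the nodes the options come from node parameters):

import { N8nItemListOutputParser } from './N8nItemListOutputParser';

async function listParserExample(): Promise<string[]> {
	// numberOfItems defaults to 3; the '\\n' default separator is normalized to a real newline.
	const parser = new N8nItemListOutputParser({ numberOfItems: 3 });
	// Extra items are truncated by slice(); too few items throw an OutputParserException,
	// which lets an auto-fixing retry chain attempt a repair instead of failing outright.
	const items = await parser.parse('first\nsecond\nthird\nfourth');
	// items === ['first', 'second', 'third']
	console.log(parser.getFormatInstructions());
	return items;
}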

View File

@@ -0,0 +1,95 @@
import type { Callbacks } from '@langchain/core/callbacks/manager';
import type { BaseLanguageModel } from '@langchain/core/language_models/base';
import type { AIMessage } from '@langchain/core/messages';
import { BaseOutputParser } from '@langchain/core/output_parsers';
import type { IExecuteFunctions } from 'n8n-workflow';
import { NodeConnectionType } from 'n8n-workflow';
import type { N8nStructuredOutputParser } from './N8nStructuredOutputParser';
import { NAIVE_FIX_PROMPT } from './prompt';
import { logAiEvent } from '../helpers';
export class N8nOutputFixingParser extends BaseOutputParser {
private context: IExecuteFunctions;
private model: BaseLanguageModel;
private outputParser: N8nStructuredOutputParser;
lc_namespace = ['langchain', 'output_parsers', 'fix'];
constructor(
context: IExecuteFunctions,
model: BaseLanguageModel,
outputParser: N8nStructuredOutputParser,
) {
super();
this.context = context;
this.model = model;
this.outputParser = outputParser;
}
getRetryChain() {
return NAIVE_FIX_PROMPT.pipe(this.model);
}
/**
* Attempts to parse the completion string using the output parser.
* If the initial parse fails, it tries to fix the output using a retry chain.
* @param completion The string to be parsed
* @returns The parsed response
* @throws Error if both parsing attempts fail
*/
async parse(completion: string, callbacks?: Callbacks) {
const { index } = this.context.addInputData(NodeConnectionType.AiOutputParser, [
[{ json: { action: 'parse', text: completion } }],
]);
try {
// First attempt to parse the completion
const response = await this.outputParser.parse(completion, callbacks, (e) => e);
void logAiEvent(this.context, 'ai-output-parsed', { text: completion, response });
this.context.addOutputData(NodeConnectionType.AiOutputParser, index, [
[{ json: { action: 'parse', response } }],
]);
return response;
} catch (error) {
try {
// Second attempt: use retry chain to fix the output
const result = (await this.getRetryChain().invoke({
completion,
error,
instructions: this.getFormatInstructions(),
})) as AIMessage;
const resultText = result.content.toString();
const parsed = await this.outputParser.parse(resultText, callbacks);
// Add the successfully parsed output to the context
this.context.addOutputData(NodeConnectionType.AiOutputParser, index, [
[{ json: { action: 'parse', response: parsed } }],
]);
return parsed;
} catch (autoParseError) {
// If both attempts fail, add the error to the output and throw
this.context.addOutputData(NodeConnectionType.AiOutputParser, index, autoParseError);
throw autoParseError;
}
}
}
/**
* Method to get the format instructions for the parser.
* @returns The format instructions for the parser.
*/
getFormatInstructions() {
return this.outputParser.getFormatInstructions();
}
getSchema() {
return this.outputParser.schema;
}
}
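
A sketch of how the fixing parser is meant to wrap the structured parser (illustrative wiring only: in the nodes, the context and model come from the execution context and the connected chat model, and the schema and node version shown here are assumptions):

import type { BaseLanguageModel } from '@langchain/core/language_models/base';
import type { IExecuteFunctions } from 'n8n-workflow';
import { z } from 'zod';
import { N8nOutputFixingParser } from './N8nOutputFixingParser';
import { N8nStructuredOutputParser } from './N8nStructuredOutputParser';

async function buildFixingParser(context: IExecuteFunctions, model: BaseLanguageModel) {
	// Illustrative schema; real schemas come from the Structured Output Parser node's settings.
	const schema = z.object({ title: z.string(), tags: z.array(z.string()) });
	const structured = await N8nStructuredOutputParser.fromZodJsonSchema(schema, 1.2, context);
	// parse() tries the structured parser first; on failure it pipes the completion,
	// the error and the format instructions through NAIVE_FIX_PROMPT and the model once.
	return new N8nOutputFixingParser(context, model, structured);
}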

View File

@@ -0,0 +1,26 @@
import type { IExecuteFunctions } from 'n8n-workflow';
import { NodeConnectionType } from 'n8n-workflow';
import { N8nItemListOutputParser } from './N8nItemListOutputParser';
import { N8nOutputFixingParser } from './N8nOutputFixingParser';
import { N8nStructuredOutputParser } from './N8nStructuredOutputParser';
export type N8nOutputParser =
| N8nOutputFixingParser
| N8nStructuredOutputParser
| N8nItemListOutputParser;
export { N8nOutputFixingParser, N8nItemListOutputParser, N8nStructuredOutputParser };
export async function getOptionalOutputParsers(ctx: IExecuteFunctions): Promise<N8nOutputParser[]> {
let outputParsers: N8nOutputParser[] = [];
if (ctx.getNodeParameter('hasOutputParser', 0, true) === true) {
outputParsers = (await ctx.getInputConnectionData(
NodeConnectionType.AiOutputParser,
0,
)) as N8nOutputParser[];
}
return outputParsers;
}
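
A sketch of how a root node would consume the helper, which now lives alongside the parsers (call site and module path are assumptions):

import type { IExecuteFunctions } from 'n8n-workflow';
import { getOptionalOutputParsers, type N8nOutputParser } from './N8nOutputParser';

async function resolveOutputParser(ctx: IExecuteFunctions): Promise<N8nOutputParser | undefined> {
	// Returns [] when the node's hasOutputParser parameter is false (default: true).
	const parsers = await getOptionalOutputParsers(ctx);
	return parsers[0];
}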

View File

@@ -0,0 +1,116 @@
import type { Callbacks } from '@langchain/core/callbacks/manager';
import { StructuredOutputParser } from 'langchain/output_parsers';
import get from 'lodash/get';
import type { IExecuteFunctions } from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import { z } from 'zod';
import { logAiEvent } from '../helpers';
const STRUCTURED_OUTPUT_KEY = '__structured__output';
const STRUCTURED_OUTPUT_OBJECT_KEY = '__structured__output__object';
const STRUCTURED_OUTPUT_ARRAY_KEY = '__structured__output__array';
export class N8nStructuredOutputParser extends StructuredOutputParser<
z.ZodType<object, z.ZodTypeDef, object>
> {
context: IExecuteFunctions;
constructor(context: IExecuteFunctions, zodSchema: z.ZodSchema<object>) {
super(zodSchema);
this.context = context;
}
lc_namespace = ['langchain', 'output_parsers', 'structured'];
async parse(
text: string,
_callbacks?: Callbacks,
errorMapper?: (error: Error) => Error,
): Promise<object> {
const { index } = this.context.addInputData(NodeConnectionType.AiOutputParser, [
[{ json: { action: 'parse', text } }],
]);
try {
const parsed = await super.parse(text);
const result = (get(parsed, [STRUCTURED_OUTPUT_KEY, STRUCTURED_OUTPUT_OBJECT_KEY]) ??
get(parsed, [STRUCTURED_OUTPUT_KEY, STRUCTURED_OUTPUT_ARRAY_KEY]) ??
get(parsed, STRUCTURED_OUTPUT_KEY) ??
parsed) as Record<string, unknown>;
void logAiEvent(this.context, 'ai-output-parsed', { text, response: result });
this.context.addOutputData(NodeConnectionType.AiOutputParser, index, [
[{ json: { action: 'parse', response: result } }],
]);
return result;
} catch (e) {
const nodeError = new NodeOperationError(
this.context.getNode(),
"Model output doesn't fit required format",
{
description:
"To continue the execution when this happens, change the 'On Error' parameter in the root node's settings",
},
);
void logAiEvent(this.context, 'ai-output-parsed', {
text,
response: e.message ?? e,
});
this.context.addOutputData(NodeConnectionType.AiOutputParser, index, nodeError);
if (errorMapper) {
throw errorMapper(e);
}
throw nodeError;
}
}
static async fromZodJsonSchema(
zodSchema: z.ZodSchema<object>,
nodeVersion: number,
context: IExecuteFunctions,
): Promise<N8nStructuredOutputParser> {
let returnSchema: z.ZodType<object, z.ZodTypeDef, object>;
if (nodeVersion === 1) {
returnSchema = z.object({
[STRUCTURED_OUTPUT_KEY]: z
.object({
[STRUCTURED_OUTPUT_OBJECT_KEY]: zodSchema.optional(),
[STRUCTURED_OUTPUT_ARRAY_KEY]: z.array(zodSchema).optional(),
})
.describe(
`Wrapper around the output data. It can only contain ${STRUCTURED_OUTPUT_OBJECT_KEY} or ${STRUCTURED_OUTPUT_ARRAY_KEY} but never both.`,
)
.refine(
(data) => {
// Validate that one and only one of the properties exists
return (
Boolean(data[STRUCTURED_OUTPUT_OBJECT_KEY]) !==
Boolean(data[STRUCTURED_OUTPUT_ARRAY_KEY])
);
},
{
message:
'One and only one of __structured__output__object and __structured__output__array should be present.',
path: [STRUCTURED_OUTPUT_KEY],
},
),
});
} else {
returnSchema = z.object({
output: zodSchema.optional(),
});
}
return new N8nStructuredOutputParser(context, returnSchema);
}
getSchema() {
return this.schema;
}
}
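
For clarity, a sketch of the completion shapes the two schema variants accept (data values are illustrative):

// nodeVersion === 1: output is nested under the wrapper keys; parse() unwraps
// __structured__output__object / __structured__output__array before returning.
const v1Completion = JSON.stringify({
	__structured__output: {
		__structured__output__object: { title: 'Hello', tags: ['news', 'ai'] },
	},
});

// Later node versions use the plain { output } wrapper and the parsed object is returned as-is.
const v2Completion = JSON.stringify({
	output: { title: 'Hello', tags: ['news', 'ai'] },
});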

View File

@@ -0,0 +1,20 @@
import { PromptTemplate } from '@langchain/core/prompts';
export const NAIVE_FIX_TEMPLATE = `Instructions:
--------------
{instructions}
--------------
Completion:
--------------
{completion}
--------------
Above, the Completion did not satisfy the constraints given in the Instructions.
Error:
--------------
{error}
--------------
Please try again. Please only respond with an answer that satisfies the constraints laid out in the Instructions:`;
export const NAIVE_FIX_PROMPT = PromptTemplate.fromTemplate(NAIVE_FIX_TEMPLATE);
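
A short sketch of how the fixing parser's retry chain fills this template (the instructions, completion, and error values are made up for illustration):

import { NAIVE_FIX_PROMPT } from './prompt';

async function formatRetryPrompt(): Promise<string> {
	// N8nOutputFixingParser.getRetryChain() pipes this prompt into the model;
	// formatting it directly shows what the model receives on a retry.
	return await NAIVE_FIX_PROMPT.format({
		instructions: 'Return a JSON object matching the schema { "output": { ... } }',
		completion: '{"output": ',
		error: 'SyntaxError: Unexpected end of JSON input',
	});
}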