diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ConversationalAgent/execute.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ConversationalAgent/execute.ts index 887017ccaf..d171efe95f 100644 --- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ConversationalAgent/execute.ts +++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ConversationalAgent/execute.ts @@ -1,19 +1,19 @@ -import { NodeConnectionType, NodeOperationError } from 'n8n-workflow'; -import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow'; - -import { initializeAgentExecutorWithOptions } from 'langchain/agents'; import type { BaseChatMemory } from '@langchain/community/memory/chat_memory'; import type { BaseOutputParser } from '@langchain/core/output_parsers'; import { PromptTemplate } from '@langchain/core/prompts'; +import { initializeAgentExecutorWithOptions } from 'langchain/agents'; import { CombiningOutputParser } from 'langchain/output_parsers'; +import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow'; +import { NodeConnectionType, NodeOperationError } from 'n8n-workflow'; + import { isChatInstance, getPromptInputByType, - getOptionalOutputParsers, getConnectedTools, } from '../../../../../utils/helpers'; -import { getTracingConfig } from '../../../../../utils/tracing'; +import { getOptionalOutputParsers } from '../../../../../utils/output_parsers/N8nOutputParser'; import { throwIfToolSchema } from '../../../../../utils/schemaParsing'; +import { getTracingConfig } from '../../../../../utils/tracing'; export async function conversationalAgentExecute( this: IExecuteFunctions, diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/OpenAiFunctionsAgent/execute.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/OpenAiFunctionsAgent/execute.ts index 12e1dbda4e..0518234c29 100644 --- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/OpenAiFunctionsAgent/execute.ts +++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/OpenAiFunctionsAgent/execute.ts @@ -1,3 +1,10 @@ +import type { BaseOutputParser } from '@langchain/core/output_parsers'; +import { PromptTemplate } from '@langchain/core/prompts'; +import { ChatOpenAI } from '@langchain/openai'; +import type { AgentExecutorInput } from 'langchain/agents'; +import { AgentExecutor, OpenAIAgent } from 'langchain/agents'; +import { BufferMemory, type BaseChatMemory } from 'langchain/memory'; +import { CombiningOutputParser } from 'langchain/output_parsers'; import { type IExecuteFunctions, type INodeExecutionData, @@ -5,18 +12,8 @@ import { NodeOperationError, } from 'n8n-workflow'; -import type { AgentExecutorInput } from 'langchain/agents'; -import { AgentExecutor, OpenAIAgent } from 'langchain/agents'; -import type { BaseOutputParser } from '@langchain/core/output_parsers'; -import { PromptTemplate } from '@langchain/core/prompts'; -import { CombiningOutputParser } from 'langchain/output_parsers'; -import { BufferMemory, type BaseChatMemory } from 'langchain/memory'; -import { ChatOpenAI } from '@langchain/openai'; -import { - getConnectedTools, - getOptionalOutputParsers, - getPromptInputByType, -} from '../../../../../utils/helpers'; +import { getConnectedTools, getPromptInputByType } from '../../../../../utils/helpers'; +import { getOptionalOutputParsers } from '../../../../../utils/output_parsers/N8nOutputParser'; import { getTracingConfig } from '../../../../../utils/tracing'; export async function openAiFunctionsAgentExecute( diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/PlanAndExecuteAgent/execute.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/PlanAndExecuteAgent/execute.ts index a4ae1a0f1c..f10207c715 100644 --- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/PlanAndExecuteAgent/execute.ts +++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/PlanAndExecuteAgent/execute.ts @@ -1,3 +1,8 @@ +import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; +import type { BaseOutputParser } from '@langchain/core/output_parsers'; +import { PromptTemplate } from '@langchain/core/prompts'; +import { PlanAndExecuteAgentExecutor } from 'langchain/experimental/plan_and_execute'; +import { CombiningOutputParser } from 'langchain/output_parsers'; import { type IExecuteFunctions, type INodeExecutionData, @@ -5,18 +10,10 @@ import { NodeOperationError, } from 'n8n-workflow'; -import type { BaseOutputParser } from '@langchain/core/output_parsers'; -import { PromptTemplate } from '@langchain/core/prompts'; -import { CombiningOutputParser } from 'langchain/output_parsers'; -import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; -import { PlanAndExecuteAgentExecutor } from 'langchain/experimental/plan_and_execute'; -import { - getConnectedTools, - getOptionalOutputParsers, - getPromptInputByType, -} from '../../../../../utils/helpers'; -import { getTracingConfig } from '../../../../../utils/tracing'; +import { getConnectedTools, getPromptInputByType } from '../../../../../utils/helpers'; +import { getOptionalOutputParsers } from '../../../../../utils/output_parsers/N8nOutputParser'; import { throwIfToolSchema } from '../../../../../utils/schemaParsing'; +import { getTracingConfig } from '../../../../../utils/tracing'; export async function planAndExecuteAgentExecute( this: IExecuteFunctions, diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ReActAgent/execute.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ReActAgent/execute.ts index 11a5acb040..5707baa9d6 100644 --- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ReActAgent/execute.ts +++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ReActAgent/execute.ts @@ -1,3 +1,9 @@ +import type { BaseLanguageModel } from '@langchain/core/language_models/base'; +import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; +import type { BaseOutputParser } from '@langchain/core/output_parsers'; +import { PromptTemplate } from '@langchain/core/prompts'; +import { AgentExecutor, ChatAgent, ZeroShotAgent } from 'langchain/agents'; +import { CombiningOutputParser } from 'langchain/output_parsers'; import { type IExecuteFunctions, type INodeExecutionData, @@ -5,20 +11,14 @@ import { NodeOperationError, } from 'n8n-workflow'; -import { AgentExecutor, ChatAgent, ZeroShotAgent } from 'langchain/agents'; -import type { BaseLanguageModel } from '@langchain/core/language_models/base'; -import type { BaseOutputParser } from '@langchain/core/output_parsers'; -import { PromptTemplate } from '@langchain/core/prompts'; -import { CombiningOutputParser } from 'langchain/output_parsers'; -import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; import { getConnectedTools, - getOptionalOutputParsers, getPromptInputByType, isChatInstance, } from '../../../../../utils/helpers'; -import { getTracingConfig } from '../../../../../utils/tracing'; +import { getOptionalOutputParsers } from '../../../../../utils/output_parsers/N8nOutputParser'; import { throwIfToolSchema } from '../../../../../utils/schemaParsing'; +import { getTracingConfig } from '../../../../../utils/tracing'; export async function reActAgentAgentExecute( this: IExecuteFunctions, diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/execute.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/execute.ts index 8a9db05083..84d775d0f5 100644 --- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/execute.ts +++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/execute.ts @@ -1,7 +1,6 @@ import type { BaseChatMemory } from '@langchain/community/memory/chat_memory'; import { HumanMessage } from '@langchain/core/messages'; import type { BaseMessage } from '@langchain/core/messages'; -import type { BaseOutputParser, StructuredOutputParser } from '@langchain/core/output_parsers'; import type { BaseMessagePromptTemplateLike } from '@langchain/core/prompts'; import { ChatPromptTemplate } from '@langchain/core/prompts'; import { RunnableSequence } from '@langchain/core/runnables'; @@ -9,7 +8,6 @@ import type { Tool } from '@langchain/core/tools'; import { DynamicStructuredTool } from '@langchain/core/tools'; import type { AgentAction, AgentFinish } from 'langchain/agents'; import { AgentExecutor, createToolCallingAgent } from 'langchain/agents'; -import { OutputFixingParser } from 'langchain/output_parsers'; import { omit } from 'lodash'; import { BINARY_ENCODING, jsonParse, NodeConnectionType, NodeOperationError } from 'n8n-workflow'; import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow'; @@ -20,24 +18,16 @@ import { SYSTEM_MESSAGE } from './prompt'; import { isChatInstance, getPromptInputByType, - getOptionalOutputParsers, getConnectedTools, } from '../../../../../utils/helpers'; +import { + getOptionalOutputParsers, + type N8nOutputParser, +} from '../../../../../utils/output_parsers/N8nOutputParser'; -function getOutputParserSchema(outputParser: BaseOutputParser): ZodObject { - const parserType = outputParser.lc_namespace[outputParser.lc_namespace.length - 1]; - let schema: ZodObject; - - if (parserType === 'structured') { - // If the output parser is a structured output parser, we will use the schema from the parser - schema = (outputParser as StructuredOutputParser>).schema; - } else if (parserType === 'fix' && outputParser instanceof OutputFixingParser) { - // If the output parser is a fixing parser, we will use the schema from the connected structured output parser - schema = (outputParser.parser as StructuredOutputParser>).schema; - } else { - // If the output parser is not a structured output parser, we will use a fallback schema - schema = z.object({ text: z.string() }); - } +function getOutputParserSchema(outputParser: N8nOutputParser): ZodObject { + const schema = + (outputParser.getSchema() as ZodObject) ?? z.object({ text: z.string() }); return schema; } @@ -205,10 +195,9 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise step.tool === 'format_final_response'); if (responseParserTool) { const toolInput = responseParserTool?.toolInput; - const returnValues = (await outputParser.parse(toolInput as unknown as string)) as Record< - string, - unknown - >; + // Check if the tool input is a string or an object and convert it to a string + const parserInput = toolInput instanceof Object ? JSON.stringify(toolInput) : toolInput; + const returnValues = (await outputParser.parse(parserInput)) as Record; return handleParsedStepOutput(returnValues); } diff --git a/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/ChainLlm.node.ts b/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/ChainLlm.node.ts index 71022f84fe..a080862c0f 100644 --- a/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/ChainLlm.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/ChainLlm.node.ts @@ -1,9 +1,17 @@ +import type { BaseLanguageModel } from '@langchain/core/language_models/base'; +import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; +import { HumanMessage } from '@langchain/core/messages'; import { - ApplicationError, - NodeApiError, - NodeConnectionType, - NodeOperationError, -} from 'n8n-workflow'; + AIMessagePromptTemplate, + PromptTemplate, + SystemMessagePromptTemplate, + HumanMessagePromptTemplate, + ChatPromptTemplate, +} from '@langchain/core/prompts'; +import { ChatGoogleGenerativeAI } from '@langchain/google-genai'; +import { ChatOllama } from '@langchain/ollama'; +import { LLMChain } from 'langchain/chains'; +import { CombiningOutputParser } from 'langchain/output_parsers'; import type { IBinaryData, IDataObject, @@ -12,28 +20,17 @@ import type { INodeType, INodeTypeDescription, } from 'n8n-workflow'; +import { + ApplicationError, + NodeApiError, + NodeConnectionType, + NodeOperationError, +} from 'n8n-workflow'; -import type { BaseLanguageModel } from '@langchain/core/language_models/base'; -import { - AIMessagePromptTemplate, - PromptTemplate, - SystemMessagePromptTemplate, - HumanMessagePromptTemplate, - ChatPromptTemplate, -} from '@langchain/core/prompts'; -import type { BaseOutputParser } from '@langchain/core/output_parsers'; -import { CombiningOutputParser } from 'langchain/output_parsers'; -import { LLMChain } from 'langchain/chains'; -import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; -import { HumanMessage } from '@langchain/core/messages'; -import { ChatGoogleGenerativeAI } from '@langchain/google-genai'; -import { ChatOllama } from '@langchain/ollama'; +import { getPromptInputByType, isChatInstance } from '../../../utils/helpers'; +import type { N8nOutputParser } from '../../../utils/output_parsers/N8nOutputParser'; +import { getOptionalOutputParsers } from '../../../utils/output_parsers/N8nOutputParser'; import { getTemplateNoticeField } from '../../../utils/sharedFields'; -import { - getOptionalOutputParsers, - getPromptInputByType, - isChatInstance, -} from '../../../utils/helpers'; import { getTracingConfig } from '../../../utils/tracing'; import { getCustomErrorMessage as getCustomOpenAiErrorMessage, @@ -189,7 +186,7 @@ async function getChain( itemIndex: number, query: string, llm: BaseLanguageModel, - outputParsers: BaseOutputParser[], + outputParsers: N8nOutputParser[], messages?: MessagesTemplate[], ): Promise { const chatTemplate: ChatPromptTemplate | PromptTemplate = await getChainPromptTemplate( diff --git a/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserAutofixing/OutputParserAutofixing.node.ts b/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserAutofixing/OutputParserAutofixing.node.ts index 97c86506b7..7d676c7607 100644 --- a/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserAutofixing/OutputParserAutofixing.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserAutofixing/OutputParserAutofixing.node.ts @@ -1,15 +1,11 @@ -/* eslint-disable n8n-nodes-base/node-dirname-against-convention */ -import { - NodeConnectionType, - type IExecuteFunctions, - type INodeType, - type INodeTypeDescription, - type SupplyData, -} from 'n8n-workflow'; -import { OutputFixingParser } from 'langchain/output_parsers'; -import type { BaseOutputParser } from '@langchain/core/output_parsers'; import type { BaseLanguageModel } from '@langchain/core/language_models/base'; -import { logWrapper } from '../../../utils/logWrapper'; +import { NodeConnectionType } from 'n8n-workflow'; +import type { IExecuteFunctions, INodeType, INodeTypeDescription, SupplyData } from 'n8n-workflow'; + +import { + N8nOutputFixingParser, + type N8nStructuredOutputParser, +} from '../../../utils/output_parsers/N8nOutputParser'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; export class OutputParserAutofixing implements INodeType { @@ -75,12 +71,12 @@ export class OutputParserAutofixing implements INodeType { const outputParser = (await this.getInputConnectionData( NodeConnectionType.AiOutputParser, itemIndex, - )) as BaseOutputParser; + )) as N8nStructuredOutputParser; - const parser = OutputFixingParser.fromLLM(model, outputParser); + const parser = new N8nOutputFixingParser(this, model, outputParser); return { - response: logWrapper(parser, this), + response: parser, }; } } diff --git a/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserAutofixing/test/OutputParserAutofixing.node.test.ts b/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserAutofixing/test/OutputParserAutofixing.node.test.ts new file mode 100644 index 0000000000..32d25d4f73 --- /dev/null +++ b/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserAutofixing/test/OutputParserAutofixing.node.test.ts @@ -0,0 +1,120 @@ +/* eslint-disable @typescript-eslint/unbound-method */ +/* eslint-disable @typescript-eslint/no-unsafe-call */ +import type { BaseLanguageModel } from '@langchain/core/language_models/base'; +import type { MockProxy } from 'jest-mock-extended'; +import { mock } from 'jest-mock-extended'; +import { normalizeItems } from 'n8n-core'; +import type { IExecuteFunctions, IWorkflowDataProxyData } from 'n8n-workflow'; +import { ApplicationError, NodeConnectionType } from 'n8n-workflow'; + +import { N8nOutputFixingParser } from '../../../../utils/output_parsers/N8nOutputParser'; +import type { N8nStructuredOutputParser } from '../../../../utils/output_parsers/N8nOutputParser'; +import { OutputParserAutofixing } from '../OutputParserAutofixing.node'; + +describe('OutputParserAutofixing', () => { + let outputParser: OutputParserAutofixing; + let thisArg: MockProxy; + let mockModel: MockProxy; + let mockStructuredOutputParser: MockProxy; + + beforeEach(() => { + outputParser = new OutputParserAutofixing(); + thisArg = mock({ + helpers: { normalizeItems }, + }); + mockModel = mock(); + mockStructuredOutputParser = mock(); + + thisArg.getWorkflowDataProxy.mockReturnValue(mock({ $input: mock() })); + thisArg.addInputData.mockReturnValue({ index: 0 }); + thisArg.addOutputData.mockReturnValue(); + thisArg.getInputConnectionData.mockImplementation(async (type: NodeConnectionType) => { + if (type === NodeConnectionType.AiLanguageModel) return mockModel; + if (type === NodeConnectionType.AiOutputParser) return mockStructuredOutputParser; + + throw new ApplicationError('Unexpected connection type'); + }); + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + function getMockedRetryChain(output: string) { + return jest.fn().mockReturnValue({ + invoke: jest.fn().mockResolvedValue({ + content: output, + }), + }); + } + + it('should successfully parse valid output without needing to fix it', async () => { + const validOutput = { name: 'Alice', age: 25 }; + + mockStructuredOutputParser.parse.mockResolvedValueOnce(validOutput); + + const { response } = (await outputParser.supplyData.call(thisArg, 0)) as { + response: N8nOutputFixingParser; + }; + + // Ensure the response contains the output-fixing parser + expect(response).toBeDefined(); + expect(response).toBeInstanceOf(N8nOutputFixingParser); + + const result = await response.parse('{"name": "Alice", "age": 25}'); + + // Validate that the parser succeeds without retry + expect(result).toEqual(validOutput); + expect(mockStructuredOutputParser.parse).toHaveBeenCalledTimes(1); // Only one call to parse + }); + + it('should throw an error when both structured parser and fixing parser fail', async () => { + mockStructuredOutputParser.parse + .mockRejectedValueOnce(new Error('Invalid JSON')) // First attempt fails + .mockRejectedValueOnce(new Error('Fixing attempt failed')); // Second attempt fails + + const { response } = (await outputParser.supplyData.call(thisArg, 0)) as { + response: N8nOutputFixingParser; + }; + + response.getRetryChain = getMockedRetryChain('{}'); + + await expect(response.parse('Invalid JSON string')).rejects.toThrow('Fixing attempt failed'); + expect(mockStructuredOutputParser.parse).toHaveBeenCalledTimes(2); + }); + + it('should reject on the first attempt and succeed on retry with the parsed content', async () => { + const validOutput = { name: 'Bob', age: 28 }; + + mockStructuredOutputParser.parse.mockRejectedValueOnce(new Error('Invalid JSON')); + + const { response } = (await outputParser.supplyData.call(thisArg, 0)) as { + response: N8nOutputFixingParser; + }; + + response.getRetryChain = getMockedRetryChain(JSON.stringify(validOutput)); + + mockStructuredOutputParser.parse.mockResolvedValueOnce(validOutput); + + const result = await response.parse('Invalid JSON string'); + + expect(result).toEqual(validOutput); + expect(mockStructuredOutputParser.parse).toHaveBeenCalledTimes(2); // First fails, second succeeds + }); + + it('should handle non-JSON formatted response from fixing parser', async () => { + mockStructuredOutputParser.parse.mockRejectedValueOnce(new Error('Invalid JSON')); + + const { response } = (await outputParser.supplyData.call(thisArg, 0)) as { + response: N8nOutputFixingParser; + }; + + response.getRetryChain = getMockedRetryChain('This is not JSON'); + + mockStructuredOutputParser.parse.mockRejectedValueOnce(new Error('Unexpected token')); + + // Expect the structured parser to throw an error on invalid JSON from retry + await expect(response.parse('Invalid JSON string')).rejects.toThrow('Unexpected token'); + expect(mockStructuredOutputParser.parse).toHaveBeenCalledTimes(2); // First fails, second tries and fails + }); +}); diff --git a/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserItemList/OutputParserItemList.node.ts b/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserItemList/OutputParserItemList.node.ts index 24327b2970..cb67afb453 100644 --- a/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserItemList/OutputParserItemList.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserItemList/OutputParserItemList.node.ts @@ -6,9 +6,9 @@ import { type INodeTypeDescription, type SupplyData, } from 'n8n-workflow'; -import { logWrapper } from '../../../utils/logWrapper'; + +import { N8nItemListOutputParser } from '../../../utils/output_parsers/N8nItemListOutputParser'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; -import { ItemListOutputParser } from './ItemListOutputParser'; export class OutputParserItemList implements INodeType { description: INodeTypeDescription = { @@ -86,10 +86,10 @@ export class OutputParserItemList implements INodeType { separator?: string; }; - const parser = new ItemListOutputParser(options); + const parser = new N8nItemListOutputParser(options); return { - response: logWrapper(parser, this), + response: parser, }; } } diff --git a/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserItemList/test/OutputParserItemList.node.test.ts b/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserItemList/test/OutputParserItemList.node.test.ts new file mode 100644 index 0000000000..31e96077c4 --- /dev/null +++ b/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserItemList/test/OutputParserItemList.node.test.ts @@ -0,0 +1,123 @@ +import { mock } from 'jest-mock-extended'; +import { normalizeItems } from 'n8n-core'; +import { + ApplicationError, + type IExecuteFunctions, + type IWorkflowDataProxyData, +} from 'n8n-workflow'; + +import { N8nItemListOutputParser } from '../../../../utils/output_parsers/N8nItemListOutputParser'; +import { OutputParserItemList } from '../OutputParserItemList.node'; + +describe('OutputParserItemList', () => { + let outputParser: OutputParserItemList; + const thisArg = mock({ + helpers: { normalizeItems }, + }); + const workflowDataProxy = mock({ $input: mock() }); + + beforeEach(() => { + outputParser = new OutputParserItemList(); + thisArg.getWorkflowDataProxy.mockReturnValue(workflowDataProxy); + thisArg.addInputData.mockReturnValue({ index: 0 }); + thisArg.addOutputData.mockReturnValue(); + thisArg.getNodeParameter.mockReset(); + }); + + describe('supplyData', () => { + it('should create a parser with default options', async () => { + thisArg.getNodeParameter.mockImplementation((parameterName) => { + if (parameterName === 'options') { + return {}; + } + throw new ApplicationError('Not implemented'); + }); + + const { response } = await outputParser.supplyData.call(thisArg, 0); + expect(response).toBeInstanceOf(N8nItemListOutputParser); + }); + + it('should create a parser with custom number of items', async () => { + thisArg.getNodeParameter.mockImplementation((parameterName) => { + if (parameterName === 'options') { + return { numberOfItems: 5 }; + } + throw new ApplicationError('Not implemented'); + }); + + const { response } = await outputParser.supplyData.call(thisArg, 0); + expect(response).toBeInstanceOf(N8nItemListOutputParser); + expect((response as any).numberOfItems).toBe(5); + }); + + it('should create a parser with custom separator', async () => { + thisArg.getNodeParameter.mockImplementation((parameterName) => { + if (parameterName === 'options') { + return { separator: ',' }; + } + throw new ApplicationError('Not implemented'); + }); + + const { response } = await outputParser.supplyData.call(thisArg, 0); + expect(response).toBeInstanceOf(N8nItemListOutputParser); + expect((response as any).separator).toBe(','); + }); + }); + + describe('parse', () => { + it('should parse a list with default separator', async () => { + thisArg.getNodeParameter.mockImplementation((parameterName) => { + if (parameterName === 'options') { + return {}; + } + throw new ApplicationError('Not implemented'); + }); + + const { response } = await outputParser.supplyData.call(thisArg, 0); + const result = await (response as N8nItemListOutputParser).parse('item1\nitem2\nitem3'); + expect(result).toEqual(['item1', 'item2', 'item3']); + }); + + it('should parse a list with custom separator', async () => { + thisArg.getNodeParameter.mockImplementation((parameterName) => { + if (parameterName === 'options') { + return { separator: ',' }; + } + throw new ApplicationError('Not implemented'); + }); + + const { response } = await outputParser.supplyData.call(thisArg, 0); + const result = await (response as N8nItemListOutputParser).parse('item1,item2,item3'); + expect(result).toEqual(['item1', 'item2', 'item3']); + }); + + it('should limit the number of items returned', async () => { + thisArg.getNodeParameter.mockImplementation((parameterName) => { + if (parameterName === 'options') { + return { numberOfItems: 2 }; + } + throw new ApplicationError('Not implemented'); + }); + + const { response } = await outputParser.supplyData.call(thisArg, 0); + const result = await (response as N8nItemListOutputParser).parse( + 'item1\nitem2\nitem3\nitem4', + ); + expect(result).toEqual(['item1', 'item2']); + }); + + it('should throw an error if not enough items are returned', async () => { + thisArg.getNodeParameter.mockImplementation((parameterName) => { + if (parameterName === 'options') { + return { numberOfItems: 5 }; + } + throw new ApplicationError('Not implemented'); + }); + + const { response } = await outputParser.supplyData.call(thisArg, 0); + await expect( + (response as N8nItemListOutputParser).parse('item1\nitem2\nitem3'), + ).rejects.toThrow('Wrong number of items returned'); + }); + }); +}); diff --git a/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserStructured/OutputParserStructured.node.ts b/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserStructured/OutputParserStructured.node.ts index 803a2c99b6..b5b6a5846c 100644 --- a/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserStructured/OutputParserStructured.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserStructured/OutputParserStructured.node.ts @@ -1,8 +1,4 @@ -/* eslint-disable n8n-nodes-base/node-dirname-against-convention */ -import { OutputParserException } from '@langchain/core/output_parsers'; import type { JSONSchema7 } from 'json-schema'; -import { StructuredOutputParser } from 'langchain/output_parsers'; -import get from 'lodash/get'; import { jsonParse, type IExecuteFunctions, @@ -12,77 +8,17 @@ import { NodeOperationError, NodeConnectionType, } from 'n8n-workflow'; -import { z } from 'zod'; +import type { z } from 'zod'; import { inputSchemaField, jsonSchemaExampleField, schemaTypeField, } from '../../../utils/descriptions'; -import { logWrapper } from '../../../utils/logWrapper'; +import { N8nStructuredOutputParser } from '../../../utils/output_parsers/N8nOutputParser'; import { convertJsonSchemaToZod, generateSchema } from '../../../utils/schemaParsing'; import { getConnectionHintNoticeField } from '../../../utils/sharedFields'; -const STRUCTURED_OUTPUT_KEY = '__structured__output'; -const STRUCTURED_OUTPUT_OBJECT_KEY = '__structured__output__object'; -const STRUCTURED_OUTPUT_ARRAY_KEY = '__structured__output__array'; - -export class N8nStructuredOutputParser extends StructuredOutputParser { - async parse(text: string): Promise> { - try { - const parsed = (await super.parse(text)) as object; - - return ( - get(parsed, [STRUCTURED_OUTPUT_KEY, STRUCTURED_OUTPUT_OBJECT_KEY]) ?? - get(parsed, [STRUCTURED_OUTPUT_KEY, STRUCTURED_OUTPUT_ARRAY_KEY]) ?? - get(parsed, STRUCTURED_OUTPUT_KEY) ?? - parsed - ); - } catch (e) { - // eslint-disable-next-line n8n-nodes-base/node-execute-block-wrong-error-thrown - throw new OutputParserException(`Failed to parse. Text: "${text}". Error: ${e}`, text); - } - } - - static async fromZedSchema( - zodSchema: z.ZodSchema, - nodeVersion: number, - ): Promise>> { - let returnSchema: z.ZodSchema; - if (nodeVersion === 1) { - returnSchema = z.object({ - [STRUCTURED_OUTPUT_KEY]: z - .object({ - [STRUCTURED_OUTPUT_OBJECT_KEY]: zodSchema.optional(), - [STRUCTURED_OUTPUT_ARRAY_KEY]: z.array(zodSchema).optional(), - }) - .describe( - `Wrapper around the output data. It can only contain ${STRUCTURED_OUTPUT_OBJECT_KEY} or ${STRUCTURED_OUTPUT_ARRAY_KEY} but never both.`, - ) - .refine( - (data) => { - // Validate that one and only one of the properties exists - return ( - Boolean(data[STRUCTURED_OUTPUT_OBJECT_KEY]) !== - Boolean(data[STRUCTURED_OUTPUT_ARRAY_KEY]) - ); - }, - { - message: - 'One and only one of __structured__output__object and __structured__output__array should be present.', - path: [STRUCTURED_OUTPUT_KEY], - }, - ), - }); - } else { - returnSchema = z.object({ - output: zodSchema.optional(), - }); - } - - return N8nStructuredOutputParser.fromZodSchema(returnSchema); - } -} export class OutputParserStructured implements INodeType { description: INodeTypeDescription = { displayName: 'Structured Output Parser', @@ -205,9 +141,13 @@ export class OutputParserStructured implements INodeType { const zodSchema = convertJsonSchemaToZod>(jsonSchema); const nodeVersion = this.getNode().typeVersion; try { - const parser = await N8nStructuredOutputParser.fromZedSchema(zodSchema, nodeVersion); + const parser = await N8nStructuredOutputParser.fromZodJsonSchema( + zodSchema, + nodeVersion, + this, + ); return { - response: logWrapper(parser, this), + response: parser, }; } catch (error) { throw new NodeOperationError(this.getNode(), 'Error during parsing of JSON Schema.'); diff --git a/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserStructured/test/OutputParserStructured.node.test.ts b/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserStructured/test/OutputParserStructured.node.test.ts index b4dd6708eb..af72c49d7e 100644 --- a/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserStructured/test/OutputParserStructured.node.test.ts +++ b/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserStructured/test/OutputParserStructured.node.test.ts @@ -1,8 +1,13 @@ -import type { IExecuteFunctions, INode, IWorkflowDataProxyData } from 'n8n-workflow'; import { mock } from 'jest-mock-extended'; import { normalizeItems } from 'n8n-core'; -import type { z } from 'zod'; -import type { StructuredOutputParser } from 'langchain/output_parsers'; +import { + jsonParse, + type IExecuteFunctions, + type INode, + type IWorkflowDataProxyData, +} from 'n8n-workflow'; + +import type { N8nStructuredOutputParser } from '../../../../utils/output_parsers/N8nStructuredOutputParser'; import { OutputParserStructured } from '../OutputParserStructured.node'; describe('OutputParserStructured', () => { @@ -11,139 +16,451 @@ describe('OutputParserStructured', () => { helpers: { normalizeItems }, }); const workflowDataProxy = mock({ $input: mock() }); - thisArg.getWorkflowDataProxy.mockReturnValue(workflowDataProxy); - thisArg.getNode.mockReturnValue(mock({ typeVersion: 1.1 })); - thisArg.addInputData.mockReturnValue({ index: 0 }); - thisArg.addOutputData.mockReturnValue(); beforeEach(() => { outputParser = new OutputParserStructured(); + thisArg.getWorkflowDataProxy.mockReturnValue(workflowDataProxy); + thisArg.addInputData.mockReturnValue({ index: 0 }); + thisArg.addOutputData.mockReturnValue(); }); describe('supplyData', () => { - it('should parse a valid JSON schema', async () => { - const schema = `{ - "type": "object", - "properties": { - "name": { - "type": "string" + describe('Version 1.1 and below', () => { + beforeEach(() => { + thisArg.getNode.mockReturnValue(mock({ typeVersion: 1.1 })); + }); + + it('should parse a complex nested schema', async () => { + const schema = `{ + "type": "object", + "properties": { + "user": { + "type": "object", + "properties": { + "name": { "type": "string" }, + "details": { + "type": "object", + "properties": { + "age": { "type": "number" }, + "hobbies": { "type": "array", "items": { "type": "string" } } + } + } + } + }, + "timestamp": { "type": "string", "format": "date-time" } }, - "age": { - "type": "number" - } - }, - "required": ["name", "age"] - }`; - thisArg.getNodeParameter.calledWith('jsonSchema', 0).mockReturnValueOnce(schema); - const { response } = (await outputParser.supplyData.call(thisArg, 0)) as { - response: StructuredOutputParser>; - }; - const outputObject = { output: { name: 'Mac', age: 27 } }; - const parsersOutput = await response.parse(`Here's the output! - \`\`\`json - ${JSON.stringify(outputObject)} - \`\`\` - `); - - expect(parsersOutput).toEqual(outputObject); - }); - it('should handle missing required properties', async () => { - const schema = `{ - "type": "object", - "properties": { - "name": { - "type": "string" - }, - "age": { - "type": "number" - } - }, - "required": ["name", "age"] - }`; - thisArg.getNodeParameter.calledWith('jsonSchema', 0).mockReturnValueOnce(schema); - const { response } = (await outputParser.supplyData.call(thisArg, 0)) as { - response: StructuredOutputParser>; - }; - const outputObject = { output: { name: 'Mac' } }; - - await expect( - response.parse(`Here's the output! - \`\`\`json - ${JSON.stringify(outputObject)} - \`\`\` - `), - ).rejects.toThrow('Required'); - }); - - it('should throw on wrong type', async () => { - const schema = `{ - "type": "object", - "properties": { - "name": { - "type": "string" - }, - "age": { - "type": "number" - } - }, - "required": ["name", "age"] - }`; - thisArg.getNodeParameter.calledWith('jsonSchema', 0).mockReturnValueOnce(schema); - const { response } = (await outputParser.supplyData.call(thisArg, 0)) as { - response: StructuredOutputParser>; - }; - const outputObject = { output: { name: 'Mac', age: '27' } }; - - await expect( - response.parse(`Here's the output! - \`\`\`json - ${JSON.stringify(outputObject)} - \`\`\` - `), - ).rejects.toThrow('Expected number, received string'); - }); - - it('should parse array output', async () => { - const schema = `{ - "type": "object", - "properties": { - "myArr": { - "type": "array", - "items": { - "type": "object", - "properties": { - "name": { - "type": "string" - }, - "age": { - "type": "number" - } + "required": ["user", "timestamp"] + }`; + thisArg.getNodeParameter.calledWith('jsonSchema', 0).mockReturnValueOnce(schema); + const { response } = (await outputParser.supplyData.call(thisArg, 0)) as { + response: N8nStructuredOutputParser; + }; + const outputObject = { + output: { + user: { + name: 'Alice', + details: { + age: 30, + hobbies: ['reading', 'hiking'], }, - "required": ["name", "age"] - } - } - }, - "required": ["myArr"] - }`; - thisArg.getNodeParameter.calledWith('jsonSchema', 0).mockReturnValueOnce(schema); - const { response } = (await outputParser.supplyData.call(thisArg, 0)) as { - response: StructuredOutputParser>; - }; - const outputObject = { - output: { - myArr: [ - { name: 'Mac', age: 27 }, - { name: 'Alice', age: 25 }, - ], - }, - }; - const parsersOutput = await response.parse(`Here's the output! - \`\`\`json - ${JSON.stringify(outputObject)} - \`\`\` - `); + }, + timestamp: '2023-04-01T12:00:00Z', + }, + }; + const parsersOutput = await response.parse(`Here's the complex output: + \`\`\`json + ${JSON.stringify(outputObject)} + \`\`\` + `); - expect(parsersOutput).toEqual(outputObject); + expect(parsersOutput).toEqual(outputObject); + }); + + it('should handle optional fields correctly', async () => { + const schema = `{ + "type": "object", + "properties": { + "name": { "type": "string" }, + "age": { "type": "number" }, + "email": { "type": "string", "format": "email" } + }, + "required": ["name"] + }`; + thisArg.getNodeParameter.calledWith('jsonSchema', 0).mockReturnValueOnce(schema); + const { response } = (await outputParser.supplyData.call(thisArg, 0)) as { + response: N8nStructuredOutputParser; + }; + const outputObject = { + output: { + name: 'Bob', + email: 'bob@example.com', + }, + }; + const parsersOutput = await response.parse(`Here's the output with optional fields: + \`\`\`json + ${JSON.stringify(outputObject)} + \`\`\` + `); + + expect(parsersOutput).toEqual(outputObject); + }); + + it('should handle arrays of objects', async () => { + const schema = `{ + "type": "object", + "properties": { + "users": { + "type": "array", + "items": { + "type": "object", + "properties": { + "id": { "type": "number" }, + "name": { "type": "string" } + }, + "required": ["id", "name"] + } + } + }, + "required": ["users"] + }`; + thisArg.getNodeParameter.calledWith('jsonSchema', 0).mockReturnValueOnce(schema); + const { response } = (await outputParser.supplyData.call(thisArg, 0)) as { + response: N8nStructuredOutputParser; + }; + const outputObject = { + output: { + users: [ + { id: 1, name: 'Alice' }, + { id: 2, name: 'Bob' }, + ], + }, + }; + const parsersOutput = await response.parse(`Here's the array output: + \`\`\`json + ${JSON.stringify(outputObject)} + \`\`\` + `); + + expect(parsersOutput).toEqual(outputObject); + }); + + it('should handle empty objects', async () => { + const schema = `{ + "type": "object", + "properties": { + "data": { + "type": "object" + } + }, + "required": ["data"] + }`; + thisArg.getNodeParameter.calledWith('jsonSchema', 0).mockReturnValueOnce(schema); + const { response } = (await outputParser.supplyData.call(thisArg, 0)) as { + response: N8nStructuredOutputParser; + }; + const outputObject = { + output: { + data: {}, + }, + }; + const parsersOutput = await response.parse(`Here's the empty object output: + \`\`\`json + ${JSON.stringify(outputObject)} + \`\`\` + `); + + expect(parsersOutput).toEqual(outputObject); + }); + + it('should throw error for null values in non-nullable fields', async () => { + const schema = `{ + "type": "object", + "properties": { + "name": { "type": "string" }, + "age": { "type": "number" } + }, + "required": ["name", "age"] + }`; + thisArg.getNodeParameter.calledWith('jsonSchema', 0).mockReturnValueOnce(schema); + const { response } = (await outputParser.supplyData.call(thisArg, 0)) as { + response: N8nStructuredOutputParser; + }; + const outputObject = { + output: { + name: 'Charlie', + age: null, + }, + }; + + await expect( + response.parse( + `Here's the output with null value: + \`\`\`json + ${JSON.stringify(outputObject)} + \`\`\` + `, + undefined, + (e) => e, + ), + ).rejects.toThrow('Expected number, received null'); + }); + }); + + describe('Version 1.2 and above', () => { + beforeEach(() => { + thisArg.getNode.mockReturnValue(mock({ typeVersion: 1.2 })); + }); + + it('should parse output using schema generated from complex JSON example', async () => { + const jsonExample = `{ + "user": { + "name": "Alice", + "details": { + "age": 30, + "address": { + "street": "123 Main St", + "city": "Anytown", + "zipCode": "12345" + } + } + }, + "orders": [ + { + "id": "ORD-001", + "items": ["item1", "item2"], + "total": 50.99 + }, + { + "id": "ORD-002", + "items": ["item3"], + "total": 25.50 + } + ], + "isActive": true + }`; + thisArg.getNodeParameter.calledWith('schemaType', 0).mockReturnValueOnce('fromJson'); + thisArg.getNodeParameter + .calledWith('jsonSchemaExample', 0) + .mockReturnValueOnce(jsonExample); + + const { response } = (await outputParser.supplyData.call(thisArg, 0)) as { + response: N8nStructuredOutputParser; + }; + + const outputObject = { + output: jsonParse(jsonExample), + }; + + const parsersOutput = await response.parse(`Here's the complex output: + \`\`\`json + ${JSON.stringify(outputObject)} + \`\`\` + `); + + expect(parsersOutput).toEqual(outputObject); + }); + + it('should validate enum values', async () => { + const inputSchema = `{ + "type": "object", + "properties": { + "color": { + "type": "string", + "enum": ["red", "green", "blue"] + } + }, + "required": ["color"] + }`; + thisArg.getNodeParameter.calledWith('schemaType', 0).mockReturnValueOnce('manual'); + thisArg.getNodeParameter.calledWith('inputSchema', 0).mockReturnValueOnce(inputSchema); + + const { response } = (await outputParser.supplyData.call(thisArg, 0)) as { + response: N8nStructuredOutputParser; + }; + + const validOutput = { + output: { + color: 'green', + }, + }; + + const invalidOutput = { + output: { + color: 'yellow', + }, + }; + + await expect( + response.parse(`Valid output: + \`\`\`json + ${JSON.stringify(validOutput)} + \`\`\` + `), + ).resolves.toEqual(validOutput); + + await expect( + response.parse( + `Invalid output: + \`\`\`json + ${JSON.stringify(invalidOutput)} + \`\`\` + `, + undefined, + (e) => e, + ), + ).rejects.toThrow(); + }); + + it('should handle recursive structures', async () => { + const inputSchema = `{ + "type": "object", + "properties": { + "name": { "type": "string" }, + "children": { + "type": "array", + "items": { "$ref": "#" } + } + }, + "required": ["name"] + }`; + thisArg.getNodeParameter.calledWith('schemaType', 0).mockReturnValueOnce('manual'); + thisArg.getNodeParameter.calledWith('inputSchema', 0).mockReturnValueOnce(inputSchema); + + const { response } = (await outputParser.supplyData.call(thisArg, 0)) as { + response: N8nStructuredOutputParser; + }; + + const outputObject = { + output: { + name: 'Root', + children: [ + { + name: 'Child1', + children: [{ name: 'Grandchild1' }, { name: 'Grandchild2' }], + }, + { + name: 'Child2', + }, + ], + }, + }; + + const parsersOutput = await response.parse(`Here's the recursive structure output: + \`\`\`json + ${JSON.stringify(outputObject)} + \`\`\` + `); + + expect(parsersOutput).toEqual(outputObject); + }); + + it('should handle missing required properties', async () => { + const schema = `{ + "type": "object", + "properties": { + "name": { + "type": "string" + }, + "age": { + "type": "number" + } + }, + "required": ["name", "age"] + }`; + thisArg.getNodeParameter.calledWith('schemaType', 0).mockReturnValueOnce('manual'); + thisArg.getNodeParameter.calledWith('inputSchema', 0).mockReturnValueOnce(schema); + + const { response } = (await outputParser.supplyData.call(thisArg, 0)) as { + response: N8nStructuredOutputParser; + }; + const outputObject = { output: { name: 'Mac' } }; + + await expect( + response.parse( + `Here's the output! + \`\`\`json + ${JSON.stringify(outputObject)} + \`\`\` + `, + undefined, + (e) => e, + ), + ).rejects.toThrow('Required'); + }); + it('should throw on wrong type', async () => { + const schema = `{ + "type": "object", + "properties": { + "name": { + "type": "string" + }, + "age": { + "type": "number" + } + }, + "required": ["name", "age"] + }`; + thisArg.getNodeParameter.calledWith('inputSchema', 0).mockReturnValueOnce(schema); + const { response } = (await outputParser.supplyData.call(thisArg, 0)) as { + response: N8nStructuredOutputParser; + }; + const outputObject = { output: { name: 'Mac', age: '27' } }; + + await expect( + response.parse( + `Here's the output! + \`\`\`json + ${JSON.stringify(outputObject)} + \`\`\` + `, + undefined, + (e) => e, + ), + ).rejects.toThrow('Expected number, received string'); + }); + + it('should parse array output', async () => { + const schema = `{ + "type": "object", + "properties": { + "myArr": { + "type": "array", + "items": { + "type": "object", + "properties": { + "name": { + "type": "string" + }, + "age": { + "type": "number" + } + }, + "required": ["name", "age"] + } + } + }, + "required": ["myArr"] + }`; + thisArg.getNodeParameter.calledWith('inputSchema', 0).mockReturnValueOnce(schema); + const { response } = (await outputParser.supplyData.call(thisArg, 0)) as { + response: N8nStructuredOutputParser; + }; + const outputObject = { + output: { + myArr: [ + { name: 'Mac', age: 27 }, + { name: 'Alice', age: 25 }, + ], + }, + }; + const parsersOutput = await response.parse(`Here's the output! + \`\`\`json + ${JSON.stringify(outputObject)} + \`\`\` + `); + + expect(parsersOutput).toEqual(outputObject); + }); }); }); }); diff --git a/packages/@n8n/nodes-langchain/utils/helpers.ts b/packages/@n8n/nodes-langchain/utils/helpers.ts index c70c8a8991..a760c32ba8 100644 --- a/packages/@n8n/nodes-langchain/utils/helpers.ts +++ b/packages/@n8n/nodes-langchain/utils/helpers.ts @@ -1,12 +1,12 @@ -import { NodeConnectionType, NodeOperationError, jsonStringify } from 'n8n-workflow'; -import type { AiEvent, IDataObject, IExecuteFunctions, IWebhookFunctions } from 'n8n-workflow'; +import type { BaseChatMessageHistory } from '@langchain/core/chat_history'; import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; -import type { BaseOutputParser } from '@langchain/core/output_parsers'; +import type { BaseLLM } from '@langchain/core/language_models/llms'; import type { BaseMessage } from '@langchain/core/messages'; import type { Tool } from '@langchain/core/tools'; -import type { BaseLLM } from '@langchain/core/language_models/llms'; import type { BaseChatMemory } from 'langchain/memory'; -import type { BaseChatMessageHistory } from '@langchain/core/chat_history'; +import { NodeConnectionType, NodeOperationError, jsonStringify } from 'n8n-workflow'; +import type { AiEvent, IDataObject, IExecuteFunctions, IWebhookFunctions } from 'n8n-workflow'; + import { N8nTool } from './N8nTool'; function hasMethods(obj: unknown, ...methodNames: Array): obj is T { @@ -66,21 +66,6 @@ export function isToolsInstance(model: unknown): model is Tool { return namespace.includes('tools'); } -export async function getOptionalOutputParsers( - ctx: IExecuteFunctions, -): Promise>> { - let outputParsers: BaseOutputParser[] = []; - - if (ctx.getNodeParameter('hasOutputParser', 0, true) === true) { - outputParsers = (await ctx.getInputConnectionData( - NodeConnectionType.AiOutputParser, - 0, - )) as BaseOutputParser[]; - } - - return outputParsers; -} - export function getPromptInputByType(options: { ctx: IExecuteFunctions; i: number; diff --git a/packages/@n8n/nodes-langchain/utils/logWrapper.ts b/packages/@n8n/nodes-langchain/utils/logWrapper.ts index 8707726183..c1ecd1799c 100644 --- a/packages/@n8n/nodes-langchain/utils/logWrapper.ts +++ b/packages/@n8n/nodes-langchain/utils/logWrapper.ts @@ -1,24 +1,21 @@ -import { NodeOperationError, NodeConnectionType } from 'n8n-workflow'; -import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow'; - -import type { Tool } from '@langchain/core/tools'; -import type { BaseMessage } from '@langchain/core/messages'; -import type { InputValues, MemoryVariables, OutputValues } from '@langchain/core/memory'; -import type { BaseChatMessageHistory } from '@langchain/core/chat_history'; -import type { BaseCallbackConfig, Callbacks } from '@langchain/core/callbacks/manager'; - -import { Embeddings } from '@langchain/core/embeddings'; -import { VectorStore } from '@langchain/core/vectorstores'; -import type { Document } from '@langchain/core/documents'; -import { TextSplitter } from '@langchain/textsplitters'; import type { BaseChatMemory } from '@langchain/community/memory/chat_memory'; +import type { BaseCallbackConfig, Callbacks } from '@langchain/core/callbacks/manager'; +import type { BaseChatMessageHistory } from '@langchain/core/chat_history'; +import type { Document } from '@langchain/core/documents'; +import { Embeddings } from '@langchain/core/embeddings'; +import type { InputValues, MemoryVariables, OutputValues } from '@langchain/core/memory'; +import type { BaseMessage } from '@langchain/core/messages'; import { BaseRetriever } from '@langchain/core/retrievers'; -import { BaseOutputParser, OutputParserException } from '@langchain/core/output_parsers'; -import { isObject } from 'lodash'; +import type { Tool } from '@langchain/core/tools'; +import { VectorStore } from '@langchain/core/vectorstores'; +import { TextSplitter } from '@langchain/textsplitters'; import type { BaseDocumentLoader } from 'langchain/dist/document_loaders/base'; -import { N8nJsonLoader } from './N8nJsonLoader'; -import { N8nBinaryLoader } from './N8nBinaryLoader'; +import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow'; +import { NodeOperationError, NodeConnectionType } from 'n8n-workflow'; + import { logAiEvent, isToolsInstance, isBaseChatMemory, isBaseChatMessageHistory } from './helpers'; +import { N8nBinaryLoader } from './N8nBinaryLoader'; +import { N8nJsonLoader } from './N8nJsonLoader'; const errorsMap: { [key: string]: { message: string; description: string } } = { 'You exceeded your current quota, please check your plan and billing details.': { @@ -40,10 +37,6 @@ export async function callMethodAsync( try { return await parameters.method.call(this, ...parameters.arguments); } catch (e) { - // Langchain checks for OutputParserException to run retry chain - // for auto-fixing the output so skip wrapping in this case - if (e instanceof OutputParserException) throw e; - // Propagate errors from sub-nodes if (e.functionality === 'configuration-node') throw e; const connectedNode = parameters.executeFunctions.getNode(); @@ -63,7 +56,9 @@ export async function callMethodAsync( error, ); if (error.message) { - error.description = error.message; + if (!error.description) { + error.description = error.message; + } throw error; } throw new NodeOperationError( @@ -109,7 +104,6 @@ export function logWrapper( | Tool | BaseChatMemory | BaseChatMessageHistory - | BaseOutputParser | BaseRetriever | Embeddings | Document[] @@ -219,44 +213,6 @@ export function logWrapper( } } - // ========== BaseOutputParser ========== - if (originalInstance instanceof BaseOutputParser) { - if (prop === 'parse' && 'parse' in target) { - return async (text: string | Record): Promise => { - connectionType = NodeConnectionType.AiOutputParser; - const stringifiedText = isObject(text) ? JSON.stringify(text) : text; - const { index } = executeFunctions.addInputData(connectionType, [ - [{ json: { action: 'parse', text: stringifiedText } }], - ]); - - try { - const response = (await callMethodAsync.call(target, { - executeFunctions, - connectionType, - currentNodeRunIndex: index, - method: target[prop], - arguments: [stringifiedText], - })) as object; - - void logAiEvent(executeFunctions, 'ai-output-parsed', { text, response }); - executeFunctions.addOutputData(connectionType, index, [ - [{ json: { action: 'parse', response } }], - ]); - return response; - } catch (error) { - void logAiEvent(executeFunctions, 'ai-output-parsed', { - text, - response: error.message ?? error, - }); - executeFunctions.addOutputData(connectionType, index, [ - [{ json: { action: 'parse', response: error.message ?? error } }], - ]); - throw error; - } - }; - } - } - // ========== BaseRetriever ========== if (originalInstance instanceof BaseRetriever) { if (prop === 'getRelevantDocuments' && 'getRelevantDocuments' in target) { diff --git a/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserItemList/ItemListOutputParser.ts b/packages/@n8n/nodes-langchain/utils/output_parsers/N8nItemListOutputParser.ts similarity index 88% rename from packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserItemList/ItemListOutputParser.ts rename to packages/@n8n/nodes-langchain/utils/output_parsers/N8nItemListOutputParser.ts index 7e596a2b68..f24238690b 100644 --- a/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserItemList/ItemListOutputParser.ts +++ b/packages/@n8n/nodes-langchain/utils/output_parsers/N8nItemListOutputParser.ts @@ -1,9 +1,9 @@ import { BaseOutputParser, OutputParserException } from '@langchain/core/output_parsers'; -export class ItemListOutputParser extends BaseOutputParser { +export class N8nItemListOutputParser extends BaseOutputParser { lc_namespace = ['n8n-nodes-langchain', 'output_parsers', 'list_items']; - private numberOfItems: number | undefined; + private numberOfItems: number = 3; private separator: string; @@ -39,7 +39,7 @@ export class ItemListOutputParser extends BaseOutputParser { this.numberOfItems ? this.numberOfItems + ' ' : '' }items separated by`; - const numberOfExamples = this.numberOfItems ?? 3; + const numberOfExamples = this.numberOfItems; const examples: string[] = []; for (let i = 1; i <= numberOfExamples; i++) { @@ -48,4 +48,8 @@ export class ItemListOutputParser extends BaseOutputParser { return `${instructions} "${this.separator}" (for example: "${examples.join(this.separator)}")`; } + + getSchema() { + return; + } } diff --git a/packages/@n8n/nodes-langchain/utils/output_parsers/N8nOutputFixingParser.ts b/packages/@n8n/nodes-langchain/utils/output_parsers/N8nOutputFixingParser.ts new file mode 100644 index 0000000000..bfcbf88b33 --- /dev/null +++ b/packages/@n8n/nodes-langchain/utils/output_parsers/N8nOutputFixingParser.ts @@ -0,0 +1,95 @@ +import type { Callbacks } from '@langchain/core/callbacks/manager'; +import type { BaseLanguageModel } from '@langchain/core/language_models/base'; +import type { AIMessage } from '@langchain/core/messages'; +import { BaseOutputParser } from '@langchain/core/output_parsers'; +import type { IExecuteFunctions } from 'n8n-workflow'; +import { NodeConnectionType } from 'n8n-workflow'; + +import type { N8nStructuredOutputParser } from './N8nStructuredOutputParser'; +import { NAIVE_FIX_PROMPT } from './prompt'; +import { logAiEvent } from '../helpers'; + +export class N8nOutputFixingParser extends BaseOutputParser { + private context: IExecuteFunctions; + + private model: BaseLanguageModel; + + private outputParser: N8nStructuredOutputParser; + + lc_namespace = ['langchain', 'output_parsers', 'fix']; + + constructor( + context: IExecuteFunctions, + model: BaseLanguageModel, + outputParser: N8nStructuredOutputParser, + ) { + super(); + this.context = context; + this.model = model; + this.outputParser = outputParser; + } + + getRetryChain() { + return NAIVE_FIX_PROMPT.pipe(this.model); + } + + /** + * Attempts to parse the completion string using the output parser. + * If the initial parse fails, it tries to fix the output using a retry chain. + * @param completion The string to be parsed + * @returns The parsed response + * @throws Error if both parsing attempts fail + */ + async parse(completion: string, callbacks?: Callbacks) { + const { index } = this.context.addInputData(NodeConnectionType.AiOutputParser, [ + [{ json: { action: 'parse', text: completion } }], + ]); + + try { + // First attempt to parse the completion + const response = await this.outputParser.parse(completion, callbacks, (e) => e); + void logAiEvent(this.context, 'ai-output-parsed', { text: completion, response }); + + this.context.addOutputData(NodeConnectionType.AiOutputParser, index, [ + [{ json: { action: 'parse', response } }], + ]); + + return response; + } catch (error) { + try { + // Second attempt: use retry chain to fix the output + const result = (await this.getRetryChain().invoke({ + completion, + error, + instructions: this.getFormatInstructions(), + })) as AIMessage; + + const resultText = result.content.toString(); + const parsed = await this.outputParser.parse(resultText, callbacks); + + // Add the successfully parsed output to the context + this.context.addOutputData(NodeConnectionType.AiOutputParser, index, [ + [{ json: { action: 'parse', response: parsed } }], + ]); + + return parsed; + } catch (autoParseError) { + // If both attempts fail, add the error to the output and throw + this.context.addOutputData(NodeConnectionType.AiOutputParser, index, autoParseError); + throw autoParseError; + } + } + } + + /** + * Method to get the format instructions for the parser. + * @returns The format instructions for the parser. + */ + getFormatInstructions() { + return this.outputParser.getFormatInstructions(); + } + + getSchema() { + return this.outputParser.schema; + } +} diff --git a/packages/@n8n/nodes-langchain/utils/output_parsers/N8nOutputParser.ts b/packages/@n8n/nodes-langchain/utils/output_parsers/N8nOutputParser.ts new file mode 100644 index 0000000000..e9d23a0dea --- /dev/null +++ b/packages/@n8n/nodes-langchain/utils/output_parsers/N8nOutputParser.ts @@ -0,0 +1,26 @@ +import type { IExecuteFunctions } from 'n8n-workflow'; +import { NodeConnectionType } from 'n8n-workflow'; + +import { N8nItemListOutputParser } from './N8nItemListOutputParser'; +import { N8nOutputFixingParser } from './N8nOutputFixingParser'; +import { N8nStructuredOutputParser } from './N8nStructuredOutputParser'; + +export type N8nOutputParser = + | N8nOutputFixingParser + | N8nStructuredOutputParser + | N8nItemListOutputParser; + +export { N8nOutputFixingParser, N8nItemListOutputParser, N8nStructuredOutputParser }; + +export async function getOptionalOutputParsers(ctx: IExecuteFunctions): Promise { + let outputParsers: N8nOutputParser[] = []; + + if (ctx.getNodeParameter('hasOutputParser', 0, true) === true) { + outputParsers = (await ctx.getInputConnectionData( + NodeConnectionType.AiOutputParser, + 0, + )) as N8nOutputParser[]; + } + + return outputParsers; +} diff --git a/packages/@n8n/nodes-langchain/utils/output_parsers/N8nStructuredOutputParser.ts b/packages/@n8n/nodes-langchain/utils/output_parsers/N8nStructuredOutputParser.ts new file mode 100644 index 0000000000..4799193be6 --- /dev/null +++ b/packages/@n8n/nodes-langchain/utils/output_parsers/N8nStructuredOutputParser.ts @@ -0,0 +1,116 @@ +import type { Callbacks } from '@langchain/core/callbacks/manager'; +import { StructuredOutputParser } from 'langchain/output_parsers'; +import get from 'lodash/get'; +import type { IExecuteFunctions } from 'n8n-workflow'; +import { NodeConnectionType, NodeOperationError } from 'n8n-workflow'; +import { z } from 'zod'; + +import { logAiEvent } from '../helpers'; + +const STRUCTURED_OUTPUT_KEY = '__structured__output'; +const STRUCTURED_OUTPUT_OBJECT_KEY = '__structured__output__object'; +const STRUCTURED_OUTPUT_ARRAY_KEY = '__structured__output__array'; + +export class N8nStructuredOutputParser extends StructuredOutputParser< + z.ZodType +> { + context: IExecuteFunctions; + + constructor(context: IExecuteFunctions, zodSchema: z.ZodSchema) { + super(zodSchema); + this.context = context; + } + + lc_namespace = ['langchain', 'output_parsers', 'structured']; + + async parse( + text: string, + _callbacks?: Callbacks, + errorMapper?: (error: Error) => Error, + ): Promise { + const { index } = this.context.addInputData(NodeConnectionType.AiOutputParser, [ + [{ json: { action: 'parse', text } }], + ]); + try { + const parsed = await super.parse(text); + + const result = (get(parsed, [STRUCTURED_OUTPUT_KEY, STRUCTURED_OUTPUT_OBJECT_KEY]) ?? + get(parsed, [STRUCTURED_OUTPUT_KEY, STRUCTURED_OUTPUT_ARRAY_KEY]) ?? + get(parsed, STRUCTURED_OUTPUT_KEY) ?? + parsed) as Record; + + void logAiEvent(this.context, 'ai-output-parsed', { text, response: result }); + + this.context.addOutputData(NodeConnectionType.AiOutputParser, index, [ + [{ json: { action: 'parse', response: result } }], + ]); + + return result; + } catch (e) { + const nodeError = new NodeOperationError( + this.context.getNode(), + "Model output doesn't fit required format", + { + description: + "To continue the execution when this happens, change the 'On Error' parameter in the root node's settings", + }, + ); + + void logAiEvent(this.context, 'ai-output-parsed', { + text, + response: e.message ?? e, + }); + + this.context.addOutputData(NodeConnectionType.AiOutputParser, index, nodeError); + if (errorMapper) { + throw errorMapper(e); + } + + throw nodeError; + } + } + + static async fromZodJsonSchema( + zodSchema: z.ZodSchema, + nodeVersion: number, + context: IExecuteFunctions, + ): Promise { + let returnSchema: z.ZodType; + if (nodeVersion === 1) { + returnSchema = z.object({ + [STRUCTURED_OUTPUT_KEY]: z + .object({ + [STRUCTURED_OUTPUT_OBJECT_KEY]: zodSchema.optional(), + [STRUCTURED_OUTPUT_ARRAY_KEY]: z.array(zodSchema).optional(), + }) + .describe( + `Wrapper around the output data. It can only contain ${STRUCTURED_OUTPUT_OBJECT_KEY} or ${STRUCTURED_OUTPUT_ARRAY_KEY} but never both.`, + ) + .refine( + (data) => { + // Validate that one and only one of the properties exists + return ( + Boolean(data[STRUCTURED_OUTPUT_OBJECT_KEY]) !== + Boolean(data[STRUCTURED_OUTPUT_ARRAY_KEY]) + ); + }, + { + message: + 'One and only one of __structured__output__object and __structured__output__array should be present.', + path: [STRUCTURED_OUTPUT_KEY], + }, + ), + }); + } else { + returnSchema = z.object({ + output: zodSchema.optional(), + }); + } + + return new N8nStructuredOutputParser(context, returnSchema); + } + + getSchema() { + return this.schema; + } +} diff --git a/packages/@n8n/nodes-langchain/utils/output_parsers/prompt.ts b/packages/@n8n/nodes-langchain/utils/output_parsers/prompt.ts new file mode 100644 index 0000000000..47599d230c --- /dev/null +++ b/packages/@n8n/nodes-langchain/utils/output_parsers/prompt.ts @@ -0,0 +1,20 @@ +import { PromptTemplate } from '@langchain/core/prompts'; + +export const NAIVE_FIX_TEMPLATE = `Instructions: +-------------- +{instructions} +-------------- +Completion: +-------------- +{completion} +-------------- + +Above, the Completion did not satisfy the constraints given in the Instructions. +Error: +-------------- +{error} +-------------- + +Please try again. Please only respond with an answer that satisfies the constraints laid out in the Instructions:`; + +export const NAIVE_FIX_PROMPT = PromptTemplate.fromTemplate(NAIVE_FIX_TEMPLATE); diff --git a/packages/editor-ui/src/components/RunDataAi/RunDataAi.vue b/packages/editor-ui/src/components/RunDataAi/RunDataAi.vue index 89257df47c..48f4c2448e 100644 --- a/packages/editor-ui/src/components/RunDataAi/RunDataAi.vue +++ b/packages/editor-ui/src/components/RunDataAi/RunDataAi.vue @@ -161,12 +161,12 @@ function getTreeNodeData(nodeName: string, currentDepth: number): TreeNode[] { connections[key][0].flatMap((node) => getTreeNodeData(node.node, currentDepth + 1)), ); + children.sort((a, b) => a.startTime - b.startTime); + if (resultData.length) { return resultData.map((r) => createNode(nodeName, currentDepth, r, children)); } - children.sort((a, b) => a.startTime - b.startTime); - return [createNode(nodeName, currentDepth, undefined, children)]; }