diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/description.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/description.ts index 06b64a91de..004f59e6b7 100644 --- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/description.ts +++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/description.ts @@ -47,6 +47,30 @@ export const toolsAgentProperties: INodeProperties[] = [ description: 'Whether or not binary images should be automatically passed through to the agent as image type messages', }, + { + displayName: 'Batch Processing', + name: 'batching', + type: 'collection', + description: 'Batch processing options for rate limiting', + default: {}, + options: [ + { + displayName: 'Batch Size', + name: 'batchSize', + default: 1, + type: 'number', + description: + 'How many items to process in parallel. This is useful for rate limiting, but will impact the ordering in the agents log output.', + }, + { + displayName: 'Delay Between Batches', + name: 'delayBetweenBatches', + default: 0, + type: 'number', + description: 'Delay in milliseconds between batches. This is useful for rate limiting.', + }, + ], + }, ], }, ]; diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/execute.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/execute.ts index 826059ab78..af3011b3f5 100644 --- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/execute.ts +++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/execute.ts @@ -11,7 +11,13 @@ import type { AgentAction, AgentFinish } from 'langchain/agents'; import { AgentExecutor, createToolCallingAgent } from 'langchain/agents'; import type { ToolsAgentAction } from 'langchain/dist/agents/tool_calling/output_parser'; import { omit } from 'lodash'; -import { BINARY_ENCODING, jsonParse, NodeConnectionTypes, NodeOperationError } from 'n8n-workflow'; +import { + BINARY_ENCODING, + jsonParse, + NodeConnectionTypes, + NodeOperationError, + sleep, +} from 'n8n-workflow'; import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow'; import type { ZodObject } from 'zod'; import { z } from 'zod'; @@ -406,12 +412,21 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise { + const itemIndex = i + batchItemIndex; const model = await getChatModel(this); - const memory = await getOptionalMemory(this); const input = getPromptInputByType({ ctx: this, @@ -461,7 +476,7 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise { + if (result.status === 'rejected') { + if (this.continueOnFail()) { + returnData.push({ + json: { error: result.reason as string }, + pairedItem: { item: index }, + }); + return; + } else { + throw new NodeOperationError(this.getNode(), result.reason); + } + } + const response = result.value; // If memory and outputParser are connected, parse the output. 
if (memory && outputParser) { const parsedOutput = jsonParse<{ output: Record }>( @@ -492,15 +522,10 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise 0) { + await sleep(delayBetweenBatches); } } diff --git a/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/ChainLlm.node.ts b/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/ChainLlm.node.ts index ce982e896d..787b1a3932 100644 --- a/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/ChainLlm.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/ChainLlm.node.ts @@ -5,7 +5,7 @@ import type { INodeType, INodeTypeDescription, } from 'n8n-workflow'; -import { NodeApiError, NodeConnectionTypes, NodeOperationError } from 'n8n-workflow'; +import { NodeApiError, NodeConnectionTypes, NodeOperationError, sleep } from 'n8n-workflow'; import { getPromptInputByType } from '@utils/helpers'; import { getOptionalOutputParser } from '@utils/output_parsers/N8nOutputParser'; @@ -67,19 +67,28 @@ export class ChainLlm implements INodeType { this.logger.debug('Executing Basic LLM Chain'); const items = this.getInputData(); const returnData: INodeExecutionData[] = []; + const { batchSize, delayBetweenBatches } = this.getNodeParameter('batching', 0, { + batchSize: 100, + delayBetweenBatches: 0, + }) as { + batchSize: number; + delayBetweenBatches: number; + }; + // Get output parser if configured + const outputParser = await getOptionalOutputParser(this); + + // Process items in batches + for (let i = 0; i < items.length; i += batchSize) { + const batch = items.slice(i, i + batchSize); + const batchPromises = batch.map(async (_item, batchItemIndex) => { + const itemIndex = i + batchItemIndex; - // Process each input item - for (let itemIndex = 0; itemIndex < items.length; itemIndex++) { - try { // Get the language model const llm = (await this.getInputConnectionData( NodeConnectionTypes.AiLanguageModel, 0, )) as BaseLanguageModel; - // Get output parser if configured - const outputParser = await getOptionalOutputParser(this); - // Get user prompt based on node version let prompt: string; @@ -106,44 +115,53 @@ export class ChainLlm implements INodeType { [], ) as MessageTemplate[]; - // Execute the chain - const responses = await executeChain({ + return (await executeChain({ context: this, itemIndex, query: prompt, llm, outputParser, messages, - }); + })) as object[]; + }); - // If the node version is 1.6(and LLM is using `response_format: json_object`) or higher or an output parser is configured, - // we unwrap the response and return the object directly as JSON - const shouldUnwrapObjects = this.getNode().typeVersion >= 1.6 || !!outputParser; - // Process each response and add to return data - responses.forEach((response) => { - returnData.push({ - json: formatResponse(response, shouldUnwrapObjects), - }); - }); - } catch (error) { - // Handle OpenAI specific rate limit errors - if (error instanceof NodeApiError && isOpenAiError(error.cause)) { - const openAiErrorCode: string | undefined = (error.cause as any).error?.code; - if (openAiErrorCode) { - const customMessage = getCustomOpenAiErrorMessage(openAiErrorCode); - if (customMessage) { - error.message = customMessage; + const batchResults = await Promise.allSettled(batchPromises); + + batchResults.forEach((promiseResult, batchItemIndex) => { + const itemIndex = i + batchItemIndex; + if (promiseResult.status === 'rejected') { + const error = promiseResult.reason as Error; + // Handle OpenAI specific rate limit errors + if (error instanceof NodeApiError && 
isOpenAiError(error.cause)) { + const openAiErrorCode: string | undefined = (error.cause as any).error?.code; + if (openAiErrorCode) { + const customMessage = getCustomOpenAiErrorMessage(openAiErrorCode); + if (customMessage) { + error.message = customMessage; + } } } + + if (this.continueOnFail()) { + returnData.push({ + json: { error: error.message }, + pairedItem: { item: itemIndex }, + }); + return; + } + throw new NodeOperationError(this.getNode(), error); } - // Continue on failure if configured - if (this.continueOnFail()) { - returnData.push({ json: { error: error.message }, pairedItem: { item: itemIndex } }); - continue; - } + const responses = promiseResult.value; + responses.forEach((response: object) => { + returnData.push({ + json: formatResponse(response, this.getNode().typeVersion >= 1.6 || !!outputParser), + }); + }); + }); - throw error; + if (i + batchSize < items.length && delayBetweenBatches > 0) { + await sleep(delayBetweenBatches); } } diff --git a/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/methods/config.ts b/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/methods/config.ts index 098bc8a8e1..a1df862432 100644 --- a/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/methods/config.ts +++ b/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/methods/config.ts @@ -270,4 +270,29 @@ export const nodeProperties: INodeProperties[] = [ }, }, }, + { + displayName: 'Batch Processing', + name: 'batching', + type: 'collection', + placeholder: 'Add Batch Processing Option', + description: 'Batch processing options for rate limiting', + default: {}, + options: [ + { + displayName: 'Batch Size', + name: 'batchSize', + default: 100, + type: 'number', + description: + 'How many items to process in parallel. This is useful for rate limiting, but will impact the agents log output.', + }, + { + displayName: 'Delay Between Batches', + name: 'delayBetweenBatches', + default: 1000, + type: 'number', + description: 'Delay in milliseconds between batches. 
This is useful for rate limiting.', + }, + ], + }, ]; diff --git a/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/test/ChainLlm.node.test.ts b/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/test/ChainLlm.node.test.ts index d7916a0275..c9c375042c 100644 --- a/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/test/ChainLlm.node.test.ts +++ b/packages/@n8n/nodes-langchain/nodes/chains/ChainLLM/test/ChainLlm.node.test.ts @@ -3,7 +3,7 @@ import { FakeChatModel } from '@langchain/core/utils/testing'; import { mock } from 'jest-mock-extended'; import type { IExecuteFunctions, INode } from 'n8n-workflow'; -import { NodeConnectionTypes } from 'n8n-workflow'; +import { NodeConnectionTypes, UnexpectedError } from 'n8n-workflow'; import * as helperModule from '@utils/helpers'; import * as outputParserModule from '@utils/output_parsers/N8nOutputParser'; @@ -12,6 +12,11 @@ import { ChainLlm } from '../ChainLlm.node'; import * as executeChainModule from '../methods/chainExecutor'; import * as responseFormatterModule from '../methods/responseFormatter'; +jest.mock('n8n-workflow', () => ({ + ...jest.requireActual('n8n-workflow'), + sleep: jest.fn(), +})); + jest.mock('@utils/helpers', () => ({ getPromptInputByType: jest.fn(), })); @@ -25,12 +30,7 @@ jest.mock('../methods/chainExecutor', () => ({ })); jest.mock('../methods/responseFormatter', () => ({ - formatResponse: jest.fn().mockImplementation((response) => { - if (typeof response === 'string') { - return { text: response.trim() }; - } - return response; - }), + formatResponse: jest.fn(), })); describe('ChainLlm Node', () => { @@ -38,6 +38,8 @@ describe('ChainLlm Node', () => { let mockExecuteFunction: jest.Mocked; beforeEach(() => { + jest.resetAllMocks(); + node = new ChainLlm(); mockExecuteFunction = mock(); @@ -63,7 +65,12 @@ describe('ChainLlm Node', () => { const fakeLLM = new FakeChatModel({}); mockExecuteFunction.getInputConnectionData.mockResolvedValue(fakeLLM); - jest.clearAllMocks(); + (responseFormatterModule.formatResponse as jest.Mock).mockImplementation((response) => { + if (typeof response === 'string') { + return { text: response.trim() }; + } + return response; + }); }); describe('description', () => { @@ -164,15 +171,14 @@ describe('ChainLlm Node', () => { }); it('should continue on failure when configured', async () => { + mockExecuteFunction.continueOnFail.mockReturnValue(true); (helperModule.getPromptInputByType as jest.Mock).mockReturnValue('Test prompt'); - const error = new Error('Test error'); - (executeChainModule.executeChain as jest.Mock).mockRejectedValue(error); - - mockExecuteFunction.continueOnFail.mockReturnValue(true); + (executeChainModule.executeChain as jest.Mock).mockRejectedValueOnce( + new UnexpectedError('Test error'), + ); const result = await node.execute.call(mockExecuteFunction); - expect(result).toEqual([[{ json: { error: 'Test error' }, pairedItem: { item: 0 } }]]); }); diff --git a/packages/@n8n/nodes-langchain/nodes/chains/ChainRetrievalQA/ChainRetrievalQa.node.ts b/packages/@n8n/nodes-langchain/nodes/chains/ChainRetrievalQA/ChainRetrievalQa.node.ts index 4640603f69..c86c1b6957 100644 --- a/packages/@n8n/nodes-langchain/nodes/chains/ChainRetrievalQA/ChainRetrievalQa.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/chains/ChainRetrievalQA/ChainRetrievalQa.node.ts @@ -8,7 +8,7 @@ import { import type { BaseRetriever } from '@langchain/core/retrievers'; import { createStuffDocumentsChain } from 'langchain/chains/combine_documents'; import { createRetrievalChain } from 
'langchain/chains/retrieval'; -import { NodeConnectionTypes, NodeOperationError, parseErrorMetadata } from 'n8n-workflow'; +import { NodeConnectionTypes, NodeOperationError, parseErrorMetadata, sleep } from 'n8n-workflow'; import { type INodeProperties, type IExecuteFunctions, @@ -177,6 +177,31 @@ export class ChainRetrievalQa implements INodeType { }, }, }, + { + displayName: 'Batch Processing', + name: 'batching', + type: 'collection', + description: 'Batch processing options for rate limiting', + default: {}, + options: [ + { + displayName: 'Batch Size', + name: 'batchSize', + default: 100, + type: 'number', + description: + 'How many items to process in parallel. This is useful for rate limiting.', + }, + { + displayName: 'Delay Between Batches', + name: 'delayBetweenBatches', + default: 0, + type: 'number', + description: + 'Delay in milliseconds between batches. This is useful for rate limiting.', + }, + ], + }, ], }, ], @@ -186,11 +211,20 @@ export class ChainRetrievalQa implements INodeType { this.logger.debug('Executing Retrieval QA Chain'); const items = this.getInputData(); + const { batchSize, delayBetweenBatches } = this.getNodeParameter('options.batching', 0, { + batchSize: 100, + delayBetweenBatches: 0, + }) as { + batchSize: number; + delayBetweenBatches: number; + }; const returnData: INodeExecutionData[] = []; - // Run for each item - for (let itemIndex = 0; itemIndex < items.length; itemIndex++) { - try { + for (let i = 0; i < items.length; i += batchSize) { + const batch = items.slice(i, i + batchSize); + const batchPromises = batch.map(async (_item, batchIndex) => { + const itemIndex = i + batchIndex; + const model = (await this.getInputConnectionData( NodeConnectionTypes.AiLanguageModel, 0, @@ -266,32 +300,47 @@ export class ChainRetrievalQa implements INodeType { // Execute the chain with tracing config const tracingConfig = getTracingConfig(this); - const response = await retrievalChain + + const result = await retrievalChain .withConfig(tracingConfig) .invoke({ input: query }, { signal: this.getExecutionCancelSignal() }); - // Get the answer from the response - const answer: string = response.answer; + return result; + }); + + const batchResults = await Promise.allSettled(batchPromises); + + batchResults.forEach((response, index) => { + if (response.status === 'rejected') { + const error = response.reason; + if (this.continueOnFail()) { + const metadata = parseErrorMetadata(error); + returnData.push({ + json: { error: error.message }, + pairedItem: { item: index }, + metadata, + }); + return; + } else { + throw error; + } + } + const output = response.value; + const answer: string = output.answer; if (this.getNode().typeVersion >= 1.5) { returnData.push({ json: { response: answer } }); } else { // Legacy format for versions 1.4 and below is { text: string } returnData.push({ json: { response: { text: answer } } }); } - } catch (error) { - if (this.continueOnFail()) { - const metadata = parseErrorMetadata(error); - returnData.push({ - json: { error: error.message }, - pairedItem: { item: itemIndex }, - metadata, - }); - continue; - } + }); - throw error; + // Add delay between batches if not the last batch + if (i + batchSize < items.length && delayBetweenBatches > 0) { + await sleep(delayBetweenBatches); } } + return [returnData]; } } diff --git a/packages/@n8n/nodes-langchain/nodes/chains/ChainRetrievalQA/test/ChainRetrievalQa.node.test.ts b/packages/@n8n/nodes-langchain/nodes/chains/ChainRetrievalQA/test/ChainRetrievalQa.node.test.ts index 36e118464b..44914b7372 
100644 --- a/packages/@n8n/nodes-langchain/nodes/chains/ChainRetrievalQA/test/ChainRetrievalQa.node.test.ts +++ b/packages/@n8n/nodes-langchain/nodes/chains/ChainRetrievalQA/test/ChainRetrievalQa.node.test.ts @@ -8,6 +8,11 @@ import { NodeConnectionTypes, NodeOperationError, UnexpectedError } from 'n8n-wo import { ChainRetrievalQa } from '../ChainRetrievalQa.node'; +jest.mock('n8n-workflow', () => ({ + ...jest.requireActual('n8n-workflow'), + sleep: jest.fn(), +})); + const createExecuteFunctionsMock = ( parameters: IDataObject, fakeLlm: BaseLanguageModel, @@ -80,7 +85,12 @@ describe('ChainRetrievalQa', () => { const params = { promptType: 'define', text: 'What is the capital of France?', - options: {}, + options: { + batching: { + batchSize: 1, + delayBetweenBatches: 100, + }, + }, }; const result = await node.execute.call( @@ -114,7 +124,12 @@ describe('ChainRetrievalQa', () => { const params = { promptType: 'define', text: 'What is the capital of France?', - options: {}, + options: { + batching: { + batchSize: 1, + delayBetweenBatches: 100, + }, + }, }; const result = await node.execute.call( @@ -158,6 +173,10 @@ describe('ChainRetrievalQa', () => { text: 'What is the capital of France?', options: { systemPromptTemplate: customSystemPrompt, + batching: { + batchSize: 1, + delayBetweenBatches: 100, + }, }, }; @@ -185,7 +204,12 @@ describe('ChainRetrievalQa', () => { const params = { promptType: 'define', text: undefined, // undefined query - options: {}, + options: { + batching: { + batchSize: 1, + delayBetweenBatches: 100, + }, + }, }; await expect( @@ -211,7 +235,12 @@ describe('ChainRetrievalQa', () => { const params = { promptType: 'define', text: 'What is the capital of France?', - options: {}, + options: { + batching: { + batchSize: 1, + delayBetweenBatches: 100, + }, + }, }; // Override continueOnFail to return true diff --git a/packages/@n8n/nodes-langchain/nodes/chains/ChainSummarization/V2/ChainSummarizationV2.node.ts b/packages/@n8n/nodes-langchain/nodes/chains/ChainSummarization/V2/ChainSummarizationV2.node.ts index 7cb2bed603..8d7863a413 100644 --- a/packages/@n8n/nodes-langchain/nodes/chains/ChainSummarization/V2/ChainSummarizationV2.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/chains/ChainSummarization/V2/ChainSummarizationV2.node.ts @@ -1,5 +1,6 @@ import type { Document } from '@langchain/core/documents'; import type { BaseLanguageModel } from '@langchain/core/language_models/base'; +import type { ChainValues } from '@langchain/core/utils/types'; import type { TextSplitter } from '@langchain/textsplitters'; import { RecursiveCharacterTextSplitter } from '@langchain/textsplitters'; import { loadSummarizationChain } from 'langchain/chains'; @@ -12,7 +13,7 @@ import type { IDataObject, INodeInputConfiguration, } from 'n8n-workflow'; -import { NodeConnectionTypes } from 'n8n-workflow'; +import { NodeConnectionTypes, sleep } from 'n8n-workflow'; import { N8nBinaryLoader } from '@utils/N8nBinaryLoader'; import { N8nJsonLoader } from '@utils/N8nJsonLoader'; @@ -306,6 +307,31 @@ export class ChainSummarizationV2 implements INodeType { }, ], }, + { + displayName: 'Batch Processing', + name: 'batching', + type: 'collection', + description: 'Batch processing options for rate limiting', + default: {}, + options: [ + { + displayName: 'Batch Size', + name: 'batchSize', + default: 100, + type: 'number', + description: + 'How many items to process in parallel. 
This is useful for rate limiting.', + }, + { + displayName: 'Delay Between Batches', + name: 'delayBetweenBatches', + default: 0, + type: 'number', + description: + 'Delay in milliseconds between batches. This is useful for rate limiting.', + }, + ], + }, ], }, ], @@ -324,9 +350,19 @@ export class ChainSummarizationV2 implements INodeType { const items = this.getInputData(); const returnData: INodeExecutionData[] = []; + const { batchSize, delayBetweenBatches } = this.getNodeParameter('options.batching', 0, { + batchSize: 100, + delayBetweenBatches: 0, + }) as { + batchSize: number; + delayBetweenBatches: number; + }; + + for (let i = 0; i < items.length; i += batchSize) { + const batch = items.slice(i, i + batchSize); + const batchPromises = batch.map(async (_item, batchIndex) => { + const itemIndex = i + batchIndex; - for (let itemIndex = 0; itemIndex < items.length; itemIndex++) { - try { const model = (await this.getInputConnectionData( NodeConnectionTypes.AiLanguageModel, 0, @@ -353,6 +389,7 @@ export class ChainSummarizationV2 implements INodeType { const item = items[itemIndex]; let processedDocuments: Document[]; + let output: ChainValues = {}; // Use dedicated document loader input to load documents if (operationMode === 'documentLoader') { @@ -368,11 +405,9 @@ export class ChainSummarizationV2 implements INodeType { ? await documentInput.processItem(item, itemIndex) : documentInput; - const response = await chain.withConfig(getTracingConfig(this)).invoke({ + output = await chain.withConfig(getTracingConfig(this)).invoke({ input_documents: processedDocuments, }); - - returnData.push({ json: { response } }); } // Take the input and use binary or json loader @@ -412,21 +447,37 @@ export class ChainSummarizationV2 implements INodeType { } const processedItem = await processor.processItem(item, itemIndex); - const response = await chain.invoke( + output = await chain.invoke( { input_documents: processedItem, }, { signal: this.getExecutionCancelSignal() }, ); - returnData.push({ json: { response } }); - } - } catch (error) { - if (this.continueOnFail()) { - returnData.push({ json: { error: error.message }, pairedItem: { item: itemIndex } }); - continue; } + return output; + }); - throw error; + const batchResults = await Promise.allSettled(batchPromises); + batchResults.forEach((response, index) => { + if (response.status === 'rejected') { + const error = response.reason as Error; + if (this.continueOnFail()) { + returnData.push({ + json: { error: error.message }, + pairedItem: { item: i + index }, + }); + } else { + throw error; + } + } else { + const output = response.value; + returnData.push({ json: { output } }); + } + }); + + // Add delay between batches if not the last batch + if (i + batchSize < items.length && delayBetweenBatches > 0) { + await sleep(delayBetweenBatches); } } diff --git a/packages/@n8n/nodes-langchain/nodes/chains/InformationExtractor/InformationExtractor.node.ts b/packages/@n8n/nodes-langchain/nodes/chains/InformationExtractor/InformationExtractor.node.ts index 182e488782..b285eeffee 100644 --- a/packages/@n8n/nodes-langchain/nodes/chains/InformationExtractor/InformationExtractor.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/chains/InformationExtractor/InformationExtractor.node.ts @@ -3,7 +3,7 @@ import { HumanMessage } from '@langchain/core/messages'; import { ChatPromptTemplate, SystemMessagePromptTemplate } from '@langchain/core/prompts'; import type { JSONSchema7 } from 'json-schema'; import { OutputFixingParser, StructuredOutputParser } from 
'langchain/output_parsers'; -import { jsonParse, NodeConnectionTypes, NodeOperationError } from 'n8n-workflow'; +import { jsonParse, NodeConnectionTypes, NodeOperationError, sleep } from 'n8n-workflow'; import type { INodeType, INodeTypeDescription, @@ -213,6 +213,31 @@ export class InformationExtractor implements INodeType { rows: 6, }, }, + { + displayName: 'Batch Processing', + name: 'batching', + type: 'collection', + description: 'Batch processing options for rate limiting', + default: {}, + options: [ + { + displayName: 'Batch Size', + name: 'batchSize', + default: 100, + type: 'number', + description: + 'How many items to process in parallel. This is useful for rate limiting, but will impact the agents log output.', + }, + { + displayName: 'Delay Between Batches', + name: 'delayBetweenBatches', + default: 0, + type: 'number', + description: + 'Delay in milliseconds between batches. This is useful for rate limiting.', + }, + ], + }, ], }, ], @@ -220,6 +245,13 @@ export class InformationExtractor implements INodeType { async execute(this: IExecuteFunctions): Promise { const items = this.getInputData(); + const { batchSize, delayBetweenBatches } = this.getNodeParameter('options.batching', 0, { + batchSize: 100, + delayBetweenBatches: 0, + }) as { + batchSize: number; + delayBetweenBatches: number; + }; const llm = (await this.getInputConnectionData( NodeConnectionTypes.AiLanguageModel, @@ -265,38 +297,58 @@ export class InformationExtractor implements INodeType { } const resultData: INodeExecutionData[] = []; - for (let itemIndex = 0; itemIndex < items.length; itemIndex++) { - const input = this.getNodeParameter('text', itemIndex) as string; - const inputPrompt = new HumanMessage(input); - const options = this.getNodeParameter('options', itemIndex, {}) as { - systemPromptTemplate?: string; - }; + for (let i = 0; i < items.length; i += batchSize) { + const batch = items.slice(i, i + batchSize); - const systemPromptTemplate = SystemMessagePromptTemplate.fromTemplate( - `${options.systemPromptTemplate ?? SYSTEM_PROMPT_TEMPLATE} -{format_instructions}`, - ); + const batchPromises = batch.map(async (_item, batchItemIndex) => { + const itemIndex = i + batchItemIndex; - const messages = [ - await systemPromptTemplate.format({ - format_instructions: parser.getFormatInstructions(), - }), - inputPrompt, - ]; - const prompt = ChatPromptTemplate.fromMessages(messages); - const chain = prompt.pipe(llm).pipe(parser).withConfig(getTracingConfig(this)); + const input = this.getNodeParameter('text', itemIndex) as string; + const inputPrompt = new HumanMessage(input); - try { - const output = await chain.invoke(messages); - resultData.push({ json: { output } }); - } catch (error) { - if (this.continueOnFail()) { - resultData.push({ json: { error: error.message }, pairedItem: { item: itemIndex } }); - continue; + const options = this.getNodeParameter('options', itemIndex, {}) as { + systemPromptTemplate?: string; + }; + + const systemPromptTemplate = SystemMessagePromptTemplate.fromTemplate( + `${options.systemPromptTemplate ?? 
SYSTEM_PROMPT_TEMPLATE} + {format_instructions}`, + ); + + const messages = [ + await systemPromptTemplate.format({ + format_instructions: parser.getFormatInstructions(), + }), + inputPrompt, + ]; + const prompt = ChatPromptTemplate.fromMessages(messages); + const chain = prompt.pipe(llm).pipe(parser).withConfig(getTracingConfig(this)); + + return await chain.invoke(messages); + }); + const batchResults = await Promise.allSettled(batchPromises); + + batchResults.forEach((response, index) => { + if (response.status === 'rejected') { + const error = response.reason as Error; + if (this.continueOnFail()) { + resultData.push({ + json: { error: response.reason as string }, + pairedItem: { item: i + index }, + }); + return; + } else { + throw new NodeOperationError(this.getNode(), error.message); + } } + const output = response.value; + resultData.push({ json: { output } }); + }); - throw error; + // Add delay between batches if not the last batch + if (i + batchSize < items.length && delayBetweenBatches > 0) { + await sleep(delayBetweenBatches); } } diff --git a/packages/@n8n/nodes-langchain/nodes/chains/InformationExtractor/test/InformationExtraction.node.test.ts b/packages/@n8n/nodes-langchain/nodes/chains/InformationExtractor/test/InformationExtraction.node.test.ts index 0444ba3cef..2ee6f79ed7 100644 --- a/packages/@n8n/nodes-langchain/nodes/chains/InformationExtractor/test/InformationExtraction.node.test.ts +++ b/packages/@n8n/nodes-langchain/nodes/chains/InformationExtractor/test/InformationExtraction.node.test.ts @@ -7,6 +7,11 @@ import { makeZodSchemaFromAttributes } from '../helpers'; import { InformationExtractor } from '../InformationExtractor.node'; import type { AttributeDefinition } from '../types'; +jest.mock('n8n-workflow', () => ({ + ...jest.requireActual('n8n-workflow'), + sleep: jest.fn(), +})); + const mockPersonAttributes: AttributeDefinition[] = [ { name: 'name', @@ -91,7 +96,12 @@ describe('InformationExtractor', () => { attributes: { attributes: mockPersonAttributes, }, - options: {}, + options: { + batching: { + batchSize: 1, + delayBetweenBatches: 100, + }, + }, schemaType: 'fromAttributes', }, new FakeLLM({ response: formatFakeLlmResponse({ name: 'John', age: 30 }) }), @@ -111,7 +121,12 @@ describe('InformationExtractor', () => { attributes: { attributes: mockPersonAttributes, }, - options: {}, + options: { + batching: { + batchSize: 1, + delayBetweenBatches: 100, + }, + }, schemaType: 'fromAttributes', }, new FakeLLM({ response: formatFakeLlmResponse({ name: 'John' }) }), @@ -132,7 +147,12 @@ describe('InformationExtractor', () => { attributes: { attributes: mockPersonAttributesRequired, }, - options: {}, + options: { + batching: { + batchSize: 1, + delayBetweenBatches: 100, + }, + }, schemaType: 'fromAttributes', }, new FakeLLM({ response: formatFakeLlmResponse({ name: 'John' }) }), @@ -154,7 +174,12 @@ describe('InformationExtractor', () => { attributes: { attributes: mockPersonAttributes, }, - options: {}, + options: { + batching: { + batchSize: 1, + delayBetweenBatches: 100, + }, + }, schemaType: 'fromAttributes', }, new FakeLLM({ response: formatFakeLlmResponse({ name: 'John', age: '30' }) }), @@ -175,7 +200,12 @@ describe('InformationExtractor', () => { attributes: { attributes: mockPersonAttributesRequired, }, - options: {}, + options: { + batching: { + batchSize: 1, + delayBetweenBatches: 100, + }, + }, schemaType: 'fromAttributes', }, new FakeListChatModel({ @@ -200,7 +230,12 @@ describe('InformationExtractor', () => { attributes: { attributes: 
mockPersonAttributesRequired, }, - options: {}, + options: { + batching: { + batchSize: 1, + delayBetweenBatches: 100, + }, + }, schemaType: 'fromAttributes', }, new FakeListChatModel({ diff --git a/packages/@n8n/nodes-langchain/nodes/chains/SentimentAnalysis/SentimentAnalysis.node.ts b/packages/@n8n/nodes-langchain/nodes/chains/SentimentAnalysis/SentimentAnalysis.node.ts index ac232eeee4..6a2ef51d40 100644 --- a/packages/@n8n/nodes-langchain/nodes/chains/SentimentAnalysis/SentimentAnalysis.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/chains/SentimentAnalysis/SentimentAnalysis.node.ts @@ -2,7 +2,7 @@ import type { BaseLanguageModel } from '@langchain/core/language_models/base'; import { HumanMessage } from '@langchain/core/messages'; import { SystemMessagePromptTemplate, ChatPromptTemplate } from '@langchain/core/prompts'; import { OutputFixingParser, StructuredOutputParser } from 'langchain/output_parsers'; -import { NodeConnectionTypes, NodeOperationError } from 'n8n-workflow'; +import { NodeConnectionTypes, NodeOperationError, sleep } from 'n8n-workflow'; import type { IDataObject, IExecuteFunctions, @@ -131,6 +131,31 @@ export class SentimentAnalysis implements INodeType { description: 'Whether to enable auto-fixing (may trigger an additional LLM call if output is broken)', }, + { + displayName: 'Batch Processing', + name: 'batching', + type: 'collection', + description: 'Batch processing options for rate limiting', + default: {}, + options: [ + { + displayName: 'Batch Size', + name: 'batchSize', + default: 100, + type: 'number', + description: + 'How many items to process in parallel. This is useful for rate limiting.', + }, + { + displayName: 'Delay Between Batches', + name: 'delayBetweenBatches', + default: 0, + type: 'number', + description: + 'Delay in milliseconds between batches. 
This is useful for rate limiting.', + }, + ], + }, ], }, ], @@ -145,12 +170,21 @@ export class SentimentAnalysis implements INodeType { )) as BaseLanguageModel; const returnData: INodeExecutionData[][] = []; + const { batchSize, delayBetweenBatches } = this.getNodeParameter('options.batching', 0, { + batchSize: 100, + delayBetweenBatches: 0, + }) as { + batchSize: number; + delayBetweenBatches: number; + }; - for (let i = 0; i < items.length; i++) { - try { + for (let i = 0; i < items.length; i += batchSize) { + const batch = items.slice(i, i + batchSize); + const batchPromises = batch.map(async (_item, batchItemIndex) => { + const itemIndex = i + batchItemIndex; const sentimentCategories = this.getNodeParameter( 'options.categories', - i, + itemIndex, DEFAULT_CATEGORIES, ) as string; @@ -160,9 +194,13 @@ export class SentimentAnalysis implements INodeType { .filter(Boolean); if (categories.length === 0) { - throw new NodeOperationError(this.getNode(), 'No sentiment categories provided', { - itemIndex: i, - }); + return { + result: null, + itemIndex, + error: new NodeOperationError(this.getNode(), 'No sentiment categories provided', { + itemIndex, + }), + }; } // Initialize returnData with empty arrays for each category @@ -170,7 +208,7 @@ export class SentimentAnalysis implements INodeType { returnData.push(...Array.from({ length: categories.length }, () => [])); } - const options = this.getNodeParameter('options', i, {}) as { + const options = this.getNodeParameter('options', itemIndex, {}) as { systemPromptTemplate?: string; includeDetailedResults?: boolean; enableAutoFixing?: boolean; @@ -194,10 +232,10 @@ export class SentimentAnalysis implements INodeType { const systemPromptTemplate = SystemMessagePromptTemplate.fromTemplate( `${options.systemPromptTemplate ?? 
DEFAULT_SYSTEM_PROMPT_TEMPLATE} - {format_instructions}`, + {format_instructions}`, ); - const input = this.getNodeParameter('inputText', i) as string; + const input = this.getNodeParameter('inputText', itemIndex) as string; const inputPrompt = new HumanMessage(input); const messages = [ await systemPromptTemplate.format({ @@ -217,7 +255,7 @@ export class SentimentAnalysis implements INodeType { ); if (sentimentIndex !== -1) { - const resultItem = { ...items[i] }; + const resultItem = { ...items[itemIndex] }; const sentimentAnalysis: IDataObject = { category: output.sentiment, }; @@ -229,27 +267,59 @@ export class SentimentAnalysis implements INodeType { ...resultItem.json, sentimentAnalysis, }; - returnData[sentimentIndex].push(resultItem); + + return { + result: { + resultItem, + sentimentIndex, + }, + itemIndex, + }; } + + return { + result: {}, + itemIndex, + }; } catch (error) { - throw new NodeOperationError( - this.getNode(), - 'Error during parsing of LLM output, please check your LLM model and configuration', - { - itemIndex: i, - }, - ); + return { + result: null, + itemIndex, + error: new NodeOperationError( + this.getNode(), + 'Error during parsing of LLM output, please check your LLM model and configuration', + { + itemIndex, + }, + ), + }; } - } catch (error) { - if (this.continueOnFail()) { - const executionErrorData = this.helpers.constructExecutionMetaData( - this.helpers.returnJsonArray({ error: error.message }), - { itemData: { item: i } }, - ); - returnData[0].push(...executionErrorData); - continue; + }); + const batchResults = await Promise.all(batchPromises); + + batchResults.forEach(({ result, itemIndex, error }) => { + if (error) { + if (this.continueOnFail()) { + const executionErrorData = this.helpers.constructExecutionMetaData( + this.helpers.returnJsonArray({ error: error.message }), + { itemData: { item: itemIndex } }, + ); + + returnData[0].push(...executionErrorData); + return; + } else { + throw error; + } + } else if (result.resultItem && result.sentimentIndex) { + const sentimentIndex = result.sentimentIndex; + const resultItem = result.resultItem; + returnData[sentimentIndex].push(resultItem); } - throw error; + }); + + // Add delay between batches if not the last batch + if (i + batchSize < items.length && delayBetweenBatches > 0) { + await sleep(delayBetweenBatches); } } return returnData; diff --git a/packages/@n8n/nodes-langchain/nodes/chains/TextClassifier/TextClassifier.node.ts b/packages/@n8n/nodes-langchain/nodes/chains/TextClassifier/TextClassifier.node.ts index 204e164370..564d3c7276 100644 --- a/packages/@n8n/nodes-langchain/nodes/chains/TextClassifier/TextClassifier.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/chains/TextClassifier/TextClassifier.node.ts @@ -2,7 +2,7 @@ import type { BaseLanguageModel } from '@langchain/core/language_models/base'; import { HumanMessage } from '@langchain/core/messages'; import { SystemMessagePromptTemplate, ChatPromptTemplate } from '@langchain/core/prompts'; import { OutputFixingParser, StructuredOutputParser } from 'langchain/output_parsers'; -import { NodeOperationError, NodeConnectionTypes } from 'n8n-workflow'; +import { NodeOperationError, NodeConnectionTypes, sleep } from 'n8n-workflow'; import type { IDataObject, IExecuteFunctions, @@ -158,6 +158,31 @@ export class TextClassifier implements INodeType { description: 'Whether to enable auto-fixing (may trigger an additional LLM call if output is broken)', }, + { + displayName: 'Batch Processing', + name: 'batching', + type: 'collection', + 
description: 'Batch processing options for rate limiting', + default: {}, + options: [ + { + displayName: 'Batch Size', + name: 'batchSize', + default: 100, + type: 'number', + description: + 'How many items to process in parallel. This is useful for rate limiting.', + }, + { + displayName: 'Delay Between Batches', + name: 'delayBetweenBatches', + default: 0, + type: 'number', + description: + 'Delay in milliseconds between batches. This is useful for rate limiting.', + }, + ], + }, ], }, ], @@ -165,6 +190,13 @@ export class TextClassifier implements INodeType { async execute(this: IExecuteFunctions): Promise { const items = this.getInputData(); + const { batchSize, delayBetweenBatches } = this.getNodeParameter('options.batching', 0, { + batchSize: 100, + delayBetweenBatches: 0, + }) as { + batchSize: number; + delayBetweenBatches: number; + }; const llm = (await this.getInputConnectionData( NodeConnectionTypes.AiLanguageModel, @@ -223,68 +255,79 @@ export class TextClassifier implements INodeType { { length: categories.length + (fallback === 'other' ? 1 : 0) }, (_) => [], ); - for (let itemIdx = 0; itemIdx < items.length; itemIdx++) { - const item = items[itemIdx]; - item.pairedItem = { item: itemIdx }; - const input = this.getNodeParameter('inputText', itemIdx) as string; - if (input === undefined || input === null) { - if (this.continueOnFail()) { - returnData[0].push({ - json: { error: 'Text to classify is not defined' }, - pairedItem: { item: itemIdx }, - }); - continue; - } else { + for (let i = 0; i < items.length; i += batchSize) { + const batch = items.slice(i, i + batchSize); + const batchPromises = batch.map(async (_item, batchItemIndex) => { + const itemIdx = i + batchItemIndex; + const item = items[itemIdx]; + item.pairedItem = { item: itemIdx }; + const input = this.getNodeParameter('inputText', itemIdx) as string; + + if (input === undefined || input === null) { throw new NodeOperationError( this.getNode(), `Text to classify for item ${itemIdx} is not defined`, ); } - } - const inputPrompt = new HumanMessage(input); + const inputPrompt = new HumanMessage(input); - const systemPromptTemplateOpt = this.getNodeParameter( - 'options.systemPromptTemplate', - itemIdx, - SYSTEM_PROMPT_TEMPLATE, - ) as string; - const systemPromptTemplate = SystemMessagePromptTemplate.fromTemplate( - `${systemPromptTemplateOpt ?? SYSTEM_PROMPT_TEMPLATE} -{format_instructions} -${multiClassPrompt} -${fallbackPrompt}`, - ); + const systemPromptTemplateOpt = this.getNodeParameter( + 'options.systemPromptTemplate', + itemIdx, + SYSTEM_PROMPT_TEMPLATE, + ) as string; + const systemPromptTemplate = SystemMessagePromptTemplate.fromTemplate( + `${systemPromptTemplateOpt ?? 
SYSTEM_PROMPT_TEMPLATE} + {format_instructions} + ${multiClassPrompt} + ${fallbackPrompt}`, + ); - const messages = [ - await systemPromptTemplate.format({ - categories: categories.map((cat) => cat.category).join(', '), - format_instructions: parser.getFormatInstructions(), - }), - inputPrompt, - ]; - const prompt = ChatPromptTemplate.fromMessages(messages); - const chain = prompt.pipe(llm).pipe(parser).withConfig(getTracingConfig(this)); + const messages = [ + await systemPromptTemplate.format({ + categories: categories.map((cat) => cat.category).join(', '), + format_instructions: parser.getFormatInstructions(), + }), + inputPrompt, + ]; + const prompt = ChatPromptTemplate.fromMessages(messages); + const chain = prompt.pipe(llm).pipe(parser).withConfig(getTracingConfig(this)); - try { - const output = await chain.invoke(messages); + return await chain.invoke(messages); + }); - categories.forEach((cat, idx) => { - if (output[cat.category]) returnData[idx].push(item); - }); - if (fallback === 'other' && output.fallback) returnData[returnData.length - 1].push(item); - } catch (error) { - if (this.continueOnFail()) { - returnData[0].push({ - json: { error: error.message }, - pairedItem: { item: itemIdx }, + const batchResults = await Promise.allSettled(batchPromises); + + batchResults.forEach((response, batchItemIndex) => { + const index = i + batchItemIndex; + if (response.status === 'rejected') { + const error = response.reason as Error; + if (this.continueOnFail()) { + returnData[0].push({ + json: { error: error.message }, + pairedItem: { item: index }, + }); + return; + } else { + throw new NodeOperationError(this.getNode(), error.message); + } + } else { + const output = response.value; + const item = items[index]; + + categories.forEach((cat, idx) => { + if (output[cat.category]) returnData[idx].push(item); }); - continue; + if (fallback === 'other' && output.fallback) returnData[returnData.length - 1].push(item); } + }); - throw error; + // Add delay between batches if not the last batch + if (i + batchSize < items.length && delayBetweenBatches > 0) { + await sleep(delayBetweenBatches); } }
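
Reviewer note: every node touched by this diff repeats the same shape — slice the input items into batches, run each batch in parallel with `Promise.allSettled`, map rejections to `continueOnFail`-aware error items, and pause between batches with `sleep` from `n8n-workflow`. The sketch below is a minimal, framework-agnostic distillation of that pattern, not code from the PR; the helper name `processInBatches` and the `BatchingOptions` interface are illustrative placeholders.

```ts
import { sleep } from 'n8n-workflow';

export interface BatchingOptions {
	batchSize: number;
	delayBetweenBatches: number;
}

// Illustrative shape of the per-node loops added in this diff.
export async function processInBatches<TIn, TOut>(
	items: TIn[],
	{ batchSize, delayBetweenBatches }: BatchingOptions,
	processItem: (item: TIn, itemIndex: number) => Promise<TOut>,
	onError: (error: unknown, itemIndex: number) => void,
	onResult: (result: TOut, itemIndex: number) => void,
): Promise<void> {
	for (let i = 0; i < items.length; i += batchSize) {
		const batch = items.slice(i, i + batchSize);

		// Entries within a batch run in parallel. The absolute item index is the
		// batch start offset plus the position inside the batch.
		const settled = await Promise.allSettled(
			batch.map(async (item, batchItemIndex) => await processItem(item, i + batchItemIndex)),
		);

		settled.forEach((result, batchItemIndex) => {
			const itemIndex = i + batchItemIndex;
			if (result.status === 'rejected') {
				onError(result.reason, itemIndex);
			} else {
				onResult(result.value, itemIndex);
			}
		});

		// Only pause when another batch is still queued.
		if (i + batchSize < items.length && delayBetweenBatches > 0) {
			await sleep(delayBetweenBatches);
		}
	}
}
```

Computing `itemIndex` as the batch offset plus the in-batch position is what keeps `pairedItem` references correct once `batchSize` is smaller than the number of input items.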
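
A usage sketch of the helper above inside an `execute()` method, showing how batch failures translate into `continueOnFail` output versus a thrown `NodeOperationError`. This is illustrative only and builds on the previous sketch; `runChainForItem` is a hypothetical stand-in for the chain or agent invocation each node performs.

```ts
import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow';

// Hypothetical per-item work; in the diff this is the chain/agent invocation.
declare function runChainForItem(ctx: IExecuteFunctions, itemIndex: number): Promise<object>;

export async function execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
	const items = this.getInputData();
	const returnData: INodeExecutionData[] = [];

	// Fall back to defaults when the user never opened the "Batch Processing" collection.
	const batching = this.getNodeParameter('options.batching', 0, {
		batchSize: 100,
		delayBetweenBatches: 0,
	}) as { batchSize: number; delayBetweenBatches: number };

	await processInBatches(
		items,
		batching,
		async (_item, itemIndex) => await runChainForItem(this, itemIndex),
		(error, itemIndex) => {
			if (this.continueOnFail()) {
				returnData.push({
					json: { error: error instanceof Error ? error.message : String(error) },
					pairedItem: { item: itemIndex },
				});
				return;
			}
			throw new NodeOperationError(this.getNode(), error instanceof Error ? error : String(error));
		},
		(output, itemIndex) => {
			returnData.push({ json: { output }, pairedItem: { item: itemIndex } });
		},
	);

	return [returnData];
}
```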
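
The updated tests neutralize the new delay by mocking only `sleep` while keeping the rest of `n8n-workflow` real, which is why they can set `delayBetweenBatches: 100` without slowing the suite. A minimal standalone Jest sketch of that setup, assuming the same `jest.mock` pattern used in the diff:

```ts
import { sleep } from 'n8n-workflow';

// Replace only `sleep` so the batching delay resolves immediately in tests;
// everything else keeps its real implementation.
jest.mock('n8n-workflow', () => ({
	...jest.requireActual('n8n-workflow'),
	sleep: jest.fn(),
}));

describe('batching delay in tests', () => {
	it('skips the real delay because sleep is mocked', async () => {
		await sleep(1000); // resolves immediately via the jest.fn() mock
		expect(sleep).toHaveBeenCalledWith(1000);
	});
});
```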