feat: Optimize langchain calls in batching mode (#15011)

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
Author: Benjamin Schroth
Date: 2025-05-02 17:09:31 +02:00
Committed by: GitHub
Parent: a4290dcb78
Commit: f3e29d25ed
12 changed files with 632 additions and 205 deletions
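All twelve files apply the same change: instead of awaiting each input item one after another, a node now reads a Batch Processing option (batchSize, delayBetweenBatches), runs each batch of items in parallel with Promise.allSettled, maps rejections to per-item error entries when continueOnFail() is enabled, and pauses between batches. The sketch below is for orientation only and is not code from this commit: sleep is the real n8n-workflow helper used throughout the diff, while runInBatches and processItem are hypothetical names introduced here.

// Minimal sketch of the batching pattern this commit applies (assumptions noted above).
import { sleep } from 'n8n-workflow';

interface BatchingOptions {
	batchSize: number;
	delayBetweenBatches: number; // milliseconds
}

// Hypothetical helper: process items in sequential batches, items within a batch in parallel.
async function runInBatches<T, R>(
	items: T[],
	processItem: (item: T, index: number) => Promise<R>,
	{ batchSize, delayBetweenBatches }: BatchingOptions,
): Promise<Array<PromiseSettledResult<R>>> {
	const results: Array<PromiseSettledResult<R>> = [];

	for (let i = 0; i < items.length; i += batchSize) {
		const batch = items.slice(i, i + batchSize);

		// allSettled keeps per-item failures isolated so one bad item does not abort the batch.
		const settled = await Promise.allSettled(
			batch.map(async (item, batchIndex) => await processItem(item, i + batchIndex)),
		);
		results.push(...settled);

		// Optional pause between batches, used for rate limiting.
		if (i + batchSize < items.length && delayBetweenBatches > 0) {
			await sleep(delayBetweenBatches);
		}
	}

	return results;
}

Each node then walks the settled results: fulfilled values are pushed to the node output, and rejected reasons either become { json: { error } } items paired to their input (when continueOnFail() is on) or are rethrown, typically wrapped in a NodeOperationError.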

View File

@@ -47,6 +47,30 @@ export const toolsAgentProperties: INodeProperties[] = [
description:
'Whether or not binary images should be automatically passed through to the agent as image type messages',
},
{
displayName: 'Batch Processing',
name: 'batching',
type: 'collection',
description: 'Batch processing options for rate limiting',
default: {},
options: [
{
displayName: 'Batch Size',
name: 'batchSize',
default: 1,
type: 'number',
description:
'How many items to process in parallel. This is useful for rate limiting, but will impact the ordering in the agents log output.',
},
{
displayName: 'Delay Between Batches',
name: 'delayBetweenBatches',
default: 0,
type: 'number',
description: 'Delay in milliseconds between batches. This is useful for rate limiting.',
},
],
},
],
},
];

View File

@@ -11,7 +11,13 @@ import type { AgentAction, AgentFinish } from 'langchain/agents';
import { AgentExecutor, createToolCallingAgent } from 'langchain/agents';
import type { ToolsAgentAction } from 'langchain/dist/agents/tool_calling/output_parser';
import { omit } from 'lodash';
-import { BINARY_ENCODING, jsonParse, NodeConnectionTypes, NodeOperationError } from 'n8n-workflow';
+import {
+	BINARY_ENCODING,
+	jsonParse,
+	NodeConnectionTypes,
+	NodeOperationError,
+	sleep,
+} from 'n8n-workflow';
import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';
import type { ZodObject } from 'zod';
import { z } from 'zod';
@@ -406,12 +412,21 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise<INodeE
	const returnData: INodeExecutionData[] = [];
	const items = this.getInputData();

	const outputParser = await getOptionalOutputParser(this);
+	const memory = await getOptionalMemory(this);
	const tools = await getTools(this, outputParser);

+	const { batchSize, delayBetweenBatches } = this.getNodeParameter('options.batching', 0, {
+		batchSize: 1,
+		delayBetweenBatches: 0,
+	}) as {
+		batchSize: number;
+		delayBetweenBatches: number;
+	};

-	for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
-		try {
+	for (let i = 0; i < items.length; i += batchSize) {
+		const batch = items.slice(i, i + batchSize);
+		const batchPromises = batch.map(async (_item, batchItemIndex) => {
+			const itemIndex = i + batchItemIndex;
+
			const model = await getChatModel(this);
-			const memory = await getOptionalMemory(this);

			const input = getPromptInputByType({
				ctx: this,
@@ -461,7 +476,7 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise<INodeE
			});

			// Invoke the executor with the given input and system message.
-			const response = await executor.invoke(
+			return await executor.invoke(
				{
					input,
					system_message: options.systemMessage ?? SYSTEM_MESSAGE,
@@ -470,7 +485,22 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise<INodeE
				},
				{ signal: this.getExecutionCancelSignal() },
			);
+		});
+
+		const batchResults = await Promise.allSettled(batchPromises);
+		batchResults.forEach((result, index) => {
+			if (result.status === 'rejected') {
+				if (this.continueOnFail()) {
+					returnData.push({
+						json: { error: result.reason as string },
+						pairedItem: { item: index },
+					});
+					return;
+				} else {
+					throw new NodeOperationError(this.getNode(), result.reason);
+				}
+			}
+
+			const response = result.value;
			// If memory and outputParser are connected, parse the output.
			if (memory && outputParser) {
				const parsedOutput = jsonParse<{ output: Record<string, unknown> }>(
@@ -492,15 +522,10 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise<INodeE
			};
			returnData.push(itemResult);
-		} catch (error) {
-			if (this.continueOnFail()) {
-				returnData.push({
-					json: { error: error.message },
-					pairedItem: { item: itemIndex },
-				});
-				continue;
-			}
-			throw error;
+		});
+
+		if (i + batchSize < items.length && delayBetweenBatches > 0) {
+			await sleep(delayBetweenBatches);
		}
	}

View File

@@ -5,7 +5,7 @@ import type {
	INodeType,
	INodeTypeDescription,
} from 'n8n-workflow';
-import { NodeApiError, NodeConnectionTypes, NodeOperationError } from 'n8n-workflow';
+import { NodeApiError, NodeConnectionTypes, NodeOperationError, sleep } from 'n8n-workflow';

import { getPromptInputByType } from '@utils/helpers';
import { getOptionalOutputParser } from '@utils/output_parsers/N8nOutputParser';
@@ -67,19 +67,28 @@ export class ChainLlm implements INodeType {
		this.logger.debug('Executing Basic LLM Chain');
		const items = this.getInputData();
		const returnData: INodeExecutionData[] = [];

+		const { batchSize, delayBetweenBatches } = this.getNodeParameter('batching', 0, {
+			batchSize: 100,
+			delayBetweenBatches: 0,
+		}) as {
+			batchSize: number;
+			delayBetweenBatches: number;
+		};
+
+		// Get output parser if configured
+		const outputParser = await getOptionalOutputParser(this);
+
+		// Process items in batches
+		for (let i = 0; i < items.length; i += batchSize) {
+			const batch = items.slice(i, i + batchSize);
+			const batchPromises = batch.map(async (_item, batchItemIndex) => {
+				const itemIndex = i + batchItemIndex;
-		// Process each input item
-		for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
-			try {
				// Get the language model
				const llm = (await this.getInputConnectionData(
					NodeConnectionTypes.AiLanguageModel,
					0,
				)) as BaseLanguageModel;

-				// Get output parser if configured
-				const outputParser = await getOptionalOutputParser(this);

				// Get user prompt based on node version
				let prompt: string;
@@ -106,44 +115,53 @@ export class ChainLlm implements INodeType {
					[],
				) as MessageTemplate[];

-				// Execute the chain
-				const responses = await executeChain({
+				return (await executeChain({
					context: this,
					itemIndex,
					query: prompt,
					llm,
					outputParser,
					messages,
-				});
-
-				// If the node version is 1.6(and LLM is using `response_format: json_object`) or higher or an output parser is configured,
-				// we unwrap the response and return the object directly as JSON
-				const shouldUnwrapObjects = this.getNode().typeVersion >= 1.6 || !!outputParser;
-				// Process each response and add to return data
-				responses.forEach((response) => {
-					returnData.push({
-						json: formatResponse(response, shouldUnwrapObjects),
-					});
-				});
-			} catch (error) {
-				// Handle OpenAI specific rate limit errors
-				if (error instanceof NodeApiError && isOpenAiError(error.cause)) {
-					const openAiErrorCode: string | undefined = (error.cause as any).error?.code;
-					if (openAiErrorCode) {
-						const customMessage = getCustomOpenAiErrorMessage(openAiErrorCode);
-						if (customMessage) {
-							error.message = customMessage;
-						}
-					}
-				}
-				// Continue on failure if configured
-				if (this.continueOnFail()) {
-					returnData.push({ json: { error: error.message }, pairedItem: { item: itemIndex } });
-					continue;
-				}
-
-				throw error;
-			}
+				})) as object[];
+			});
+
+			const batchResults = await Promise.allSettled(batchPromises);
+
+			batchResults.forEach((promiseResult, batchItemIndex) => {
+				const itemIndex = i + batchItemIndex;
+				if (promiseResult.status === 'rejected') {
+					const error = promiseResult.reason as Error;
+					// Handle OpenAI specific rate limit errors
+					if (error instanceof NodeApiError && isOpenAiError(error.cause)) {
+						const openAiErrorCode: string | undefined = (error.cause as any).error?.code;
+						if (openAiErrorCode) {
+							const customMessage = getCustomOpenAiErrorMessage(openAiErrorCode);
+							if (customMessage) {
+								error.message = customMessage;
+							}
+						}
+					}
+					if (this.continueOnFail()) {
+						returnData.push({
+							json: { error: error.message },
+							pairedItem: { item: itemIndex },
+						});
+						return;
+					}
+					throw new NodeOperationError(this.getNode(), error);
+				}
+
+				const responses = promiseResult.value;
+				responses.forEach((response: object) => {
+					returnData.push({
+						json: formatResponse(response, this.getNode().typeVersion >= 1.6 || !!outputParser),
+					});
+				});
+			});
+
+			if (i + batchSize < items.length && delayBetweenBatches > 0) {
+				await sleep(delayBetweenBatches);
+			}
		}
	}

View File

@@ -270,4 +270,29 @@ export const nodeProperties: INodeProperties[] = [
},
},
},
{
displayName: 'Batch Processing',
name: 'batching',
type: 'collection',
placeholder: 'Add Batch Processing Option',
description: 'Batch processing options for rate limiting',
default: {},
options: [
{
displayName: 'Batch Size',
name: 'batchSize',
default: 100,
type: 'number',
description:
'How many items to process in parallel. This is useful for rate limiting, but will impact the agents log output.',
},
{
displayName: 'Delay Between Batches',
name: 'delayBetweenBatches',
default: 1000,
type: 'number',
description: 'Delay in milliseconds between batches. This is useful for rate limiting.',
},
],
},
];

View File

@@ -3,7 +3,7 @@
import { FakeChatModel } from '@langchain/core/utils/testing';
import { mock } from 'jest-mock-extended';
import type { IExecuteFunctions, INode } from 'n8n-workflow';
-import { NodeConnectionTypes } from 'n8n-workflow';
+import { NodeConnectionTypes, UnexpectedError } from 'n8n-workflow';

import * as helperModule from '@utils/helpers';
import * as outputParserModule from '@utils/output_parsers/N8nOutputParser';
@@ -12,6 +12,11 @@ import { ChainLlm } from '../ChainLlm.node';
import * as executeChainModule from '../methods/chainExecutor';
import * as responseFormatterModule from '../methods/responseFormatter';

+jest.mock('n8n-workflow', () => ({
+	...jest.requireActual('n8n-workflow'),
+	sleep: jest.fn(),
+}));
+
jest.mock('@utils/helpers', () => ({
	getPromptInputByType: jest.fn(),
}));
@@ -25,12 +30,7 @@ jest.mock('../methods/chainExecutor', () => ({
}));

jest.mock('../methods/responseFormatter', () => ({
-	formatResponse: jest.fn().mockImplementation((response) => {
-		if (typeof response === 'string') {
-			return { text: response.trim() };
-		}
-		return response;
-	}),
+	formatResponse: jest.fn(),
}));

describe('ChainLlm Node', () => {
@@ -38,6 +38,8 @@ describe('ChainLlm Node', () => {
	let mockExecuteFunction: jest.Mocked<IExecuteFunctions>;

	beforeEach(() => {
+		jest.resetAllMocks();
		node = new ChainLlm();
		mockExecuteFunction = mock<IExecuteFunctions>();
@@ -63,7 +65,12 @@ describe('ChainLlm Node', () => {
		const fakeLLM = new FakeChatModel({});
		mockExecuteFunction.getInputConnectionData.mockResolvedValue(fakeLLM);

-		jest.clearAllMocks();
+		(responseFormatterModule.formatResponse as jest.Mock).mockImplementation((response) => {
+			if (typeof response === 'string') {
+				return { text: response.trim() };
+			}
+			return response;
+		});
	});

	describe('description', () => {
@@ -164,15 +171,14 @@ describe('ChainLlm Node', () => {
		});

		it('should continue on failure when configured', async () => {
-			mockExecuteFunction.continueOnFail.mockReturnValue(true);
			(helperModule.getPromptInputByType as jest.Mock).mockReturnValue('Test prompt');
-			const error = new Error('Test error');
-			(executeChainModule.executeChain as jest.Mock).mockRejectedValue(error);
+			(executeChainModule.executeChain as jest.Mock).mockRejectedValueOnce(
+				new UnexpectedError('Test error'),
+			);
+			mockExecuteFunction.continueOnFail.mockReturnValue(true);

			const result = await node.execute.call(mockExecuteFunction);

			expect(result).toEqual([[{ json: { error: 'Test error' }, pairedItem: { item: 0 } }]]);
		});

View File

@@ -8,7 +8,7 @@ import {
import type { BaseRetriever } from '@langchain/core/retrievers';
import { createStuffDocumentsChain } from 'langchain/chains/combine_documents';
import { createRetrievalChain } from 'langchain/chains/retrieval';
-import { NodeConnectionTypes, NodeOperationError, parseErrorMetadata } from 'n8n-workflow';
+import { NodeConnectionTypes, NodeOperationError, parseErrorMetadata, sleep } from 'n8n-workflow';
import {
	type INodeProperties,
	type IExecuteFunctions,
@@ -177,6 +177,31 @@ export class ChainRetrievalQa implements INodeType {
}, },
}, },
}, },
{
displayName: 'Batch Processing',
name: 'batching',
type: 'collection',
description: 'Batch processing options for rate limiting',
default: {},
options: [
{
displayName: 'Batch Size',
name: 'batchSize',
default: 100,
type: 'number',
description:
'How many items to process in parallel. This is useful for rate limiting.',
},
{
displayName: 'Delay Between Batches',
name: 'delayBetweenBatches',
default: 0,
type: 'number',
description:
'Delay in milliseconds between batches. This is useful for rate limiting.',
},
],
},
],
},
],
@@ -186,11 +211,20 @@ export class ChainRetrievalQa implements INodeType {
		this.logger.debug('Executing Retrieval QA Chain');

		const items = this.getInputData();
+		const { batchSize, delayBetweenBatches } = this.getNodeParameter('options.batching', 0, {
+			batchSize: 100,
+			delayBetweenBatches: 0,
+		}) as {
+			batchSize: number;
+			delayBetweenBatches: number;
+		};
		const returnData: INodeExecutionData[] = [];

-		// Run for each item
-		for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
-			try {
+		for (let i = 0; i < items.length; i += batchSize) {
+			const batch = items.slice(i, i + batchSize);
+			const batchPromises = batch.map(async (_item, batchIndex) => {
+				const itemIndex = i + batchIndex;
				const model = (await this.getInputConnectionData(
					NodeConnectionTypes.AiLanguageModel,
					0,
@@ -266,32 +300,47 @@ export class ChainRetrievalQa implements INodeType {
				// Execute the chain with tracing config
				const tracingConfig = getTracingConfig(this);
-				const response = await retrievalChain
+				const result = await retrievalChain
					.withConfig(tracingConfig)
					.invoke({ input: query }, { signal: this.getExecutionCancelSignal() });

-				// Get the answer from the response
-				const answer: string = response.answer;
+				return result;
+			});
+
+			const batchResults = await Promise.allSettled(batchPromises);
+
+			batchResults.forEach((response, index) => {
+				if (response.status === 'rejected') {
+					const error = response.reason;
+					if (this.continueOnFail()) {
+						const metadata = parseErrorMetadata(error);
+						returnData.push({
+							json: { error: error.message },
+							pairedItem: { item: index },
+							metadata,
+						});
+						return;
+					} else {
+						throw error;
+					}
+				}
+
+				const output = response.value;
+				const answer: string = output.answer;

				if (this.getNode().typeVersion >= 1.5) {
					returnData.push({ json: { response: answer } });
				} else {
					// Legacy format for versions 1.4 and below is { text: string }
					returnData.push({ json: { response: { text: answer } } });
				}
-			} catch (error) {
-				if (this.continueOnFail()) {
-					const metadata = parseErrorMetadata(error);
-					returnData.push({
-						json: { error: error.message },
-						pairedItem: { item: itemIndex },
-						metadata,
-					});
-					continue;
-				}
-				throw error;
+			});
+
+			// Add delay between batches if not the last batch
+			if (i + batchSize < items.length && delayBetweenBatches > 0) {
+				await sleep(delayBetweenBatches);
			}
		}

		return [returnData];
	}
}

View File

@@ -8,6 +8,11 @@ import { NodeConnectionTypes, NodeOperationError, UnexpectedError } from 'n8n-wo
import { ChainRetrievalQa } from '../ChainRetrievalQa.node';

+jest.mock('n8n-workflow', () => ({
+	...jest.requireActual('n8n-workflow'),
+	sleep: jest.fn(),
+}));
+
const createExecuteFunctionsMock = (
	parameters: IDataObject,
	fakeLlm: BaseLanguageModel,
@@ -80,7 +85,12 @@ describe('ChainRetrievalQa', () => {
		const params = {
			promptType: 'define',
			text: 'What is the capital of France?',
-			options: {},
+			options: {
+				batching: {
+					batchSize: 1,
+					delayBetweenBatches: 100,
+				},
+			},
		};

		const result = await node.execute.call(
@@ -114,7 +124,12 @@ describe('ChainRetrievalQa', () => {
		const params = {
			promptType: 'define',
			text: 'What is the capital of France?',
-			options: {},
+			options: {
+				batching: {
+					batchSize: 1,
+					delayBetweenBatches: 100,
+				},
+			},
		};

		const result = await node.execute.call(
@@ -158,6 +173,10 @@ describe('ChainRetrievalQa', () => {
			text: 'What is the capital of France?',
			options: {
				systemPromptTemplate: customSystemPrompt,
+				batching: {
+					batchSize: 1,
+					delayBetweenBatches: 100,
+				},
			},
		};
@@ -185,7 +204,12 @@ describe('ChainRetrievalQa', () => {
		const params = {
			promptType: 'define',
			text: undefined, // undefined query
-			options: {},
+			options: {
+				batching: {
+					batchSize: 1,
+					delayBetweenBatches: 100,
+				},
+			},
		};

		await expect(
@@ -211,7 +235,12 @@ describe('ChainRetrievalQa', () => {
		const params = {
			promptType: 'define',
			text: 'What is the capital of France?',
-			options: {},
+			options: {
+				batching: {
+					batchSize: 1,
+					delayBetweenBatches: 100,
+				},
+			},
		};

		// Override continueOnFail to return true

View File

@@ -1,5 +1,6 @@
import type { Document } from '@langchain/core/documents';
import type { BaseLanguageModel } from '@langchain/core/language_models/base';
+import type { ChainValues } from '@langchain/core/utils/types';
import type { TextSplitter } from '@langchain/textsplitters';
import { RecursiveCharacterTextSplitter } from '@langchain/textsplitters';
import { loadSummarizationChain } from 'langchain/chains';
@@ -12,7 +13,7 @@ import type {
	IDataObject,
	INodeInputConfiguration,
} from 'n8n-workflow';
-import { NodeConnectionTypes } from 'n8n-workflow';
+import { NodeConnectionTypes, sleep } from 'n8n-workflow';

import { N8nBinaryLoader } from '@utils/N8nBinaryLoader';
import { N8nJsonLoader } from '@utils/N8nJsonLoader';
@@ -306,6 +307,31 @@ export class ChainSummarizationV2 implements INodeType {
},
],
},
{
displayName: 'Batch Processing',
name: 'batching',
type: 'collection',
description: 'Batch processing options for rate limiting',
default: {},
options: [
{
displayName: 'Batch Size',
name: 'batchSize',
default: 100,
type: 'number',
description:
'How many items to process in parallel. This is useful for rate limiting.',
},
{
displayName: 'Delay Between Batches',
name: 'delayBetweenBatches',
default: 0,
type: 'number',
description:
'Delay in milliseconds between batches. This is useful for rate limiting.',
},
],
},
],
},
],
@@ -324,9 +350,19 @@ export class ChainSummarizationV2 implements INodeType {
		const items = this.getInputData();
		const returnData: INodeExecutionData[] = [];

+		const { batchSize, delayBetweenBatches } = this.getNodeParameter('options.batching', 0, {
+			batchSize: 100,
+			delayBetweenBatches: 0,
+		}) as {
+			batchSize: number;
+			delayBetweenBatches: number;
+		};

-		for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
-			try {
+		for (let i = 0; i < items.length; i += batchSize) {
+			const batch = items.slice(i, i + batchSize);
+			const batchPromises = batch.map(async (_item, batchIndex) => {
+				const itemIndex = i + batchIndex;
				const model = (await this.getInputConnectionData(
					NodeConnectionTypes.AiLanguageModel,
					0,
@@ -353,6 +389,7 @@ export class ChainSummarizationV2 implements INodeType {
				const item = items[itemIndex];

				let processedDocuments: Document[];
+				let output: ChainValues = {};

				// Use dedicated document loader input to load documents
				if (operationMode === 'documentLoader') {
@@ -368,11 +405,9 @@ export class ChainSummarizationV2 implements INodeType {
							? await documentInput.processItem(item, itemIndex)
							: documentInput;

-					const response = await chain.withConfig(getTracingConfig(this)).invoke({
+					output = await chain.withConfig(getTracingConfig(this)).invoke({
						input_documents: processedDocuments,
					});
-					returnData.push({ json: { response } });
				}

				// Take the input and use binary or json loader
@@ -412,21 +447,37 @@ export class ChainSummarizationV2 implements INodeType {
					}

					const processedItem = await processor.processItem(item, itemIndex);
-					const response = await chain.invoke(
+					output = await chain.invoke(
						{
							input_documents: processedItem,
						},
						{ signal: this.getExecutionCancelSignal() },
					);
-					returnData.push({ json: { response } });
				}
-			} catch (error) {
-				if (this.continueOnFail()) {
-					returnData.push({ json: { error: error.message }, pairedItem: { item: itemIndex } });
-					continue;
-				}
-				throw error;
-			}
+
+				return output;
+			});
+
+			const batchResults = await Promise.allSettled(batchPromises);
+
+			batchResults.forEach((response, index) => {
+				if (response.status === 'rejected') {
+					const error = response.reason as Error;
+					if (this.continueOnFail()) {
+						returnData.push({
+							json: { error: error.message },
+							pairedItem: { item: i + index },
+						});
+					} else {
+						throw error;
+					}
+				} else {
+					const output = response.value;
+					returnData.push({ json: { output } });
+				}
+			});
+
+			// Add delay between batches if not the last batch
+			if (i + batchSize < items.length && delayBetweenBatches > 0) {
+				await sleep(delayBetweenBatches);
+			}
		}
	}

View File

@@ -3,7 +3,7 @@ import { HumanMessage } from '@langchain/core/messages';
import { ChatPromptTemplate, SystemMessagePromptTemplate } from '@langchain/core/prompts';
import type { JSONSchema7 } from 'json-schema';
import { OutputFixingParser, StructuredOutputParser } from 'langchain/output_parsers';
-import { jsonParse, NodeConnectionTypes, NodeOperationError } from 'n8n-workflow';
+import { jsonParse, NodeConnectionTypes, NodeOperationError, sleep } from 'n8n-workflow';
import type {
	INodeType,
	INodeTypeDescription,
@@ -213,6 +213,31 @@ export class InformationExtractor implements INodeType {
rows: 6,
},
},
{
displayName: 'Batch Processing',
name: 'batching',
type: 'collection',
description: 'Batch processing options for rate limiting',
default: {},
options: [
{
displayName: 'Batch Size',
name: 'batchSize',
default: 100,
type: 'number',
description:
'How many items to process in parallel. This is useful for rate limiting, but will impact the agents log output.',
},
{
displayName: 'Delay Between Batches',
name: 'delayBetweenBatches',
default: 0,
type: 'number',
description:
'Delay in milliseconds between batches. This is useful for rate limiting.',
},
],
},
],
},
],
@@ -220,6 +245,13 @@ export class InformationExtractor implements INodeType {
	async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
		const items = this.getInputData();

+		const { batchSize, delayBetweenBatches } = this.getNodeParameter('options.batching', 0, {
+			batchSize: 100,
+			delayBetweenBatches: 0,
+		}) as {
+			batchSize: number;
+			delayBetweenBatches: number;
+		};

		const llm = (await this.getInputConnectionData(
			NodeConnectionTypes.AiLanguageModel,
@@ -265,38 +297,58 @@ export class InformationExtractor implements INodeType {
		}

		const resultData: INodeExecutionData[] = [];

-		for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
+		for (let i = 0; i < items.length; i += batchSize) {
+			const batch = items.slice(i, i + batchSize);
+
+			const batchPromises = batch.map(async (_item, batchItemIndex) => {
+				const itemIndex = i + batchItemIndex;
+
			const input = this.getNodeParameter('text', itemIndex) as string;
			const inputPrompt = new HumanMessage(input);

			const options = this.getNodeParameter('options', itemIndex, {}) as {
				systemPromptTemplate?: string;
			};

			const systemPromptTemplate = SystemMessagePromptTemplate.fromTemplate(
				`${options.systemPromptTemplate ?? SYSTEM_PROMPT_TEMPLATE}
{format_instructions}`,
			);

			const messages = [
				await systemPromptTemplate.format({
					format_instructions: parser.getFormatInstructions(),
				}),
				inputPrompt,
			];
			const prompt = ChatPromptTemplate.fromMessages(messages);
			const chain = prompt.pipe(llm).pipe(parser).withConfig(getTracingConfig(this));

-			try {
-				const output = await chain.invoke(messages);
-				resultData.push({ json: { output } });
-			} catch (error) {
-				if (this.continueOnFail()) {
-					resultData.push({ json: { error: error.message }, pairedItem: { item: itemIndex } });
-					continue;
-				}
-				throw error;
-			}
+				return await chain.invoke(messages);
+			});
+
+			const batchResults = await Promise.allSettled(batchPromises);
+
+			batchResults.forEach((response, index) => {
+				if (response.status === 'rejected') {
+					const error = response.reason as Error;
+					if (this.continueOnFail()) {
+						resultData.push({
+							json: { error: response.reason as string },
+							pairedItem: { item: i + index },
+						});
+						return;
+					} else {
+						throw new NodeOperationError(this.getNode(), error.message);
+					}
+				}
+
+				const output = response.value;
+				resultData.push({ json: { output } });
+			});
+
+			// Add delay between batches if not the last batch
+			if (i + batchSize < items.length && delayBetweenBatches > 0) {
+				await sleep(delayBetweenBatches);
+			}
		}
	}

View File

@@ -7,6 +7,11 @@ import { makeZodSchemaFromAttributes } from '../helpers';
import { InformationExtractor } from '../InformationExtractor.node';
import type { AttributeDefinition } from '../types';

+jest.mock('n8n-workflow', () => ({
+	...jest.requireActual('n8n-workflow'),
+	sleep: jest.fn(),
+}));
+
const mockPersonAttributes: AttributeDefinition[] = [
	{
		name: 'name',
@@ -91,7 +96,12 @@ describe('InformationExtractor', () => {
				attributes: {
					attributes: mockPersonAttributes,
				},
-				options: {},
+				options: {
+					batching: {
+						batchSize: 1,
+						delayBetweenBatches: 100,
+					},
+				},
				schemaType: 'fromAttributes',
			},
			new FakeLLM({ response: formatFakeLlmResponse({ name: 'John', age: 30 }) }),
@@ -111,7 +121,12 @@ describe('InformationExtractor', () => {
				attributes: {
					attributes: mockPersonAttributes,
				},
-				options: {},
+				options: {
+					batching: {
+						batchSize: 1,
+						delayBetweenBatches: 100,
+					},
+				},
				schemaType: 'fromAttributes',
			},
			new FakeLLM({ response: formatFakeLlmResponse({ name: 'John' }) }),
@@ -132,7 +147,12 @@ describe('InformationExtractor', () => {
				attributes: {
					attributes: mockPersonAttributesRequired,
				},
-				options: {},
+				options: {
+					batching: {
+						batchSize: 1,
+						delayBetweenBatches: 100,
+					},
+				},
				schemaType: 'fromAttributes',
			},
			new FakeLLM({ response: formatFakeLlmResponse({ name: 'John' }) }),
@@ -154,7 +174,12 @@ describe('InformationExtractor', () => {
				attributes: {
					attributes: mockPersonAttributes,
				},
-				options: {},
+				options: {
+					batching: {
+						batchSize: 1,
+						delayBetweenBatches: 100,
+					},
+				},
				schemaType: 'fromAttributes',
			},
			new FakeLLM({ response: formatFakeLlmResponse({ name: 'John', age: '30' }) }),
@@ -175,7 +200,12 @@ describe('InformationExtractor', () => {
				attributes: {
					attributes: mockPersonAttributesRequired,
				},
-				options: {},
+				options: {
+					batching: {
+						batchSize: 1,
+						delayBetweenBatches: 100,
+					},
+				},
				schemaType: 'fromAttributes',
			},
			new FakeListChatModel({
@@ -200,7 +230,12 @@ describe('InformationExtractor', () => {
				attributes: {
					attributes: mockPersonAttributesRequired,
				},
-				options: {},
+				options: {
+					batching: {
+						batchSize: 1,
+						delayBetweenBatches: 100,
+					},
+				},
				schemaType: 'fromAttributes',
			},
			new FakeListChatModel({

View File

@@ -2,7 +2,7 @@ import type { BaseLanguageModel } from '@langchain/core/language_models/base';
import { HumanMessage } from '@langchain/core/messages';
import { SystemMessagePromptTemplate, ChatPromptTemplate } from '@langchain/core/prompts';
import { OutputFixingParser, StructuredOutputParser } from 'langchain/output_parsers';
-import { NodeConnectionTypes, NodeOperationError } from 'n8n-workflow';
+import { NodeConnectionTypes, NodeOperationError, sleep } from 'n8n-workflow';
import type {
	IDataObject,
	IExecuteFunctions,
@@ -131,6 +131,31 @@ export class SentimentAnalysis implements INodeType {
description:
'Whether to enable auto-fixing (may trigger an additional LLM call if output is broken)',
},
{
displayName: 'Batch Processing',
name: 'batching',
type: 'collection',
description: 'Batch processing options for rate limiting',
default: {},
options: [
{
displayName: 'Batch Size',
name: 'batchSize',
default: 100,
type: 'number',
description:
'How many items to process in parallel. This is useful for rate limiting.',
},
{
displayName: 'Delay Between Batches',
name: 'delayBetweenBatches',
default: 0,
type: 'number',
description:
'Delay in milliseconds between batches. This is useful for rate limiting.',
},
],
},
],
},
],
@@ -145,12 +170,21 @@ export class SentimentAnalysis implements INodeType {
		)) as BaseLanguageModel;

		const returnData: INodeExecutionData[][] = [];

+		const { batchSize, delayBetweenBatches } = this.getNodeParameter('options.batching', 0, {
+			batchSize: 100,
+			delayBetweenBatches: 0,
+		}) as {
+			batchSize: number;
+			delayBetweenBatches: number;
+		};

-		for (let i = 0; i < items.length; i++) {
-			try {
+		for (let i = 0; i < items.length; i += batchSize) {
+			const batch = items.slice(i, i + batchSize);
+			const batchPromises = batch.map(async (_item, batchItemIndex) => {
+				const itemIndex = i + batchItemIndex;
				const sentimentCategories = this.getNodeParameter(
					'options.categories',
-					i,
+					itemIndex,
					DEFAULT_CATEGORIES,
				) as string;
@@ -160,9 +194,13 @@ export class SentimentAnalysis implements INodeType {
					.filter(Boolean);

				if (categories.length === 0) {
-					throw new NodeOperationError(this.getNode(), 'No sentiment categories provided', {
-						itemIndex: i,
-					});
+					return {
+						result: null,
+						itemIndex,
+						error: new NodeOperationError(this.getNode(), 'No sentiment categories provided', {
+							itemIndex,
+						}),
+					};
				}

				// Initialize returnData with empty arrays for each category
@@ -170,7 +208,7 @@ export class SentimentAnalysis implements INodeType {
					returnData.push(...Array.from({ length: categories.length }, () => []));
				}

-				const options = this.getNodeParameter('options', i, {}) as {
+				const options = this.getNodeParameter('options', itemIndex, {}) as {
					systemPromptTemplate?: string;
					includeDetailedResults?: boolean;
					enableAutoFixing?: boolean;
@@ -194,10 +232,10 @@ export class SentimentAnalysis implements INodeType {
				const systemPromptTemplate = SystemMessagePromptTemplate.fromTemplate(
					`${options.systemPromptTemplate ?? DEFAULT_SYSTEM_PROMPT_TEMPLATE}
{format_instructions}`,
				);

-				const input = this.getNodeParameter('inputText', i) as string;
+				const input = this.getNodeParameter('inputText', itemIndex) as string;
				const inputPrompt = new HumanMessage(input);
				const messages = [
					await systemPromptTemplate.format({
@@ -217,7 +255,7 @@ export class SentimentAnalysis implements INodeType {
					);

					if (sentimentIndex !== -1) {
-						const resultItem = { ...items[i] };
+						const resultItem = { ...items[itemIndex] };
						const sentimentAnalysis: IDataObject = {
							category: output.sentiment,
						};
@@ -229,27 +267,59 @@ export class SentimentAnalysis implements INodeType {
							...resultItem.json,
							sentimentAnalysis,
						};

-						returnData[sentimentIndex].push(resultItem);
+						return {
+							result: {
+								resultItem,
+								sentimentIndex,
+							},
+							itemIndex,
+						};
					}
+					return {
+						result: {},
+						itemIndex,
+					};
				} catch (error) {
-					throw new NodeOperationError(
-						this.getNode(),
-						'Error during parsing of LLM output, please check your LLM model and configuration',
-						{
-							itemIndex: i,
-						},
-					);
+					return {
+						result: null,
+						itemIndex,
+						error: new NodeOperationError(
+							this.getNode(),
+							'Error during parsing of LLM output, please check your LLM model and configuration',
+							{
+								itemIndex,
+							},
+						),
+					};
				}
-			} catch (error) {
-				if (this.continueOnFail()) {
-					const executionErrorData = this.helpers.constructExecutionMetaData(
-						this.helpers.returnJsonArray({ error: error.message }),
-						{ itemData: { item: i } },
-					);
-					returnData[0].push(...executionErrorData);
-					continue;
-				}
-
-				throw error;
-			}
+			});
+
+			const batchResults = await Promise.all(batchPromises);
+
+			batchResults.forEach(({ result, itemIndex, error }) => {
+				if (error) {
+					if (this.continueOnFail()) {
+						const executionErrorData = this.helpers.constructExecutionMetaData(
+							this.helpers.returnJsonArray({ error: error.message }),
+							{ itemData: { item: itemIndex } },
+						);
+						returnData[0].push(...executionErrorData);
+						return;
+					} else {
+						throw error;
+					}
+				} else if (result.resultItem && result.sentimentIndex) {
+					const sentimentIndex = result.sentimentIndex;
+					const resultItem = result.resultItem;
+					returnData[sentimentIndex].push(resultItem);
+				}
+			});
+
+			// Add delay between batches if not the last batch
+			if (i + batchSize < items.length && delayBetweenBatches > 0) {
+				await sleep(delayBetweenBatches);
+			}
		}
		return returnData;

View File

@@ -2,7 +2,7 @@ import type { BaseLanguageModel } from '@langchain/core/language_models/base';
import { HumanMessage } from '@langchain/core/messages';
import { SystemMessagePromptTemplate, ChatPromptTemplate } from '@langchain/core/prompts';
import { OutputFixingParser, StructuredOutputParser } from 'langchain/output_parsers';
-import { NodeOperationError, NodeConnectionTypes } from 'n8n-workflow';
+import { NodeOperationError, NodeConnectionTypes, sleep } from 'n8n-workflow';
import type {
	IDataObject,
	IExecuteFunctions,
@@ -158,6 +158,31 @@ export class TextClassifier implements INodeType {
description:
'Whether to enable auto-fixing (may trigger an additional LLM call if output is broken)',
},
{
displayName: 'Batch Processing',
name: 'batching',
type: 'collection',
description: 'Batch processing options for rate limiting',
default: {},
options: [
{
displayName: 'Batch Size',
name: 'batchSize',
default: 100,
type: 'number',
description:
'How many items to process in parallel. This is useful for rate limiting.',
},
{
displayName: 'Delay Between Batches',
name: 'delayBetweenBatches',
default: 0,
type: 'number',
description:
'Delay in milliseconds between batches. This is useful for rate limiting.',
},
],
},
],
},
],
@@ -165,6 +190,13 @@ export class TextClassifier implements INodeType {
	async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
		const items = this.getInputData();

+		const { batchSize, delayBetweenBatches } = this.getNodeParameter('options.batching', 0, {
+			batchSize: 100,
+			delayBetweenBatches: 0,
+		}) as {
+			batchSize: number;
+			delayBetweenBatches: number;
+		};

		const llm = (await this.getInputConnectionData(
			NodeConnectionTypes.AiLanguageModel,
@@ -223,68 +255,79 @@ export class TextClassifier implements INodeType {
			{ length: categories.length + (fallback === 'other' ? 1 : 0) },
			(_) => [],
		);

-		for (let itemIdx = 0; itemIdx < items.length; itemIdx++) {
-			const item = items[itemIdx];
-			item.pairedItem = { item: itemIdx };
-			const input = this.getNodeParameter('inputText', itemIdx) as string;
+		for (let i = 0; i < items.length; i += batchSize) {
+			const batch = items.slice(i, i + batchSize);
+			const batchPromises = batch.map(async (_item, batchItemIndex) => {
+				const itemIdx = i + batchItemIndex;
+				const item = items[itemIdx];
+				item.pairedItem = { item: itemIdx };
+				const input = this.getNodeParameter('inputText', itemIdx) as string;

				if (input === undefined || input === null) {
-				if (this.continueOnFail()) {
-					returnData[0].push({
-						json: { error: 'Text to classify is not defined' },
-						pairedItem: { item: itemIdx },
-					});
-					continue;
-				} else {
					throw new NodeOperationError(
						this.getNode(),
						`Text to classify for item ${itemIdx} is not defined`,
					);
-				}
				}

				const inputPrompt = new HumanMessage(input);
				const systemPromptTemplateOpt = this.getNodeParameter(
					'options.systemPromptTemplate',
					itemIdx,
					SYSTEM_PROMPT_TEMPLATE,
				) as string;
				const systemPromptTemplate = SystemMessagePromptTemplate.fromTemplate(
					`${systemPromptTemplateOpt ?? SYSTEM_PROMPT_TEMPLATE}
{format_instructions}
${multiClassPrompt}
${fallbackPrompt}`,
				);

				const messages = [
					await systemPromptTemplate.format({
						categories: categories.map((cat) => cat.category).join(', '),
						format_instructions: parser.getFormatInstructions(),
					}),
					inputPrompt,
				];
				const prompt = ChatPromptTemplate.fromMessages(messages);
				const chain = prompt.pipe(llm).pipe(parser).withConfig(getTracingConfig(this));

-			try {
-				const output = await chain.invoke(messages);
-				categories.forEach((cat, idx) => {
-					if (output[cat.category]) returnData[idx].push(item);
-				});
-				if (fallback === 'other' && output.fallback) returnData[returnData.length - 1].push(item);
-			} catch (error) {
-				if (this.continueOnFail()) {
-					returnData[0].push({
-						json: { error: error.message },
-						pairedItem: { item: itemIdx },
-					});
-					continue;
-				}
-				throw error;
-			}
+				return await chain.invoke(messages);
+			});
+			const batchResults = await Promise.allSettled(batchPromises);
+
+			batchResults.forEach((response, batchItemIndex) => {
+				const index = i + batchItemIndex;
+				if (response.status === 'rejected') {
+					const error = response.reason as Error;
+					if (this.continueOnFail()) {
+						returnData[0].push({
+							json: { error: error.message },
+							pairedItem: { item: index },
+						});
+						return;
+					} else {
+						throw new NodeOperationError(this.getNode(), error.message);
+					}
+				} else {
+					const output = response.value;
+					const item = items[index];
+					categories.forEach((cat, idx) => {
+						if (output[cat.category]) returnData[idx].push(item);
+					});
+					if (fallback === 'other' && output.fallback) returnData[returnData.length - 1].push(item);
+				}
+			});
+
+			// Add delay between batches if not the last batch
+			if (i + batchSize < items.length && delayBetweenBatches > 0) {
+				await sleep(delayBetweenBatches);
+			}
		}
	}