fix: Revert AI nodes batching (#15129)

Author: oleg (committed by GitHub)
Date:   2025-05-06 11:18:11 +02:00
Commit: 939ff97ec4 (parent ca0e7ffe3b)
12 changed files with 205 additions and 632 deletions

View File

@@ -47,30 +47,6 @@ export const toolsAgentProperties: INodeProperties[] = [
description:
'Whether or not binary images should be automatically passed through to the agent as image type messages',
},
{
displayName: 'Batch Processing',
name: 'batching',
type: 'collection',
description: 'Batch processing options for rate limiting',
default: {},
options: [
{
displayName: 'Batch Size',
name: 'batchSize',
default: 1,
type: 'number',
description:
'How many items to process in parallel. This is useful for rate limiting, but will impact the ordering in the agents log output.',
},
{
displayName: 'Delay Between Batches',
name: 'delayBetweenBatches',
default: 0,
type: 'number',
description: 'Delay in milliseconds between batches. This is useful for rate limiting.',
},
],
},
],
},
];

View File

@@ -11,13 +11,7 @@ import type { AgentAction, AgentFinish } from 'langchain/agents';
import { AgentExecutor, createToolCallingAgent } from 'langchain/agents';
import type { ToolsAgentAction } from 'langchain/dist/agents/tool_calling/output_parser';
import { omit } from 'lodash';
import {
BINARY_ENCODING,
jsonParse,
NodeConnectionTypes,
NodeOperationError,
sleep,
} from 'n8n-workflow';
import { BINARY_ENCODING, jsonParse, NodeConnectionTypes, NodeOperationError } from 'n8n-workflow';
import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';
import type { ZodObject } from 'zod';
import { z } from 'zod';
@@ -412,21 +406,12 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise<INodeE
const returnData: INodeExecutionData[] = [];
const items = this.getInputData();
const outputParser = await getOptionalOutputParser(this);
const memory = await getOptionalMemory(this);
const tools = await getTools(this, outputParser);
const { batchSize, delayBetweenBatches } = this.getNodeParameter('options.batching', 0, {
batchSize: 1,
delayBetweenBatches: 0,
}) as {
batchSize: number;
delayBetweenBatches: number;
};
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
const batchPromises = batch.map(async (_item, batchItemIndex) => {
const itemIndex = i + batchItemIndex;
for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
try {
const model = await getChatModel(this);
const memory = await getOptionalMemory(this);
const input = getPromptInputByType({
ctx: this,
@@ -476,7 +461,7 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise<INodeE
});
// Invoke the executor with the given input and system message.
return await executor.invoke(
const response = await executor.invoke(
{
input,
system_message: options.systemMessage ?? SYSTEM_MESSAGE,
@@ -485,22 +470,7 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise<INodeE
},
{ signal: this.getExecutionCancelSignal() },
);
});
const batchResults = await Promise.allSettled(batchPromises);
batchResults.forEach((result, index) => {
if (result.status === 'rejected') {
if (this.continueOnFail()) {
returnData.push({
json: { error: result.reason as string },
pairedItem: { item: index },
});
return;
} else {
throw new NodeOperationError(this.getNode(), result.reason);
}
}
const response = result.value;
// If memory and outputParser are connected, parse the output.
if (memory && outputParser) {
const parsedOutput = jsonParse<{ output: Record<string, unknown> }>(
@@ -522,10 +492,15 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise<INodeE
};
returnData.push(itemResult);
});
if (i + batchSize < items.length && delayBetweenBatches > 0) {
await sleep(delayBetweenBatches);
} catch (error) {
if (this.continueOnFail()) {
returnData.push({
json: { error: error.message },
pairedItem: { item: itemIndex },
});
continue;
}
throw error;
}
}
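
For readers scanning the hunks above: this revert swaps a batched execution loop (slice the input into batches, run each batch through Promise.allSettled, optionally sleep between batches) back to a plain one-item-at-a-time loop. The sketch below is a simplified illustration of those two shapes, not the n8n source; `Item` and `processItem` are placeholder names.

```ts
// Simplified sketch of the two execution patterns this commit swaps.
// `processItem` stands in for the per-item work (building the prompt,
// invoking the agent or chain); it is a placeholder, not an n8n API.
type Item = { json: Record<string, unknown> };

async function runBatched(
  items: Item[],
  processItem: (index: number) => Promise<Item>,
  batchSize = 1,
  delayBetweenBatches = 0,
): Promise<Item[]> {
  const results: Item[] = [];
  for (let i = 0; i < items.length; i += batchSize) {
    // Run one slice of items in parallel; allSettled keeps per-item failures.
    const batch = items.slice(i, i + batchSize);
    const settled = await Promise.allSettled(
      batch.map((_item, offset) => processItem(i + offset)),
    );
    for (const result of settled) {
      if (result.status === 'fulfilled') results.push(result.value);
      else throw result.reason; // the real nodes consult continueOnFail() here
    }
    // Optional pause between batches, used for rate limiting.
    if (i + batchSize < items.length && delayBetweenBatches > 0) {
      await new Promise((resolve) => setTimeout(resolve, delayBetweenBatches));
    }
  }
  return results;
}

async function runSequential(
  items: Item[],
  processItem: (index: number) => Promise<Item>,
): Promise<Item[]> {
  // The behaviour the revert restores: one item at a time, in input order.
  const results: Item[] = [];
  for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
    results.push(await processItem(itemIndex));
  }
  return results;
}
```

The per-item form keeps log ordering and pairedItem bookkeeping straightforward, which is the trade-off the removed option descriptions point at.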

View File

@@ -5,7 +5,7 @@ import type {
INodeType,
INodeTypeDescription,
} from 'n8n-workflow';
import { NodeApiError, NodeConnectionTypes, NodeOperationError, sleep } from 'n8n-workflow';
import { NodeApiError, NodeConnectionTypes, NodeOperationError } from 'n8n-workflow';
import { getPromptInputByType } from '@utils/helpers';
import { getOptionalOutputParser } from '@utils/output_parsers/N8nOutputParser';
@@ -67,28 +67,19 @@ export class ChainLlm implements INodeType {
this.logger.debug('Executing Basic LLM Chain');
const items = this.getInputData();
const returnData: INodeExecutionData[] = [];
const { batchSize, delayBetweenBatches } = this.getNodeParameter('batching', 0, {
batchSize: 100,
delayBetweenBatches: 0,
}) as {
batchSize: number;
delayBetweenBatches: number;
};
// Get output parser if configured
const outputParser = await getOptionalOutputParser(this);
// Process items in batches
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
const batchPromises = batch.map(async (_item, batchItemIndex) => {
const itemIndex = i + batchItemIndex;
// Process each input item
for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
try {
// Get the language model
const llm = (await this.getInputConnectionData(
NodeConnectionTypes.AiLanguageModel,
0,
)) as BaseLanguageModel;
// Get output parser if configured
const outputParser = await getOptionalOutputParser(this);
// Get user prompt based on node version
let prompt: string;
@@ -115,53 +106,44 @@ export class ChainLlm implements INodeType {
[],
) as MessageTemplate[];
return (await executeChain({
// Execute the chain
const responses = await executeChain({
context: this,
itemIndex,
query: prompt,
llm,
outputParser,
messages,
})) as object[];
});
});
const batchResults = await Promise.allSettled(batchPromises);
batchResults.forEach((promiseResult, batchItemIndex) => {
const itemIndex = i + batchItemIndex;
if (promiseResult.status === 'rejected') {
const error = promiseResult.reason as Error;
// Handle OpenAI specific rate limit errors
if (error instanceof NodeApiError && isOpenAiError(error.cause)) {
const openAiErrorCode: string | undefined = (error.cause as any).error?.code;
if (openAiErrorCode) {
const customMessage = getCustomOpenAiErrorMessage(openAiErrorCode);
if (customMessage) {
error.message = customMessage;
}
}
}
if (this.continueOnFail()) {
returnData.push({
json: { error: error.message },
pairedItem: { item: itemIndex },
});
return;
}
throw new NodeOperationError(this.getNode(), error);
}
const responses = promiseResult.value;
responses.forEach((response: object) => {
// If the node version is 1.6(and LLM is using `response_format: json_object`) or higher or an output parser is configured,
// we unwrap the response and return the object directly as JSON
const shouldUnwrapObjects = this.getNode().typeVersion >= 1.6 || !!outputParser;
// Process each response and add to return data
responses.forEach((response) => {
returnData.push({
json: formatResponse(response, this.getNode().typeVersion >= 1.6 || !!outputParser),
json: formatResponse(response, shouldUnwrapObjects),
});
});
});
} catch (error) {
// Handle OpenAI specific rate limit errors
if (error instanceof NodeApiError && isOpenAiError(error.cause)) {
const openAiErrorCode: string | undefined = (error.cause as any).error?.code;
if (openAiErrorCode) {
const customMessage = getCustomOpenAiErrorMessage(openAiErrorCode);
if (customMessage) {
error.message = customMessage;
}
}
}
if (i + batchSize < items.length && delayBetweenBatches > 0) {
await sleep(delayBetweenBatches);
// Continue on failure if configured
if (this.continueOnFail()) {
returnData.push({ json: { error: error.message }, pairedItem: { item: itemIndex } });
continue;
}
throw error;
}
}

View File

@@ -270,29 +270,4 @@ export const nodeProperties: INodeProperties[] = [
},
},
},
{
displayName: 'Batch Processing',
name: 'batching',
type: 'collection',
placeholder: 'Add Batch Processing Option',
description: 'Batch processing options for rate limiting',
default: {},
options: [
{
displayName: 'Batch Size',
name: 'batchSize',
default: 100,
type: 'number',
description:
'How many items to process in parallel. This is useful for rate limiting, but will impact the agents log output.',
},
{
displayName: 'Delay Between Batches',
name: 'delayBetweenBatches',
default: 1000,
type: 'number',
description: 'Delay in milliseconds between batches. This is useful for rate limiting.',
},
],
},
];

View File

@@ -3,7 +3,7 @@
import { FakeChatModel } from '@langchain/core/utils/testing';
import { mock } from 'jest-mock-extended';
import type { IExecuteFunctions, INode } from 'n8n-workflow';
import { NodeConnectionTypes, UnexpectedError } from 'n8n-workflow';
import { NodeConnectionTypes } from 'n8n-workflow';
import * as helperModule from '@utils/helpers';
import * as outputParserModule from '@utils/output_parsers/N8nOutputParser';
@@ -12,11 +12,6 @@ import { ChainLlm } from '../ChainLlm.node';
import * as executeChainModule from '../methods/chainExecutor';
import * as responseFormatterModule from '../methods/responseFormatter';
jest.mock('n8n-workflow', () => ({
...jest.requireActual('n8n-workflow'),
sleep: jest.fn(),
}));
jest.mock('@utils/helpers', () => ({
getPromptInputByType: jest.fn(),
}));
@@ -30,7 +25,12 @@ jest.mock('../methods/chainExecutor', () => ({
}));
jest.mock('../methods/responseFormatter', () => ({
formatResponse: jest.fn(),
formatResponse: jest.fn().mockImplementation((response) => {
if (typeof response === 'string') {
return { text: response.trim() };
}
return response;
}),
}));
describe('ChainLlm Node', () => {
@@ -38,8 +38,6 @@ describe('ChainLlm Node', () => {
let mockExecuteFunction: jest.Mocked<IExecuteFunctions>;
beforeEach(() => {
jest.resetAllMocks();
node = new ChainLlm();
mockExecuteFunction = mock<IExecuteFunctions>();
@@ -65,12 +63,7 @@ describe('ChainLlm Node', () => {
const fakeLLM = new FakeChatModel({});
mockExecuteFunction.getInputConnectionData.mockResolvedValue(fakeLLM);
(responseFormatterModule.formatResponse as jest.Mock).mockImplementation((response) => {
if (typeof response === 'string') {
return { text: response.trim() };
}
return response;
});
jest.clearAllMocks();
});
describe('description', () => {
@@ -171,14 +164,15 @@ describe('ChainLlm Node', () => {
});
it('should continue on failure when configured', async () => {
mockExecuteFunction.continueOnFail.mockReturnValue(true);
(helperModule.getPromptInputByType as jest.Mock).mockReturnValue('Test prompt');
(executeChainModule.executeChain as jest.Mock).mockRejectedValueOnce(
new UnexpectedError('Test error'),
);
const error = new Error('Test error');
(executeChainModule.executeChain as jest.Mock).mockRejectedValue(error);
mockExecuteFunction.continueOnFail.mockReturnValue(true);
const result = await node.execute.call(mockExecuteFunction);
expect(result).toEqual([[{ json: { error: 'Test error' }, pairedItem: { item: 0 } }]]);
});

View File

@@ -8,7 +8,7 @@ import {
import type { BaseRetriever } from '@langchain/core/retrievers';
import { createStuffDocumentsChain } from 'langchain/chains/combine_documents';
import { createRetrievalChain } from 'langchain/chains/retrieval';
import { NodeConnectionTypes, NodeOperationError, parseErrorMetadata, sleep } from 'n8n-workflow';
import { NodeConnectionTypes, NodeOperationError, parseErrorMetadata } from 'n8n-workflow';
import {
type INodeProperties,
type IExecuteFunctions,
@@ -177,31 +177,6 @@ export class ChainRetrievalQa implements INodeType {
},
},
},
{
displayName: 'Batch Processing',
name: 'batching',
type: 'collection',
description: 'Batch processing options for rate limiting',
default: {},
options: [
{
displayName: 'Batch Size',
name: 'batchSize',
default: 100,
type: 'number',
description:
'How many items to process in parallel. This is useful for rate limiting.',
},
{
displayName: 'Delay Between Batches',
name: 'delayBetweenBatches',
default: 0,
type: 'number',
description:
'Delay in milliseconds between batches. This is useful for rate limiting.',
},
],
},
],
},
],
@@ -211,20 +186,11 @@ export class ChainRetrievalQa implements INodeType {
this.logger.debug('Executing Retrieval QA Chain');
const items = this.getInputData();
const { batchSize, delayBetweenBatches } = this.getNodeParameter('options.batching', 0, {
batchSize: 100,
delayBetweenBatches: 0,
}) as {
batchSize: number;
delayBetweenBatches: number;
};
const returnData: INodeExecutionData[] = [];
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
const batchPromises = batch.map(async (_item, batchIndex) => {
const itemIndex = i + batchIndex;
// Run for each item
for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
try {
const model = (await this.getInputConnectionData(
NodeConnectionTypes.AiLanguageModel,
0,
@@ -300,47 +266,32 @@ export class ChainRetrievalQa implements INodeType {
// Execute the chain with tracing config
const tracingConfig = getTracingConfig(this);
const result = await retrievalChain
const response = await retrievalChain
.withConfig(tracingConfig)
.invoke({ input: query }, { signal: this.getExecutionCancelSignal() });
return result;
});
const batchResults = await Promise.allSettled(batchPromises);
batchResults.forEach((response, index) => {
if (response.status === 'rejected') {
const error = response.reason;
if (this.continueOnFail()) {
const metadata = parseErrorMetadata(error);
returnData.push({
json: { error: error.message },
pairedItem: { item: index },
metadata,
});
return;
} else {
throw error;
}
}
const output = response.value;
const answer: string = output.answer;
// Get the answer from the response
const answer: string = response.answer;
if (this.getNode().typeVersion >= 1.5) {
returnData.push({ json: { response: answer } });
} else {
// Legacy format for versions 1.4 and below is { text: string }
returnData.push({ json: { response: { text: answer } } });
}
});
} catch (error) {
if (this.continueOnFail()) {
const metadata = parseErrorMetadata(error);
returnData.push({
json: { error: error.message },
pairedItem: { item: itemIndex },
metadata,
});
continue;
}
// Add delay between batches if not the last batch
if (i + batchSize < items.length && delayBetweenBatches > 0) {
await sleep(delayBetweenBatches);
throw error;
}
}
return [returnData];
}
}

View File

@@ -8,11 +8,6 @@ import { NodeConnectionTypes, NodeOperationError, UnexpectedError } from 'n8n-wo
import { ChainRetrievalQa } from '../ChainRetrievalQa.node';
jest.mock('n8n-workflow', () => ({
...jest.requireActual('n8n-workflow'),
sleep: jest.fn(),
}));
const createExecuteFunctionsMock = (
parameters: IDataObject,
fakeLlm: BaseLanguageModel,
@@ -85,12 +80,7 @@ describe('ChainRetrievalQa', () => {
const params = {
promptType: 'define',
text: 'What is the capital of France?',
options: {
batching: {
batchSize: 1,
delayBetweenBatches: 100,
},
},
options: {},
};
const result = await node.execute.call(
@@ -124,12 +114,7 @@ describe('ChainRetrievalQa', () => {
const params = {
promptType: 'define',
text: 'What is the capital of France?',
options: {
batching: {
batchSize: 1,
delayBetweenBatches: 100,
},
},
options: {},
};
const result = await node.execute.call(
@@ -173,10 +158,6 @@ describe('ChainRetrievalQa', () => {
text: 'What is the capital of France?',
options: {
systemPromptTemplate: customSystemPrompt,
batching: {
batchSize: 1,
delayBetweenBatches: 100,
},
},
};
@@ -204,12 +185,7 @@ describe('ChainRetrievalQa', () => {
const params = {
promptType: 'define',
text: undefined, // undefined query
options: {
batching: {
batchSize: 1,
delayBetweenBatches: 100,
},
},
options: {},
};
await expect(
@@ -235,12 +211,7 @@ describe('ChainRetrievalQa', () => {
const params = {
promptType: 'define',
text: 'What is the capital of France?',
options: {
batching: {
batchSize: 1,
delayBetweenBatches: 100,
},
},
options: {},
};
// Override continueOnFail to return true

View File

@@ -1,6 +1,5 @@
import type { Document } from '@langchain/core/documents';
import type { BaseLanguageModel } from '@langchain/core/language_models/base';
import type { ChainValues } from '@langchain/core/utils/types';
import type { TextSplitter } from '@langchain/textsplitters';
import { RecursiveCharacterTextSplitter } from '@langchain/textsplitters';
import { loadSummarizationChain } from 'langchain/chains';
@@ -13,7 +12,7 @@ import type {
IDataObject,
INodeInputConfiguration,
} from 'n8n-workflow';
import { NodeConnectionTypes, sleep } from 'n8n-workflow';
import { NodeConnectionTypes } from 'n8n-workflow';
import { N8nBinaryLoader } from '@utils/N8nBinaryLoader';
import { N8nJsonLoader } from '@utils/N8nJsonLoader';
@@ -307,31 +306,6 @@ export class ChainSummarizationV2 implements INodeType {
},
],
},
{
displayName: 'Batch Processing',
name: 'batching',
type: 'collection',
description: 'Batch processing options for rate limiting',
default: {},
options: [
{
displayName: 'Batch Size',
name: 'batchSize',
default: 100,
type: 'number',
description:
'How many items to process in parallel. This is useful for rate limiting.',
},
{
displayName: 'Delay Between Batches',
name: 'delayBetweenBatches',
default: 0,
type: 'number',
description:
'Delay in milliseconds between batches. This is useful for rate limiting.',
},
],
},
],
},
],
@@ -350,19 +324,9 @@ export class ChainSummarizationV2 implements INodeType {
const items = this.getInputData();
const returnData: INodeExecutionData[] = [];
const { batchSize, delayBetweenBatches } = this.getNodeParameter('options.batching', 0, {
batchSize: 100,
delayBetweenBatches: 0,
}) as {
batchSize: number;
delayBetweenBatches: number;
};
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
const batchPromises = batch.map(async (_item, batchIndex) => {
const itemIndex = i + batchIndex;
for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
try {
const model = (await this.getInputConnectionData(
NodeConnectionTypes.AiLanguageModel,
0,
@@ -389,7 +353,6 @@ export class ChainSummarizationV2 implements INodeType {
const item = items[itemIndex];
let processedDocuments: Document[];
let output: ChainValues = {};
// Use dedicated document loader input to load documents
if (operationMode === 'documentLoader') {
@@ -405,9 +368,11 @@ export class ChainSummarizationV2 implements INodeType {
? await documentInput.processItem(item, itemIndex)
: documentInput;
output = await chain.withConfig(getTracingConfig(this)).invoke({
const response = await chain.withConfig(getTracingConfig(this)).invoke({
input_documents: processedDocuments,
});
returnData.push({ json: { response } });
}
// Take the input and use binary or json loader
@@ -447,37 +412,21 @@ export class ChainSummarizationV2 implements INodeType {
}
const processedItem = await processor.processItem(item, itemIndex);
output = await chain.invoke(
const response = await chain.invoke(
{
input_documents: processedItem,
},
{ signal: this.getExecutionCancelSignal() },
);
returnData.push({ json: { response } });
}
return output;
});
const batchResults = await Promise.allSettled(batchPromises);
batchResults.forEach((response, index) => {
if (response.status === 'rejected') {
const error = response.reason as Error;
if (this.continueOnFail()) {
returnData.push({
json: { error: error.message },
pairedItem: { item: i + index },
});
} else {
throw error;
}
} else {
const output = response.value;
returnData.push({ json: { output } });
} catch (error) {
if (this.continueOnFail()) {
returnData.push({ json: { error: error.message }, pairedItem: { item: itemIndex } });
continue;
}
});
// Add delay between batches if not the last batch
if (i + batchSize < items.length && delayBetweenBatches > 0) {
await sleep(delayBetweenBatches);
throw error;
}
}

View File

@@ -3,7 +3,7 @@ import { HumanMessage } from '@langchain/core/messages';
import { ChatPromptTemplate, SystemMessagePromptTemplate } from '@langchain/core/prompts';
import type { JSONSchema7 } from 'json-schema';
import { OutputFixingParser, StructuredOutputParser } from 'langchain/output_parsers';
import { jsonParse, NodeConnectionTypes, NodeOperationError, sleep } from 'n8n-workflow';
import { jsonParse, NodeConnectionTypes, NodeOperationError } from 'n8n-workflow';
import type {
INodeType,
INodeTypeDescription,
@@ -213,31 +213,6 @@ export class InformationExtractor implements INodeType {
rows: 6,
},
},
{
displayName: 'Batch Processing',
name: 'batching',
type: 'collection',
description: 'Batch processing options for rate limiting',
default: {},
options: [
{
displayName: 'Batch Size',
name: 'batchSize',
default: 100,
type: 'number',
description:
'How many items to process in parallel. This is useful for rate limiting, but will impact the agents log output.',
},
{
displayName: 'Delay Between Batches',
name: 'delayBetweenBatches',
default: 0,
type: 'number',
description:
'Delay in milliseconds between batches. This is useful for rate limiting.',
},
],
},
],
},
],
@@ -245,13 +220,6 @@ export class InformationExtractor implements INodeType {
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const items = this.getInputData();
const { batchSize, delayBetweenBatches } = this.getNodeParameter('options.batching', 0, {
batchSize: 100,
delayBetweenBatches: 0,
}) as {
batchSize: number;
delayBetweenBatches: number;
};
const llm = (await this.getInputConnectionData(
NodeConnectionTypes.AiLanguageModel,
@@ -297,58 +265,38 @@ export class InformationExtractor implements INodeType {
}
const resultData: INodeExecutionData[] = [];
for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
const input = this.getNodeParameter('text', itemIndex) as string;
const inputPrompt = new HumanMessage(input);
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
const options = this.getNodeParameter('options', itemIndex, {}) as {
systemPromptTemplate?: string;
};
const batchPromises = batch.map(async (_item, batchItemIndex) => {
const itemIndex = i + batchItemIndex;
const systemPromptTemplate = SystemMessagePromptTemplate.fromTemplate(
`${options.systemPromptTemplate ?? SYSTEM_PROMPT_TEMPLATE}
{format_instructions}`,
);
const input = this.getNodeParameter('text', itemIndex) as string;
const inputPrompt = new HumanMessage(input);
const messages = [
await systemPromptTemplate.format({
format_instructions: parser.getFormatInstructions(),
}),
inputPrompt,
];
const prompt = ChatPromptTemplate.fromMessages(messages);
const chain = prompt.pipe(llm).pipe(parser).withConfig(getTracingConfig(this));
const options = this.getNodeParameter('options', itemIndex, {}) as {
systemPromptTemplate?: string;
};
const systemPromptTemplate = SystemMessagePromptTemplate.fromTemplate(
`${options.systemPromptTemplate ?? SYSTEM_PROMPT_TEMPLATE}
{format_instructions}`,
);
const messages = [
await systemPromptTemplate.format({
format_instructions: parser.getFormatInstructions(),
}),
inputPrompt,
];
const prompt = ChatPromptTemplate.fromMessages(messages);
const chain = prompt.pipe(llm).pipe(parser).withConfig(getTracingConfig(this));
return await chain.invoke(messages);
});
const batchResults = await Promise.allSettled(batchPromises);
batchResults.forEach((response, index) => {
if (response.status === 'rejected') {
const error = response.reason as Error;
if (this.continueOnFail()) {
resultData.push({
json: { error: response.reason as string },
pairedItem: { item: i + index },
});
return;
} else {
throw new NodeOperationError(this.getNode(), error.message);
}
}
const output = response.value;
try {
const output = await chain.invoke(messages);
resultData.push({ json: { output } });
});
} catch (error) {
if (this.continueOnFail()) {
resultData.push({ json: { error: error.message }, pairedItem: { item: itemIndex } });
continue;
}
// Add delay between batches if not the last batch
if (i + batchSize < items.length && delayBetweenBatches > 0) {
await sleep(delayBetweenBatches);
throw error;
}
}
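
The extractor's restored loop builds the same chain once per item: a system prompt carrying the parser's format instructions, piped into the model and then into a structured output parser. The sketch below is a minimal, self-contained version of that shape using the LangChain primitives visible in the hunk; the zod schema and the way the model is supplied are illustrative assumptions, not the node's actual configuration.

```ts
import type { BaseLanguageModel } from '@langchain/core/language_models/base';
import { HumanMessage } from '@langchain/core/messages';
import { ChatPromptTemplate, SystemMessagePromptTemplate } from '@langchain/core/prompts';
import { StructuredOutputParser } from 'langchain/output_parsers';
import { z } from 'zod';

// One extraction call, shaped like the body of the node's per-item loop.
async function extractOnce(llm: BaseLanguageModel, text: string) {
  // Illustrative schema; the node derives its schema from user-defined attributes.
  const parser = StructuredOutputParser.fromZodSchema(
    z.object({ name: z.string().optional(), age: z.number().optional() }),
  );

  const systemPromptTemplate = SystemMessagePromptTemplate.fromTemplate(
    'Extract the requested attributes from the user text.\n{format_instructions}',
  );
  const messages = [
    await systemPromptTemplate.format({
      format_instructions: parser.getFormatInstructions(),
    }),
    new HumanMessage(text),
  ];

  // Prompt -> model -> parser, mirroring prompt.pipe(llm).pipe(parser) in the hunk.
  const prompt = ChatPromptTemplate.fromMessages(messages);
  const chain = prompt.pipe(llm).pipe(parser);
  return await chain.invoke(messages);
}
```

Wrapped in the restored try/catch, a failure either becomes an { error } item (when continueOnFail() is on) or is rethrown, exactly as the hunk shows.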

View File

@@ -7,11 +7,6 @@ import { makeZodSchemaFromAttributes } from '../helpers';
import { InformationExtractor } from '../InformationExtractor.node';
import type { AttributeDefinition } from '../types';
jest.mock('n8n-workflow', () => ({
...jest.requireActual('n8n-workflow'),
sleep: jest.fn(),
}));
const mockPersonAttributes: AttributeDefinition[] = [
{
name: 'name',
@@ -96,12 +91,7 @@ describe('InformationExtractor', () => {
attributes: {
attributes: mockPersonAttributes,
},
options: {
batching: {
batchSize: 1,
delayBetweenBatches: 100,
},
},
options: {},
schemaType: 'fromAttributes',
},
new FakeLLM({ response: formatFakeLlmResponse({ name: 'John', age: 30 }) }),
@@ -121,12 +111,7 @@ describe('InformationExtractor', () => {
attributes: {
attributes: mockPersonAttributes,
},
options: {
batching: {
batchSize: 1,
delayBetweenBatches: 100,
},
},
options: {},
schemaType: 'fromAttributes',
},
new FakeLLM({ response: formatFakeLlmResponse({ name: 'John' }) }),
@@ -147,12 +132,7 @@ describe('InformationExtractor', () => {
attributes: {
attributes: mockPersonAttributesRequired,
},
options: {
batching: {
batchSize: 1,
delayBetweenBatches: 100,
},
},
options: {},
schemaType: 'fromAttributes',
},
new FakeLLM({ response: formatFakeLlmResponse({ name: 'John' }) }),
@@ -174,12 +154,7 @@ describe('InformationExtractor', () => {
attributes: {
attributes: mockPersonAttributes,
},
options: {
batching: {
batchSize: 1,
delayBetweenBatches: 100,
},
},
options: {},
schemaType: 'fromAttributes',
},
new FakeLLM({ response: formatFakeLlmResponse({ name: 'John', age: '30' }) }),
@@ -200,12 +175,7 @@ describe('InformationExtractor', () => {
attributes: {
attributes: mockPersonAttributesRequired,
},
options: {
batching: {
batchSize: 1,
delayBetweenBatches: 100,
},
},
options: {},
schemaType: 'fromAttributes',
},
new FakeListChatModel({
@@ -230,12 +200,7 @@ describe('InformationExtractor', () => {
attributes: {
attributes: mockPersonAttributesRequired,
},
options: {
batching: {
batchSize: 1,
delayBetweenBatches: 100,
},
},
options: {},
schemaType: 'fromAttributes',
},
new FakeListChatModel({

View File

@@ -2,7 +2,7 @@ import type { BaseLanguageModel } from '@langchain/core/language_models/base';
import { HumanMessage } from '@langchain/core/messages';
import { SystemMessagePromptTemplate, ChatPromptTemplate } from '@langchain/core/prompts';
import { OutputFixingParser, StructuredOutputParser } from 'langchain/output_parsers';
import { NodeConnectionTypes, NodeOperationError, sleep } from 'n8n-workflow';
import { NodeConnectionTypes, NodeOperationError } from 'n8n-workflow';
import type {
IDataObject,
IExecuteFunctions,
@@ -131,31 +131,6 @@ export class SentimentAnalysis implements INodeType {
description:
'Whether to enable auto-fixing (may trigger an additional LLM call if output is broken)',
},
{
displayName: 'Batch Processing',
name: 'batching',
type: 'collection',
description: 'Batch processing options for rate limiting',
default: {},
options: [
{
displayName: 'Batch Size',
name: 'batchSize',
default: 100,
type: 'number',
description:
'How many items to process in parallel. This is useful for rate limiting.',
},
{
displayName: 'Delay Between Batches',
name: 'delayBetweenBatches',
default: 0,
type: 'number',
description:
'Delay in milliseconds between batches. This is useful for rate limiting.',
},
],
},
],
},
],
@@ -170,21 +145,12 @@ export class SentimentAnalysis implements INodeType {
)) as BaseLanguageModel;
const returnData: INodeExecutionData[][] = [];
const { batchSize, delayBetweenBatches } = this.getNodeParameter('options.batching', 0, {
batchSize: 100,
delayBetweenBatches: 0,
}) as {
batchSize: number;
delayBetweenBatches: number;
};
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
const batchPromises = batch.map(async (_item, batchItemIndex) => {
const itemIndex = i + batchItemIndex;
for (let i = 0; i < items.length; i++) {
try {
const sentimentCategories = this.getNodeParameter(
'options.categories',
itemIndex,
i,
DEFAULT_CATEGORIES,
) as string;
@@ -194,13 +160,9 @@ export class SentimentAnalysis implements INodeType {
.filter(Boolean);
if (categories.length === 0) {
return {
result: null,
itemIndex,
error: new NodeOperationError(this.getNode(), 'No sentiment categories provided', {
itemIndex,
}),
};
throw new NodeOperationError(this.getNode(), 'No sentiment categories provided', {
itemIndex: i,
});
}
// Initialize returnData with empty arrays for each category
@@ -208,7 +170,7 @@ export class SentimentAnalysis implements INodeType {
returnData.push(...Array.from({ length: categories.length }, () => []));
}
const options = this.getNodeParameter('options', itemIndex, {}) as {
const options = this.getNodeParameter('options', i, {}) as {
systemPromptTemplate?: string;
includeDetailedResults?: boolean;
enableAutoFixing?: boolean;
@@ -232,10 +194,10 @@ export class SentimentAnalysis implements INodeType {
const systemPromptTemplate = SystemMessagePromptTemplate.fromTemplate(
`${options.systemPromptTemplate ?? DEFAULT_SYSTEM_PROMPT_TEMPLATE}
{format_instructions}`,
{format_instructions}`,
);
const input = this.getNodeParameter('inputText', itemIndex) as string;
const input = this.getNodeParameter('inputText', i) as string;
const inputPrompt = new HumanMessage(input);
const messages = [
await systemPromptTemplate.format({
@@ -255,7 +217,7 @@ export class SentimentAnalysis implements INodeType {
);
if (sentimentIndex !== -1) {
const resultItem = { ...items[itemIndex] };
const resultItem = { ...items[i] };
const sentimentAnalysis: IDataObject = {
category: output.sentiment,
};
@@ -267,59 +229,27 @@ export class SentimentAnalysis implements INodeType {
...resultItem.json,
sentimentAnalysis,
};
return {
result: {
resultItem,
sentimentIndex,
},
itemIndex,
};
returnData[sentimentIndex].push(resultItem);
}
return {
result: {},
itemIndex,
};
} catch (error) {
return {
result: null,
itemIndex,
error: new NodeOperationError(
this.getNode(),
'Error during parsing of LLM output, please check your LLM model and configuration',
{
itemIndex,
},
),
};
throw new NodeOperationError(
this.getNode(),
'Error during parsing of LLM output, please check your LLM model and configuration',
{
itemIndex: i,
},
);
}
});
const batchResults = await Promise.all(batchPromises);
batchResults.forEach(({ result, itemIndex, error }) => {
if (error) {
if (this.continueOnFail()) {
const executionErrorData = this.helpers.constructExecutionMetaData(
this.helpers.returnJsonArray({ error: error.message }),
{ itemData: { item: itemIndex } },
);
returnData[0].push(...executionErrorData);
return;
} else {
throw error;
}
} else if (result.resultItem && result.sentimentIndex) {
const sentimentIndex = result.sentimentIndex;
const resultItem = result.resultItem;
returnData[sentimentIndex].push(resultItem);
} catch (error) {
if (this.continueOnFail()) {
const executionErrorData = this.helpers.constructExecutionMetaData(
this.helpers.returnJsonArray({ error: error.message }),
{ itemData: { item: i } },
);
returnData[0].push(...executionErrorData);
continue;
}
});
// Add delay between batches if not the last batch
if (i + batchSize < items.length && delayBetweenBatches > 0) {
await sleep(delayBetweenBatches);
throw error;
}
}
return returnData;
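
Both this node and the text classifier below fan each input item out to one output array per category, with an optional fallback output at the end. The following sketch is a simplified illustration of that routing; the `classify` callback and the `Item` shape are placeholders, not n8n types.

```ts
// Hedged sketch of per-category output routing, as used by the sentiment
// and text-classifier nodes. `classify` is a placeholder for the LLM call
// that returns a flag per category (plus an optional `fallback` flag).
type Item = { json: Record<string, unknown>; pairedItem?: { item: number } };

async function routeByCategory(
  items: Item[],
  categories: string[],
  classify: (item: Item) => Promise<Record<string, boolean>>,
  useFallbackOutput = false,
): Promise<Item[][]> {
  const outputs: Item[][] = Array.from(
    { length: categories.length + (useFallbackOutput ? 1 : 0) },
    () => [],
  );
  for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
    const item = items[itemIndex];
    item.pairedItem = { item: itemIndex };
    const result = await classify(item);
    // Push the item into every output whose category flag is set.
    categories.forEach((category, outputIndex) => {
      if (result[category]) outputs[outputIndex].push(item);
    });
    // Items the model flags as unmatched go to the trailing fallback output.
    if (useFallbackOutput && result.fallback) {
      outputs[outputs.length - 1].push(item);
    }
  }
  return outputs;
}
```

The revert keeps this routing but moves it back inside a sequential per-item loop with a plain try/catch, as in the hunks above and below.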

View File

@@ -2,7 +2,7 @@ import type { BaseLanguageModel } from '@langchain/core/language_models/base';
import { HumanMessage } from '@langchain/core/messages';
import { SystemMessagePromptTemplate, ChatPromptTemplate } from '@langchain/core/prompts';
import { OutputFixingParser, StructuredOutputParser } from 'langchain/output_parsers';
import { NodeOperationError, NodeConnectionTypes, sleep } from 'n8n-workflow';
import { NodeOperationError, NodeConnectionTypes } from 'n8n-workflow';
import type {
IDataObject,
IExecuteFunctions,
@@ -158,31 +158,6 @@ export class TextClassifier implements INodeType {
description:
'Whether to enable auto-fixing (may trigger an additional LLM call if output is broken)',
},
{
displayName: 'Batch Processing',
name: 'batching',
type: 'collection',
description: 'Batch processing options for rate limiting',
default: {},
options: [
{
displayName: 'Batch Size',
name: 'batchSize',
default: 100,
type: 'number',
description:
'How many items to process in parallel. This is useful for rate limiting.',
},
{
displayName: 'Delay Between Batches',
name: 'delayBetweenBatches',
default: 0,
type: 'number',
description:
'Delay in milliseconds between batches. This is useful for rate limiting.',
},
],
},
],
},
],
@@ -190,13 +165,6 @@ export class TextClassifier implements INodeType {
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const items = this.getInputData();
const { batchSize, delayBetweenBatches } = this.getNodeParameter('options.batching', 0, {
batchSize: 100,
delayBetweenBatches: 0,
}) as {
batchSize: number;
delayBetweenBatches: number;
};
const llm = (await this.getInputConnectionData(
NodeConnectionTypes.AiLanguageModel,
@@ -255,79 +223,68 @@ export class TextClassifier implements INodeType {
{ length: categories.length + (fallback === 'other' ? 1 : 0) },
(_) => [],
);
for (let itemIdx = 0; itemIdx < items.length; itemIdx++) {
const item = items[itemIdx];
item.pairedItem = { item: itemIdx };
const input = this.getNodeParameter('inputText', itemIdx) as string;
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
const batchPromises = batch.map(async (_item, batchItemIndex) => {
const itemIdx = i + batchItemIndex;
const item = items[itemIdx];
item.pairedItem = { item: itemIdx };
const input = this.getNodeParameter('inputText', itemIdx) as string;
if (input === undefined || input === null) {
if (input === undefined || input === null) {
if (this.continueOnFail()) {
returnData[0].push({
json: { error: 'Text to classify is not defined' },
pairedItem: { item: itemIdx },
});
continue;
} else {
throw new NodeOperationError(
this.getNode(),
`Text to classify for item ${itemIdx} is not defined`,
);
}
}
const inputPrompt = new HumanMessage(input);
const inputPrompt = new HumanMessage(input);
const systemPromptTemplateOpt = this.getNodeParameter(
'options.systemPromptTemplate',
itemIdx,
SYSTEM_PROMPT_TEMPLATE,
) as string;
const systemPromptTemplate = SystemMessagePromptTemplate.fromTemplate(
`${systemPromptTemplateOpt ?? SYSTEM_PROMPT_TEMPLATE}
{format_instructions}
${multiClassPrompt}
${fallbackPrompt}`,
);
const systemPromptTemplateOpt = this.getNodeParameter(
'options.systemPromptTemplate',
itemIdx,
SYSTEM_PROMPT_TEMPLATE,
) as string;
const systemPromptTemplate = SystemMessagePromptTemplate.fromTemplate(
`${systemPromptTemplateOpt ?? SYSTEM_PROMPT_TEMPLATE}
{format_instructions}
${multiClassPrompt}
${fallbackPrompt}`,
);
const messages = [
await systemPromptTemplate.format({
categories: categories.map((cat) => cat.category).join(', '),
format_instructions: parser.getFormatInstructions(),
}),
inputPrompt,
];
const prompt = ChatPromptTemplate.fromMessages(messages);
const chain = prompt.pipe(llm).pipe(parser).withConfig(getTracingConfig(this));
const messages = [
await systemPromptTemplate.format({
categories: categories.map((cat) => cat.category).join(', '),
format_instructions: parser.getFormatInstructions(),
}),
inputPrompt,
];
const prompt = ChatPromptTemplate.fromMessages(messages);
const chain = prompt.pipe(llm).pipe(parser).withConfig(getTracingConfig(this));
return await chain.invoke(messages);
});
try {
const output = await chain.invoke(messages);
const batchResults = await Promise.allSettled(batchPromises);
batchResults.forEach((response, batchItemIndex) => {
const index = i + batchItemIndex;
if (response.status === 'rejected') {
const error = response.reason as Error;
if (this.continueOnFail()) {
returnData[0].push({
json: { error: error.message },
pairedItem: { item: index },
});
return;
} else {
throw new NodeOperationError(this.getNode(), error.message);
}
} else {
const output = response.value;
const item = items[index];
categories.forEach((cat, idx) => {
if (output[cat.category]) returnData[idx].push(item);
categories.forEach((cat, idx) => {
if (output[cat.category]) returnData[idx].push(item);
});
if (fallback === 'other' && output.fallback) returnData[returnData.length - 1].push(item);
} catch (error) {
if (this.continueOnFail()) {
returnData[0].push({
json: { error: error.message },
pairedItem: { item: itemIdx },
});
if (fallback === 'other' && output.fallback) returnData[returnData.length - 1].push(item);
continue;
}
});
// Add delay between batches if not the last batch
if (i + batchSize < items.length && delayBetweenBatches > 0) {
await sleep(delayBetweenBatches);
throw error;
}
}