fix: Handle AI errors better in builder (no-changelog) (#18406)

This commit is contained in:
Mutasem Aldmour
2025-08-20 13:50:53 +02:00
committed by GitHub
parent 51c867fb66
commit afaa0bec71
7 changed files with 742 additions and 40 deletions

View File

@@ -88,7 +88,7 @@ export class AiWorkflowBuilderService {
},
});
} catch (error) {
const llmError = new LLMServiceError('Failed to setup LLM models', {
const llmError = new LLMServiceError('Failed to connect to LLM Provider', {
cause: error,
tags: {
hasClient: !!this.client,

View File

@@ -0,0 +1,362 @@
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import type { ToolMessage } from '@langchain/core/messages';
import { AIMessage, HumanMessage } from '@langchain/core/messages';
import type { MemorySaver } from '@langchain/langgraph';
import { GraphRecursionError } from '@langchain/langgraph';
import type { Logger } from '@n8n/backend-common';
import { mock } from 'jest-mock-extended';
import type { INodeTypeDescription } from 'n8n-workflow';
import { ApplicationError } from 'n8n-workflow';
import { MAX_AI_BUILDER_PROMPT_LENGTH } from '@/constants';
import { ValidationError } from '@/errors';
import type { StreamOutput } from '@/types/streaming';
import { createStreamProcessor, formatMessages } from '@/utils/stream-processor';
import {
WorkflowBuilderAgent,
type WorkflowBuilderAgentConfig,
type ChatPayload,
} from '@/workflow-builder-agent';
// Stub every tool factory the agent wires up, so tests exercise the agent's
// orchestration without pulling in real tool implementations. jest.mock calls
// are hoisted above the imports by babel-jest, so these take effect before the
// module under test loads.
jest.mock('@/tools/add-node.tool', () => ({
	createAddNodeTool: jest.fn().mockReturnValue({ name: 'add_node' }),
}));
jest.mock('@/tools/connect-nodes.tool', () => ({
	createConnectNodesTool: jest.fn().mockReturnValue({ name: 'connect_nodes' }),
}));
jest.mock('@/tools/node-details.tool', () => ({
	createNodeDetailsTool: jest.fn().mockReturnValue({ name: 'node_details' }),
}));
jest.mock('@/tools/node-search.tool', () => ({
	createNodeSearchTool: jest.fn().mockReturnValue({ name: 'node_search' }),
}));
jest.mock('@/tools/remove-node.tool', () => ({
	createRemoveNodeTool: jest.fn().mockReturnValue({ name: 'remove_node' }),
}));
jest.mock('@/tools/update-node-parameters.tool', () => ({
	createUpdateNodeParametersTool: jest.fn().mockReturnValue({ name: 'update_node_parameters' }),
}));

// The prompt template and the stream/operations helpers are mocked as a unit;
// individual tests override createStreamProcessor/formatMessages as needed.
jest.mock('@/tools/prompts/main-agent.prompt', () => ({
	mainAgentPrompt: {
		invoke: jest.fn().mockResolvedValue('mocked prompt'),
	},
}));
jest.mock('@/utils/operations-processor', () => ({
	processOperations: jest.fn(),
}));
jest.mock('@/utils/stream-processor', () => ({
	createStreamProcessor: jest.fn(),
	formatMessages: jest.fn(),
}));
jest.mock('@/utils/tool-executor', () => ({
	executeToolsInParallel: jest.fn(),
}));
jest.mock('@/chains/conversation-compact', () => ({
	conversationCompactChain: jest.fn(),
}));

// Replace the global Web Crypto object with a controllable randomUUID stub so
// thread-ID generation is deterministic. `writable: true` lets later suites
// redefine it if they need the real implementation.
const mockRandomUUID = jest.fn();
Object.defineProperty(global, 'crypto', {
	value: {
		randomUUID: mockRandomUUID,
	},
	writable: true,
});
/**
 * Unit tests for WorkflowBuilderAgent covering:
 *  - thread-ID generation (generateThreadId)
 *  - the chat() streaming entry point, including its error paths
 *    (message-length validation, GraphRecursionError, provider
 *    invalid_request errors, unknown errors)
 *  - getSessions() checkpoint retrieval and message formatting
 * All LLMs, tools, and stream helpers are jest mocks (see module mocks above).
 */
describe('WorkflowBuilderAgent', () => {
	let agent: WorkflowBuilderAgent;
	let mockLlmSimple: BaseChatModel;
	let mockLlmComplex: BaseChatModel;
	let mockLogger: Logger;
	let mockCheckpointer: MemorySaver;
	let parsedNodeTypes: INodeTypeDescription[];
	let config: WorkflowBuilderAgentConfig;

	// Typed handles onto the module-level jest.mock stubs so tests can set
	// per-case implementations without re-casting everywhere.
	const mockCreateStreamProcessor = createStreamProcessor as jest.MockedFunction<
		typeof createStreamProcessor
	>;
	const mockFormatMessages = formatMessages as jest.MockedFunction<typeof formatMessages>;

	beforeEach(() => {
		// Two model stubs mirror the agent's simple/complex task split.
		// bindTools returns the mock itself so chained calls keep working.
		mockLlmSimple = mock<BaseChatModel>({
			_llmType: jest.fn().mockReturnValue('test-llm'),
			bindTools: jest.fn().mockReturnThis(),
			invoke: jest.fn(),
		});
		mockLlmComplex = mock<BaseChatModel>({
			_llmType: jest.fn().mockReturnValue('test-llm-complex'),
			bindTools: jest.fn().mockReturnThis(),
			invoke: jest.fn(),
		});
		mockLogger = mock<Logger>({
			debug: jest.fn(),
			info: jest.fn(),
			warn: jest.fn(),
			error: jest.fn(),
		});
		// Checkpointer methods are reassigned explicitly so individual tests
		// can mockResolvedValue/mockRejectedValue on them.
		mockCheckpointer = mock<MemorySaver>();
		mockCheckpointer.getTuple = jest.fn();
		mockCheckpointer.put = jest.fn();
		mockCheckpointer.list = jest.fn();
		// Minimal node-type catalogue; one node is enough for agent construction.
		parsedNodeTypes = [
			{
				name: 'TestNode',
				displayName: 'Test Node',
				description: 'A test node',
				version: 1,
				defaults: {},
				inputs: [],
				outputs: [],
				properties: [],
				group: ['transform'],
			} as INodeTypeDescription,
		];
		config = {
			parsedNodeTypes,
			llmSimpleTask: mockLlmSimple,
			llmComplexTask: mockLlmComplex,
			logger: mockLogger,
			checkpointer: mockCheckpointer,
		};
		agent = new WorkflowBuilderAgent(config);
	});

	describe('generateThreadId', () => {
		beforeEach(() => {
			// Reset the global crypto.randomUUID stub between cases.
			mockRandomUUID.mockReset();
		});

		// With both IDs the thread ID is a deterministic composite.
		it('should generate thread ID with workflowId and userId', () => {
			const workflowId = 'workflow-123';
			const userId = 'user-456';
			const threadId = WorkflowBuilderAgent.generateThreadId(workflowId, userId);
			expect(threadId).toBe('workflow-workflow-123-user-user-456');
		});

		// Without a userId the user portion is numeric (regex match, not exact value).
		it('should generate thread ID with workflowId but without userId', () => {
			const workflowId = 'workflow-123';
			const threadId = WorkflowBuilderAgent.generateThreadId(workflowId);
			expect(threadId).toMatch(/^workflow-workflow-123-user-\d+$/);
		});

		// With no workflowId at all, a random UUID is used verbatim.
		it('should generate random UUID when no workflowId provided', () => {
			const mockUuid = 'test-uuid-1234-5678-9012';
			mockRandomUUID.mockReturnValue(mockUuid);
			const threadId = WorkflowBuilderAgent.generateThreadId();
			expect(mockRandomUUID).toHaveBeenCalled();
			expect(threadId).toBe(mockUuid);
		});
	});

	describe('chat method', () => {
		let mockPayload: ChatPayload;

		beforeEach(() => {
			mockPayload = {
				message: 'Create a workflow',
				workflowContext: {
					currentWorkflow: { id: 'workflow-123' },
				},
			};
		});

		// Over-long prompts must be rejected before any LLM work starts,
		// and the rejection must be logged with length context.
		it('should throw ValidationError when message exceeds maximum length', async () => {
			const longMessage = 'x'.repeat(MAX_AI_BUILDER_PROMPT_LENGTH + 1);
			const payload: ChatPayload = {
				message: longMessage,
			};
			await expect(async () => {
				const generator = agent.chat(payload);
				await generator.next();
			}).rejects.toThrow(ValidationError);
			expect(mockLogger.warn).toHaveBeenCalledWith('Message exceeds maximum length', {
				messageLength: longMessage.length,
				maxLength: MAX_AI_BUILDER_PROMPT_LENGTH,
			});
		});

		// Happy path: chat() yields whatever the stream processor produces.
		it('should handle valid message length', async () => {
			const validMessage = 'Create a simple workflow';
			const payload: ChatPayload = {
				message: validMessage,
			};
			// Mock the stream processing to return a proper StreamOutput
			const mockStreamOutput: StreamOutput = {
				messages: [
					{
						role: 'assistant',
						type: 'message',
						text: 'Processing...',
					},
				],
			};
			const mockAsyncGenerator = (async function* () {
				yield mockStreamOutput;
			})();
			mockCreateStreamProcessor.mockReturnValue(mockAsyncGenerator);
			// Mock the LLM to return a simple response
			(mockLlmSimple.invoke as jest.Mock).mockResolvedValue({
				content: 'Mocked response',
				tool_calls: [],
			});
			const generator = agent.chat(payload);
			const result = await generator.next();
			expect(result.value).toEqual(mockStreamOutput);
		});

		// LangGraph recursion-limit errors must surface as an ApplicationError
		// (with a user-facing explanation), not as the raw GraphRecursionError.
		it('should handle GraphRecursionError', async () => {
			mockCreateStreamProcessor.mockImplementation(() => {
				// eslint-disable-next-line require-yield
				return (async function* () {
					throw new GraphRecursionError('Recursion limit exceeded');
				})();
			});
			await expect(async () => {
				const generator = agent.chat(mockPayload);
				await generator.next();
			}).rejects.toThrow(ApplicationError);
		});

		// Provider errors shaped like { error: { error: { type: 'invalid_request_error' } } }
		// (OpenAI-style nesting) should be translated into an ApplicationError.
		it('should handle invalid request errors', async () => {
			const invalidRequestError = Object.assign(new Error('Request failed'), {
				error: {
					error: {
						type: 'invalid_request_error',
						message: 'Invalid API request',
					},
				},
			});
			(mockLlmSimple.invoke as jest.Mock).mockRejectedValue(invalidRequestError);
			await expect(async () => {
				const generator = agent.chat(mockPayload);
				await generator.next();
			}).rejects.toThrow(ApplicationError);
		});

		// Anything unrecognized must propagate unchanged so callers can handle it.
		it('should rethrow unknown errors', async () => {
			const unknownError = new Error('Unknown error');
			// Mock createStreamProcessor to throw an unknown error (not GraphRecursionError or abort)
			mockCreateStreamProcessor.mockImplementation(() => {
				// eslint-disable-next-line require-yield
				return (async function* () {
					throw unknownError;
				})();
			});
			await expect(async () => {
				const generator = agent.chat(mockPayload);
				await generator.next();
			}).rejects.toThrow(unknownError);
		});
	});

	describe('getSessions', () => {
		beforeEach(() => {
			// formatMessages stub: lowercased constructor name stands in for the
			// real role mapping (humanmessage/aimessage/toolmessage).
			mockFormatMessages.mockImplementation(
				(messages: Array<AIMessage | HumanMessage | ToolMessage>) =>
					messages.map((m) => ({ type: m.constructor.name.toLowerCase(), content: m.content })),
			);
		});

		// A stored checkpoint yields one session keyed by the composite thread ID.
		it('should return session for existing workflowId', async () => {
			const workflowId = 'workflow-123';
			const userId = 'user-456';
			const mockCheckpoint = {
				checkpoint: {
					channel_values: {
						messages: [new HumanMessage('Hello'), new AIMessage('Hi there!')],
					},
					ts: '2023-12-01T12:00:00Z',
				},
			};
			(mockCheckpointer.getTuple as jest.Mock).mockResolvedValue(mockCheckpoint);
			const result = await agent.getSessions(workflowId, userId);
			expect(result.sessions).toHaveLength(1);
			expect(result.sessions[0]).toMatchObject({
				sessionId: 'workflow-workflow-123-user-user-456',
				lastUpdated: '2023-12-01T12:00:00Z',
			});
			expect(result.sessions[0].messages).toHaveLength(2);
		});

		// No workflowId → no lookup at all (checkpointer untouched).
		it('should return empty sessions when workflowId is undefined', async () => {
			const result = await agent.getSessions(undefined);
			expect(result.sessions).toHaveLength(0);
			expect(mockCheckpointer.getTuple).not.toHaveBeenCalled();
		});

		// A failing checkpoint lookup is treated as "no session" and only
		// debug-logged, never rethrown.
		it('should return empty sessions when no checkpoint exists', async () => {
			const workflowId = 'workflow-123';
			(mockCheckpointer.getTuple as jest.Mock).mockRejectedValue(new Error('Thread not found'));
			const result = await agent.getSessions(workflowId);
			expect(result.sessions).toHaveLength(0);
			expect(mockLogger.debug).toHaveBeenCalledWith('No session found for workflow:', {
				workflowId,
				// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
				error: expect.any(Error),
			});
		});

		// Checkpoint with no messages channel → session exists, message list empty.
		it('should handle checkpoint without messages', async () => {
			const workflowId = 'workflow-123';
			const mockCheckpoint = {
				checkpoint: {
					channel_values: {},
					ts: '2023-12-01T12:00:00Z',
				},
			};
			(mockCheckpointer.getTuple as jest.Mock).mockResolvedValue(mockCheckpoint);
			const result = await agent.getSessions(workflowId);
			expect(result.sessions).toHaveLength(1);
			expect(result.sessions[0].messages).toHaveLength(0);
		});

		// Explicit null messages channel is handled the same as a missing one.
		it('should handle checkpoint with null messages', async () => {
			const workflowId = 'workflow-123';
			const mockCheckpoint = {
				checkpoint: {
					channel_values: {
						messages: null,
					},
					ts: '2023-12-01T12:00:00Z',
				},
			};
			(mockCheckpointer.getTuple as jest.Mock).mockResolvedValue(mockCheckpoint);
			const result = await agent.getSessions(workflowId);
			expect(result.sessions).toHaveLength(1);
			expect(result.sessions[0].messages).toHaveLength(0);
		});
	});
});

View File

@@ -3,13 +3,14 @@ import type { ToolMessage } from '@langchain/core/messages';
import { AIMessage, HumanMessage, RemoveMessage } from '@langchain/core/messages';
import type { RunnableConfig } from '@langchain/core/runnables';
import type { LangChainTracer } from '@langchain/core/tracers/tracer_langchain';
import { StateGraph, MemorySaver, END } from '@langchain/langgraph';
import { StateGraph, MemorySaver, END, GraphRecursionError } from '@langchain/langgraph';
import type { Logger } from '@n8n/backend-common';
import type {
INodeTypeDescription,
IRunExecutionData,
IWorkflowBase,
NodeExecutionSchema,
import {
ApplicationError,
type INodeTypeDescription,
type IRunExecutionData,
type IWorkflowBase,
type NodeExecutionSchema,
} from 'n8n-workflow';
import { workflowNameChain } from '@/chains/workflow-name';
@@ -148,7 +149,7 @@ export class WorkflowBuilderAgent {
};
const shouldContinue = ({ messages }: typeof WorkflowState.State) => {
const lastMessage = messages[messages.length - 1] as AIMessage;
const lastMessage: AIMessage = messages[messages.length - 1];
if (lastMessage.tool_calls?.length) {
return 'tools';
@@ -295,10 +296,26 @@ export class WorkflowBuilderAgent {
}
async *chat(payload: ChatPayload, userId?: string, abortSignal?: AbortSignal) {
// Check for the message maximum length
if (payload.message.length > MAX_AI_BUILDER_PROMPT_LENGTH) {
this.validateMessageLength(payload.message);
const { agent, threadConfig, streamConfig } = this.setupAgentAndConfigs(
payload,
userId,
abortSignal,
);
try {
const stream = await this.createAgentStream(payload, streamConfig, agent);
yield* this.processAgentStream(stream, agent, threadConfig);
} catch (error: unknown) {
this.handleStreamError(error);
}
}
private validateMessageLength(message: string): void {
if (message.length > MAX_AI_BUILDER_PROMPT_LENGTH) {
this.logger?.warn('Message exceeds maximum length', {
messageLength: payload.message.length,
messageLength: message.length,
maxLength: MAX_AI_BUILDER_PROMPT_LENGTH,
});
@@ -306,7 +323,9 @@ export class WorkflowBuilderAgent {
`Message exceeds maximum length of ${MAX_AI_BUILDER_PROMPT_LENGTH} characters`,
);
}
}
private setupAgentAndConfigs(payload: ChatPayload, userId?: string, abortSignal?: AbortSignal) {
const agent = this.createWorkflow().compile({ checkpointer: this.checkpointer });
const workflowId = payload.workflowContext?.currentWorkflow?.id;
// Generate thread ID from workflowId and userId
@@ -320,12 +339,20 @@ export class WorkflowBuilderAgent {
const streamConfig = {
...threadConfig,
streamMode: ['updates', 'custom'],
recursionLimit: 30,
recursionLimit: 50,
signal: abortSignal,
callbacks: this.tracer ? [this.tracer] : undefined,
} as RunnableConfig;
};
const stream = await agent.stream(
return { agent, threadConfig, streamConfig };
}
private async createAgentStream(
payload: ChatPayload,
streamConfig: RunnableConfig,
agent: ReturnType<ReturnType<typeof this.createWorkflow>['compile']>,
) {
return await agent.stream(
{
messages: [new HumanMessage({ content: payload.message })],
workflowJSON: this.getDefaultWorkflowJSON(payload),
@@ -334,39 +361,95 @@ export class WorkflowBuilderAgent {
},
streamConfig,
);
}
private handleStreamError(error: unknown): never {
const invalidRequestErrorMessage = this.getInvalidRequestError(error);
if (invalidRequestErrorMessage) {
throw new ValidationError(invalidRequestErrorMessage);
}
throw error;
}
private async *processAgentStream(
stream: AsyncGenerator<[string, unknown], void, unknown>,
agent: ReturnType<ReturnType<typeof this.createWorkflow>['compile']>,
threadConfig: RunnableConfig,
) {
try {
const streamProcessor = createStreamProcessor(stream);
for await (const output of streamProcessor) {
yield output;
}
} catch (error) {
if (
error &&
typeof error === 'object' &&
'message' in error &&
typeof error.message === 'string' &&
// This is naive, but it's all we get from LangGraph AbortError
['Abort', 'Aborted'].includes(error.message)
) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
const messages = (await agent.getState(threadConfig)).values.messages as Array<
AIMessage | HumanMessage | ToolMessage
>;
// Handle abort errors gracefully
const abortedAiMessage = new AIMessage({
content: '[Task aborted]',
id: crypto.randomUUID(),
});
// TODO: Should we clear tool calls that are in progress?
await agent.updateState(threadConfig, { messages: [...messages, abortedAiMessage] });
return;
}
throw error;
await this.handleAgentStreamError(error, agent, threadConfig);
}
}
private async handleAgentStreamError(
error: unknown,
agent: ReturnType<ReturnType<typeof this.createWorkflow>['compile']>,
threadConfig: RunnableConfig,
): Promise<void> {
if (
error &&
typeof error === 'object' &&
'message' in error &&
typeof error.message === 'string' &&
// This is naive, but it's all we get from LangGraph AbortError
['Abort', 'Aborted'].includes(error.message)
) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
const messages = (await agent.getState(threadConfig)).values.messages as Array<
AIMessage | HumanMessage | ToolMessage
>;
// Handle abort errors gracefully
const abortedAiMessage = new AIMessage({
content: '[Task aborted]',
id: crypto.randomUUID(),
});
// TODO: Should we clear tool calls that are in progress?
await agent.updateState(threadConfig, { messages: [...messages, abortedAiMessage] });
return;
}
// If it's not an abort error, check for GraphRecursionError
if (error instanceof GraphRecursionError) {
throw new ApplicationError(
'Workflow generation stopped: The AI reached the maximum number of steps while building your workflow. This usually means the workflow design became too complex or got stuck in a loop while trying to create the nodes and connections.',
);
}
// Re-throw any other errors
throw error;
}
private getInvalidRequestError(error: unknown): string | undefined {
if (
error instanceof Error &&
'error' in error &&
typeof error.error === 'object' &&
error.error
) {
const innerError = error.error;
if ('error' in innerError && typeof innerError.error === 'object' && innerError.error) {
const errorDetails = innerError.error;
if (
'type' in errorDetails &&
errorDetails.type === 'invalid_request_error' &&
'message' in errorDetails &&
typeof errorDetails.message === 'string'
) {
return errorDetails.message;
}
}
}
return undefined;
}
async getSessions(workflowId: string | undefined, userId?: string) {
// For now, we'll return the current session if we have a workflowId
// MemorySaver doesn't expose a way to list all threads, so we'll need to