mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-16 09:36:44 +00:00
chore: Remove AI WF Builder planner step (no-changelog) (#19343)
This commit is contained in:
@@ -1,6 +1,5 @@
|
||||
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
|
||||
|
||||
import { PLAN_APPROVAL_MESSAGE } from '../../src/constants';
|
||||
import type { SimpleWorkflow } from '../../src/types/workflow';
|
||||
import type { WorkflowBuilderAgent } from '../../src/workflow-builder-agent';
|
||||
import { evaluateWorkflow } from '../chains/workflow-evaluator';
|
||||
@@ -52,10 +51,7 @@ export async function runSingleTest(
|
||||
try {
|
||||
// Generate workflow
|
||||
const startTime = Date.now();
|
||||
// First generate plan
|
||||
await consumeGenerator(agent.chat(getChatPayload(testCase.prompt, testCase.id), userId));
|
||||
// Confirm plan
|
||||
await consumeGenerator(agent.chat(getChatPayload(PLAN_APPROVAL_MESSAGE, testCase.id), userId));
|
||||
const generationTime = Date.now() - startTime;
|
||||
|
||||
// Get generated workflow with validation
|
||||
|
||||
@@ -5,7 +5,6 @@ import type { INodeTypeDescription } from 'n8n-workflow';
|
||||
import pc from 'picocolors';
|
||||
|
||||
import { createLangsmithEvaluator } from './evaluator';
|
||||
import { PLAN_APPROVAL_MESSAGE } from '../../src/constants';
|
||||
import type { WorkflowState } from '../../src/workflow-state';
|
||||
import { setupTestEnvironment, createAgent } from '../core/environment';
|
||||
import {
|
||||
@@ -43,15 +42,9 @@ function createWorkflowGenerator(
|
||||
|
||||
// Create agent for this run
|
||||
const agent = createAgent(parsedNodeTypes, llm, tracer);
|
||||
|
||||
// First generate plan
|
||||
await consumeGenerator(
|
||||
agent.chat(getChatPayload(messageContent, runId), 'langsmith-eval-user'),
|
||||
);
|
||||
// Confirm plan
|
||||
await consumeGenerator(
|
||||
agent.chat(getChatPayload(PLAN_APPROVAL_MESSAGE, runId), 'langsmith-eval-user'),
|
||||
);
|
||||
|
||||
// Get generated workflow with validation
|
||||
const state = await agent.getState(runId, 'langsmith-eval-user');
|
||||
|
||||
@@ -1,333 +0,0 @@
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
|
||||
|
||||
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
|
||||
import type { BaseMessage, ToolMessage } from '@langchain/core/messages';
|
||||
import { HumanMessage, SystemMessage } from '@langchain/core/messages';
|
||||
import { DynamicStructuredTool } from '@langchain/core/tools';
|
||||
import { StateGraph, MessagesAnnotation, END, START } from '@langchain/langgraph';
|
||||
import { ToolNode } from '@langchain/langgraph/prebuilt';
|
||||
import { jsonParse, type INodeTypeDescription } from 'n8n-workflow';
|
||||
import { z } from 'zod';
|
||||
|
||||
import { isAIMessage } from '@/types/langchain';
|
||||
|
||||
import { LLMServiceError, ToolExecutionError } from '../errors';
|
||||
import { createNodeDetailsTool } from '../tools/node-details.tool';
|
||||
import { createNodeSearchTool } from '../tools/node-search.tool';
|
||||
|
||||
const planNodeSchema = z.object({
|
||||
nodeType: z
|
||||
.string()
|
||||
.describe('The exact n8n node type identifier (e.g., "n8n-nodes-base.httpRequest")'),
|
||||
nodeName: z
|
||||
.string()
|
||||
.describe('A descriptive name for this node instance (e.g., "Get Weather Data")'),
|
||||
reasoning: z
|
||||
.string()
|
||||
.describe('Brief explanation of why this node is needed and what it will do'),
|
||||
});
|
||||
|
||||
const workflowPlanSchema = z.object({
|
||||
intro: z.string().describe('A concise summary of the workflow plan'),
|
||||
plan: z
|
||||
.array(planNodeSchema)
|
||||
.min(1)
|
||||
.describe('Ordered list of nodes that will be used to build the workflow'),
|
||||
});
|
||||
|
||||
const generateWorkflowPlanTool = new DynamicStructuredTool({
|
||||
name: 'generate_workflow_plan',
|
||||
description:
|
||||
'Create a structured plan of n8n nodes based on user requirements and available node information',
|
||||
schema: workflowPlanSchema,
|
||||
func: async (input) => {
|
||||
return input;
|
||||
},
|
||||
});
|
||||
|
||||
export type WorkflowPlanNode = z.infer<typeof planNodeSchema>;
|
||||
export type WorkflowPlan = z.infer<typeof workflowPlanSchema>;
|
||||
|
||||
const SYSTEM_PROMPT = `You are a Workflow Planning Assistant for n8n. Your task is to analyze user requests and create a detailed plan of n8n nodes that will be used to build the workflow.
|
||||
|
||||
## Your Process
|
||||
1. Analyze the user's request to understand what they want to automate
|
||||
2. Use the search_nodes tool to find relevant n8n nodes for each part of the workflow
|
||||
3. Use the get_node_details tool to understand specific nodes' capabilities
|
||||
4. Use the generate_workflow_plan tool to create the final structured plan
|
||||
|
||||
## Guidelines
|
||||
- Be thorough in your search - search for different keywords and concepts
|
||||
- Use exact node type identifiers (e.g., "n8n-nodes-base.httpRequest")
|
||||
- Order nodes logically from trigger/start to final output
|
||||
- Place sub-nodes (e.g., AI tools) immediately after their root node (e.g. AI Agent)
|
||||
- Only include nodes that directly fulfill the user's requirements
|
||||
- Consider data transformation needs between nodes
|
||||
- For AI workflows, search for AI-related nodes and sub-nodes
|
||||
- ALWAYS start with a trigger node and workflow configuration node (see Workflow Structure Requirements)
|
||||
|
||||
## Workflow Structure Requirements
|
||||
CRITICAL: Every workflow MUST follow this structure:
|
||||
|
||||
1. **First Node - Trigger (MANDATORY)**
|
||||
- Every workflow MUST start with a trigger node
|
||||
- Choose the appropriate trigger based on user intent (see Trigger Selection Logic)
|
||||
- If no trigger is specified, intelligently select the most appropriate one
|
||||
|
||||
2. **Second Node - Workflow Configuration (MANDATORY)**
|
||||
- ALWAYS add an Edit Fields (Set) node immediately after the trigger
|
||||
- Name it "Workflow Configuration"
|
||||
- This node serves as the main configuration point for the workflow
|
||||
- Downstream nodes will reference values from this node via expressions
|
||||
- This centralizes key workflow parameters and makes configuration easier
|
||||
|
||||
## Trigger Selection Logic
|
||||
Choose the trigger based on the workflow's purpose:
|
||||
|
||||
### Manual Trigger (n8n-nodes-base.manualTrigger)
|
||||
Use when:
|
||||
- User wants to test or debug the workflow
|
||||
- It's a one-time or ad-hoc process
|
||||
- User hasn't specified any trigger requirements
|
||||
- The workflow is for data processing or transformation tasks
|
||||
|
||||
### Chat Trigger (n8n-nodes-langchain.chatTrigger)
|
||||
Use when:
|
||||
- Building conversational AI or chatbot workflows
|
||||
- User mentions chat, conversation, or interactive communication
|
||||
- The workflow needs to respond to user messages
|
||||
- Building AI agents that interact with users
|
||||
|
||||
### Webhook Trigger (n8n-nodes-base.webhook)
|
||||
Use when:
|
||||
- Integrating with external systems or APIs
|
||||
- User mentions webhooks, API calls, or external events
|
||||
- The workflow needs to be triggered by external applications
|
||||
- Building automated responses to system events
|
||||
|
||||
## Search Strategy
|
||||
- Search for nodes by functionality (e.g., "email", "database", "api")
|
||||
- Search for specific service names mentioned by the user
|
||||
- For AI workflows, search for sub-nodes using connection types:
|
||||
- NodeConnectionTypes.AiLanguageModel for LLM providers
|
||||
- NodeConnectionTypes.AiTool for AI tools
|
||||
- NodeConnectionTypes.AiMemory for memory nodes
|
||||
- NodeConnectionTypes.AiEmbedding for embeddings
|
||||
- NodeConnectionTypes.AiVectorStore for vector stores
|
||||
|
||||
## Connection Parameter Rules
|
||||
When planning nodes, consider their connection requirements:
|
||||
|
||||
### Static vs Dynamic Nodes
|
||||
- **Static nodes** (standard inputs/outputs): HTTP Request, Set, Code
|
||||
- **Dynamic nodes** (parameter-dependent connections): AI nodes, Vector Stores, Document Loaders
|
||||
|
||||
### Dynamic Node Parameters That Affect Connections
|
||||
- AI Agent: hasOutputParser creates additional input for schema
|
||||
- Vector Store: mode parameter affects available connections (insert vs retrieve-as-tool)
|
||||
- Document Loader: textSplittingMode and dataType affect input structure
|
||||
|
||||
## AI Node Connection Patterns
|
||||
CRITICAL: AI sub-nodes PROVIDE capabilities, making them the SOURCE in connections:
|
||||
|
||||
### Main AI Connections
|
||||
- OpenAI Chat Model → AI Agent [ai_languageModel]
|
||||
- Calculator Tool → AI Agent [ai_tool]
|
||||
- Window Buffer Memory → AI Agent [ai_memory]
|
||||
- Token Splitter → Default Data Loader [ai_textSplitter]
|
||||
- Default Data Loader → Vector Store [ai_document]
|
||||
- Embeddings OpenAI → Vector Store [ai_embedding]
|
||||
|
||||
Why: Sub-nodes enhance main nodes with their capabilities
|
||||
|
||||
## RAG Workflow Pattern
|
||||
CRITICAL: For RAG (Retrieval-Augmented Generation) workflows, follow this specific pattern:
|
||||
|
||||
Main data flow:
|
||||
- Data source (e.g., HTTP Request) → Vector Store [main connection]
|
||||
- The Vector Store receives the actual data through its main input
|
||||
|
||||
AI capability connections:
|
||||
- Document Loader → Vector Store [ai_document] - provides document processing
|
||||
- Embeddings → Vector Store [ai_embedding] - provides embedding generation
|
||||
- Text Splitter → Document Loader [ai_textSplitter] - provides text chunking
|
||||
|
||||
Common mistake to avoid:
|
||||
- NEVER connect Document Loader to main data outputs
|
||||
- Document Loader is NOT a data processor in the main flow
|
||||
- Document Loader is an AI sub-node that gives Vector Store the ability to process documents
|
||||
|
||||
## Agent Node Distinction
|
||||
CRITICAL: Distinguish between two different agent node types:
|
||||
|
||||
1. **AI Agent** (n8n-nodes-langchain.agent)
|
||||
- Main workflow node that orchestrates AI tasks
|
||||
- Accepts inputs: trigger data, memory, tools, language models
|
||||
- Use for: Primary AI logic, chatbots, autonomous workflows
|
||||
|
||||
2. **AI Agent Tool** (n8n-nodes-langchain.agentTool)
|
||||
- Sub-node that acts as a tool for another AI Agent
|
||||
- Provides agent-as-a-tool capability to parent agents
|
||||
- Use for: Multi-agent systems where one agent calls another
|
||||
|
||||
Default assumption: When users ask for "an agent" or "AI agent", they mean the main AI Agent node unless they explicitly mention "tool", "sub-agent", or "agent for another agent".
|
||||
|
||||
## Output Format
|
||||
After searching and analyzing available nodes, use the generate_workflow_plan tool to create a structured plan with:
|
||||
- The exact node type to use
|
||||
- A descriptive name for the node instance
|
||||
- Clear reasoning for why this node is needed AND how it connects to other nodes
|
||||
- Consider connection requirements in your reasoning
|
||||
|
||||
Your plan MUST always include:
|
||||
1. An appropriate trigger node as the first node
|
||||
2. An Edit Fields (Set) node named "Workflow Configuration" as the second node
|
||||
3. All other nodes needed to fulfill the user's requirements
|
||||
|
||||
After using the generate_workflow_plan tool, only respond with a single word "DONE" to indicate the plan is complete.
|
||||
Remember: Be precise about node types, understand connection patterns, and always include trigger and configuration nodes.`;
|
||||
|
||||
function formatPlanFeedback(previousPlan: WorkflowPlan, feedback: string) {
|
||||
return `Previous plan: ${JSON.stringify(previousPlan, null, 2)}\n\nUser feedback: ${feedback}\n\nPlease adjust the plan based on the feedback.`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a workflow planner agent that can search for and analyze nodes
|
||||
*/
|
||||
export function createWorkflowPlannerAgent(llm: BaseChatModel, nodeTypes: INodeTypeDescription[]) {
|
||||
if (!llm.bindTools) {
|
||||
throw new LLMServiceError("LLM doesn't support binding tools", { llmModel: llm._llmType() });
|
||||
}
|
||||
|
||||
// Create the tools for the planner
|
||||
const tools = [
|
||||
createNodeSearchTool(nodeTypes).tool,
|
||||
createNodeDetailsTool(nodeTypes).tool,
|
||||
generateWorkflowPlanTool,
|
||||
];
|
||||
|
||||
// Create a ToolNode with our tools
|
||||
const toolNode = new ToolNode(tools);
|
||||
|
||||
// Bind tools to the LLM
|
||||
const modelWithTools = llm.bindTools(tools);
|
||||
|
||||
// Define the function that determines whether to continue
|
||||
const shouldContinue = (state: typeof MessagesAnnotation.State) => {
|
||||
const { messages } = state;
|
||||
const lastMessage = messages[messages.length - 1];
|
||||
|
||||
// Check if the last message has tool calls
|
||||
if (
|
||||
'tool_calls' in lastMessage &&
|
||||
Array.isArray(lastMessage.tool_calls) &&
|
||||
lastMessage.tool_calls?.length
|
||||
) {
|
||||
// Check if one of the tool calls is the final plan generation
|
||||
const hasPlanTool = lastMessage.tool_calls.some((tc) => tc.name === 'generate_workflow_plan');
|
||||
|
||||
if (hasPlanTool) {
|
||||
// If we're generating the plan, still need to execute the tool
|
||||
return 'tools';
|
||||
}
|
||||
|
||||
return 'tools';
|
||||
}
|
||||
return END;
|
||||
};
|
||||
|
||||
// Define the function that calls the model
|
||||
const callModel = async (state: typeof MessagesAnnotation.State) => {
|
||||
const { messages } = state;
|
||||
const response = await modelWithTools.invoke(messages);
|
||||
return { messages: [response] };
|
||||
};
|
||||
|
||||
// Build the graph
|
||||
const workflow = new StateGraph(MessagesAnnotation)
|
||||
.addNode('agent', callModel)
|
||||
.addNode('tools', toolNode)
|
||||
.addEdge(START, 'agent')
|
||||
.addConditionalEdges('agent', shouldContinue, ['tools', END])
|
||||
.addEdge('tools', 'agent');
|
||||
|
||||
const app = workflow.compile();
|
||||
|
||||
return {
|
||||
async plan(
|
||||
userRequest: string,
|
||||
previousPlan?: WorkflowPlan,
|
||||
feedback?: string,
|
||||
): Promise<
|
||||
| {
|
||||
plan: WorkflowPlan;
|
||||
toolMessages: BaseMessage[];
|
||||
}
|
||||
| { text: string }
|
||||
> {
|
||||
// Prepare the initial messages
|
||||
const systemMessage = new SystemMessage(SYSTEM_PROMPT);
|
||||
|
||||
let userMessage = userRequest;
|
||||
if (previousPlan && feedback) {
|
||||
userMessage += '\n\n';
|
||||
userMessage += formatPlanFeedback(previousPlan, feedback);
|
||||
}
|
||||
|
||||
const humanMessage = new HumanMessage(userMessage);
|
||||
|
||||
// Invoke the graph
|
||||
const result = await app.invoke({
|
||||
messages: [systemMessage, humanMessage],
|
||||
});
|
||||
|
||||
// Extract tools messages
|
||||
const toolMessages = result.messages.filter((msg) => {
|
||||
if (['system', 'human'].includes(msg.getType())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Do not include final AI message
|
||||
if (isAIMessage(msg) && (msg.tool_calls ?? []).length === 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
});
|
||||
|
||||
const workflowPlanToolCall = result.messages.findLast((msg): msg is ToolMessage => {
|
||||
return msg.name === 'generate_workflow_plan';
|
||||
});
|
||||
|
||||
if (!workflowPlanToolCall) {
|
||||
const lastAiMessage = result.messages.findLast((msg) => {
|
||||
return isAIMessage(msg) && (msg.tool_calls ?? []).length === 0;
|
||||
});
|
||||
|
||||
if (lastAiMessage) {
|
||||
return {
|
||||
text: lastAiMessage.text,
|
||||
};
|
||||
}
|
||||
|
||||
throw new ToolExecutionError('Invalid response from agent - no plan generated');
|
||||
}
|
||||
|
||||
try {
|
||||
if (typeof workflowPlanToolCall.content !== 'string') {
|
||||
throw new ToolExecutionError('Workflow plan tool call content is not a string');
|
||||
}
|
||||
|
||||
const workflowPlan = jsonParse<WorkflowPlan>(workflowPlanToolCall.content);
|
||||
return {
|
||||
plan: workflowPlan,
|
||||
toolMessages,
|
||||
};
|
||||
} catch (error) {
|
||||
throw new ToolExecutionError(
|
||||
`Failed to parse workflow plan: ${error instanceof Error ? error.message : 'Unknown error'}`,
|
||||
);
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -34,5 +34,3 @@ export const MAX_WORKFLOW_LENGTH_TOKENS = 30_000; // Tokens
|
||||
* Used for rough token count estimation from character counts.
|
||||
*/
|
||||
export const AVG_CHARS_PER_TOKEN_ANTHROPIC = 2.5;
|
||||
|
||||
export const PLAN_APPROVAL_MESSAGE = 'Proceed with the plan';
|
||||
|
||||
@@ -3,15 +3,13 @@ import type { BaseChatModel } from '@langchain/core/language_models/chat_models'
|
||||
import type { ToolMessage } from '@langchain/core/messages';
|
||||
import { AIMessage, HumanMessage } from '@langchain/core/messages';
|
||||
import type { MemorySaver } from '@langchain/langgraph';
|
||||
import { GraphRecursionError, Command } from '@langchain/langgraph';
|
||||
import { GraphRecursionError } from '@langchain/langgraph';
|
||||
import type { Logger } from '@n8n/backend-common';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import type { INodeTypeDescription } from 'n8n-workflow';
|
||||
import { ApplicationError } from 'n8n-workflow';
|
||||
|
||||
import type { WorkflowPlanNode } from '@/agents/workflow-planner-agent';
|
||||
import { createWorkflowPlannerAgent } from '@/agents/workflow-planner-agent';
|
||||
import { MAX_AI_BUILDER_PROMPT_LENGTH, PLAN_APPROVAL_MESSAGE } from '@/constants';
|
||||
import { MAX_AI_BUILDER_PROMPT_LENGTH } from '@/constants';
|
||||
import { ValidationError } from '@/errors';
|
||||
import type { StreamOutput } from '@/types/streaming';
|
||||
import { createStreamProcessor, formatMessages } from '@/utils/stream-processor';
|
||||
@@ -63,9 +61,6 @@ jest.mock('@/chains/conversation-compact', () => ({
|
||||
jest.mock('@/chains/workflow-name', () => ({
|
||||
workflowNameChain: jest.fn(),
|
||||
}));
|
||||
jest.mock('@/agents/workflow-planner-agent', () => ({
|
||||
createWorkflowPlannerAgent: jest.fn(),
|
||||
}));
|
||||
|
||||
const mockRandomUUID = jest.fn();
|
||||
Object.defineProperty(global, 'crypto', {
|
||||
@@ -370,420 +365,4 @@ describe('WorkflowBuilderAgent', () => {
|
||||
expect(result.sessions[0].messages).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('Workflow Planning', () => {
|
||||
let mockPlannerAgent: ReturnType<typeof createWorkflowPlannerAgent>;
|
||||
const mockCreateWorkflowPlannerAgent = createWorkflowPlannerAgent as jest.MockedFunction<
|
||||
typeof createWorkflowPlannerAgent
|
||||
>;
|
||||
|
||||
// Helper function to mock stream processor with custom output
|
||||
const mockStreamProcessor = (output: StreamOutput | Error) => {
|
||||
if (output instanceof Error) {
|
||||
mockCreateStreamProcessor.mockImplementation(() => {
|
||||
// eslint-disable-next-line require-yield
|
||||
return (async function* () {
|
||||
throw output;
|
||||
})();
|
||||
});
|
||||
} else {
|
||||
mockCreateStreamProcessor.mockImplementation(() => {
|
||||
return (async function* () {
|
||||
yield output;
|
||||
})();
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
// Helper function to run chat and collect results
|
||||
const runChatAndCollectResults = async (payload: ChatPayload) => {
|
||||
const generator = agent.chat(payload);
|
||||
const results = [];
|
||||
for await (const result of generator) {
|
||||
results.push(result);
|
||||
}
|
||||
return results;
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
// Reset the mock stream processor for planning tests
|
||||
mockCreateStreamProcessor.mockReset();
|
||||
|
||||
mockPlannerAgent = {
|
||||
plan: jest.fn(),
|
||||
};
|
||||
mockCreateWorkflowPlannerAgent.mockReturnValue(mockPlannerAgent);
|
||||
});
|
||||
|
||||
describe('create_plan', () => {
|
||||
it('should create a workflow plan from user message', async () => {
|
||||
const payload: ChatPayload = {
|
||||
message: 'Create a workflow to process data',
|
||||
workflowContext: {
|
||||
currentWorkflow: { nodes: [] },
|
||||
},
|
||||
};
|
||||
|
||||
mockStreamProcessor({
|
||||
messages: [
|
||||
{
|
||||
role: 'assistant',
|
||||
type: 'message',
|
||||
text: 'Creating workflow plan...',
|
||||
},
|
||||
],
|
||||
} as StreamOutput);
|
||||
|
||||
const results = await runChatAndCollectResults(payload);
|
||||
|
||||
expect(results).toHaveLength(1);
|
||||
expect(results[0]).toHaveProperty('messages');
|
||||
});
|
||||
|
||||
it('should handle planning errors gracefully', async () => {
|
||||
const payload: ChatPayload = {
|
||||
message: 'Create invalid workflow',
|
||||
workflowContext: {
|
||||
currentWorkflow: { nodes: [] },
|
||||
},
|
||||
};
|
||||
|
||||
mockStreamProcessor(new ValidationError('Invalid plan request'));
|
||||
|
||||
await expect(runChatAndCollectResults(payload)).rejects.toThrow(ValidationError);
|
||||
});
|
||||
});
|
||||
|
||||
describe('reviewPlan', () => {
|
||||
it('should handle plan approval via interrupt', async () => {
|
||||
const mockPlan: WorkflowPlanNode[] = [
|
||||
{
|
||||
nodeType: 'n8n-nodes-base.manualTrigger',
|
||||
nodeName: 'Manual Trigger',
|
||||
reasoning: 'Start the workflow manually',
|
||||
},
|
||||
];
|
||||
|
||||
const payload: ChatPayload = {
|
||||
message: PLAN_APPROVAL_MESSAGE,
|
||||
workflowContext: {
|
||||
currentWorkflow: { nodes: [] },
|
||||
},
|
||||
};
|
||||
|
||||
const testAgent = new WorkflowBuilderAgent(config);
|
||||
|
||||
// Mock the agent with a pending interrupt
|
||||
const mockCompiledAgent = {
|
||||
stream: jest.fn().mockImplementation(async function* (input: unknown) {
|
||||
// If it's a Command with resume, it means plan was approved
|
||||
if (input instanceof Command && input.resume) {
|
||||
yield [
|
||||
'agent',
|
||||
{
|
||||
planStatus: 'approved',
|
||||
messages: [new AIMessage('Plan approved, executing...')],
|
||||
},
|
||||
];
|
||||
}
|
||||
}),
|
||||
getState: jest.fn().mockResolvedValue({
|
||||
values: {
|
||||
messages: [],
|
||||
workflowPlan: {
|
||||
intro: 'Test plan',
|
||||
plan: mockPlan,
|
||||
},
|
||||
},
|
||||
tasks: [
|
||||
{
|
||||
interrupts: [
|
||||
{
|
||||
value: {
|
||||
plan: mockPlan,
|
||||
message: 'Test plan',
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
}),
|
||||
updateState: jest.fn(),
|
||||
};
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
jest.spyOn(testAgent as any, 'createWorkflow').mockReturnValue({
|
||||
compile: jest.fn().mockReturnValue(mockCompiledAgent),
|
||||
});
|
||||
|
||||
mockStreamProcessor({
|
||||
messages: [
|
||||
{
|
||||
role: 'assistant',
|
||||
type: 'message',
|
||||
text: 'Processing...',
|
||||
},
|
||||
],
|
||||
} as StreamOutput);
|
||||
|
||||
const generator = testAgent.chat(payload);
|
||||
const results = [];
|
||||
for await (const result of generator) {
|
||||
results.push(result);
|
||||
}
|
||||
|
||||
// Verify that stream was called with a Command containing approval
|
||||
expect(mockCompiledAgent.stream).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
resume: {
|
||||
action: 'approve',
|
||||
feedback: undefined,
|
||||
},
|
||||
}),
|
||||
expect.any(Object),
|
||||
);
|
||||
});
|
||||
|
||||
it('should handle plan rejection with feedback', async () => {
|
||||
const mockPlan: WorkflowPlanNode[] = [
|
||||
{
|
||||
nodeType: 'n8n-nodes-base.manualTrigger',
|
||||
nodeName: 'Manual Trigger',
|
||||
reasoning: 'Start the workflow manually',
|
||||
},
|
||||
];
|
||||
|
||||
const feedback = 'Please add error handling';
|
||||
const payload: ChatPayload = {
|
||||
message: feedback,
|
||||
workflowContext: {
|
||||
currentWorkflow: { nodes: [] },
|
||||
},
|
||||
};
|
||||
|
||||
const testAgent = new WorkflowBuilderAgent(config);
|
||||
|
||||
// Mock the agent with a pending interrupt
|
||||
const mockCompiledAgent = {
|
||||
stream: jest.fn().mockImplementation(async function* (input: unknown) {
|
||||
// If it's a Command with resume and feedback, it means plan was rejected
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
|
||||
if (input instanceof Command && input.resume?.action === 'adjust') {
|
||||
yield [
|
||||
'adjustPlan',
|
||||
{
|
||||
planStatus: 'rejected',
|
||||
planFeedback: feedback,
|
||||
messages: [new HumanMessage(feedback)],
|
||||
},
|
||||
];
|
||||
}
|
||||
}),
|
||||
getState: jest.fn().mockResolvedValue({
|
||||
values: {
|
||||
messages: [],
|
||||
workflowPlan: {
|
||||
intro: 'Test plan',
|
||||
plan: mockPlan,
|
||||
},
|
||||
},
|
||||
tasks: [
|
||||
{
|
||||
interrupts: [
|
||||
{
|
||||
value: {
|
||||
plan: mockPlan,
|
||||
message: 'Test plan',
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
}),
|
||||
updateState: jest.fn(),
|
||||
};
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
jest.spyOn(testAgent as any, 'createWorkflow').mockReturnValue({
|
||||
compile: jest.fn().mockReturnValue(mockCompiledAgent),
|
||||
});
|
||||
|
||||
mockStreamProcessor({
|
||||
messages: [
|
||||
{
|
||||
role: 'assistant',
|
||||
type: 'message',
|
||||
text: 'Processing...',
|
||||
},
|
||||
],
|
||||
} as StreamOutput);
|
||||
|
||||
const generator = testAgent.chat(payload);
|
||||
const results = [];
|
||||
for await (const result of generator) {
|
||||
results.push(result);
|
||||
}
|
||||
|
||||
// Verify that stream was called with a Command containing rejection and feedback
|
||||
expect(mockCompiledAgent.stream).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
resume: {
|
||||
action: 'adjust',
|
||||
feedback,
|
||||
},
|
||||
}),
|
||||
expect.any(Object),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe('adjustPlan', () => {
|
||||
it('should adjust plan based on user feedback', async () => {
|
||||
const payload: ChatPayload = {
|
||||
message: 'Add error handling',
|
||||
workflowContext: {
|
||||
currentWorkflow: { nodes: [] },
|
||||
},
|
||||
};
|
||||
|
||||
mockStreamProcessor({
|
||||
messages: [
|
||||
{
|
||||
role: 'assistant',
|
||||
type: 'message',
|
||||
text: 'Adjusting plan with error handling...',
|
||||
},
|
||||
],
|
||||
} as StreamOutput);
|
||||
|
||||
const results = await runChatAndCollectResults(payload);
|
||||
|
||||
expect(results).toHaveLength(1);
|
||||
expect(results[0].messages).toBeDefined();
|
||||
expect(results[0].messages[0]).toHaveProperty('text');
|
||||
});
|
||||
|
||||
it('should remove previous plan tool messages when adjusting', async () => {
|
||||
const payload: ChatPayload = {
|
||||
message: 'Use webhook instead',
|
||||
workflowContext: {
|
||||
currentWorkflow: { nodes: [] },
|
||||
},
|
||||
};
|
||||
|
||||
mockStreamProcessor({
|
||||
messages: [
|
||||
{
|
||||
role: 'assistant',
|
||||
type: 'message',
|
||||
text: 'Adjusting plan to use webhook...',
|
||||
},
|
||||
],
|
||||
} as StreamOutput);
|
||||
|
||||
const results = await runChatAndCollectResults(payload);
|
||||
|
||||
expect(results).toHaveLength(1);
|
||||
expect(results[0].messages).toBeDefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe('Plan state routing', () => {
|
||||
it('should route to createPlan when no plan exists', async () => {
|
||||
const payload: ChatPayload = {
|
||||
message: 'Build a workflow',
|
||||
workflowContext: {
|
||||
currentWorkflow: { nodes: [] },
|
||||
},
|
||||
};
|
||||
|
||||
mockStreamProcessor({
|
||||
messages: [
|
||||
{
|
||||
role: 'assistant',
|
||||
type: 'message',
|
||||
text: 'Creating plan...',
|
||||
},
|
||||
],
|
||||
} as StreamOutput);
|
||||
|
||||
const results = await runChatAndCollectResults(payload);
|
||||
|
||||
expect(results).toHaveLength(1);
|
||||
expect(results[0]).toHaveProperty('messages');
|
||||
});
|
||||
|
||||
it('should route to agent when plan is approved', async () => {
|
||||
const payload: ChatPayload = {
|
||||
message: 'Continue building',
|
||||
workflowContext: {
|
||||
currentWorkflow: { nodes: [] },
|
||||
},
|
||||
};
|
||||
|
||||
mockStreamProcessor({
|
||||
messages: [
|
||||
{
|
||||
role: 'assistant',
|
||||
type: 'message',
|
||||
text: 'Building workflow based on approved plan...',
|
||||
},
|
||||
],
|
||||
} as StreamOutput);
|
||||
|
||||
const results = await runChatAndCollectResults(payload);
|
||||
|
||||
expect(results).toHaveLength(1);
|
||||
expect(results[0]).toHaveProperty('messages');
|
||||
});
|
||||
});
|
||||
|
||||
describe('Interrupt handling', () => {
|
||||
it('should properly handle interrupt for plan review', async () => {
|
||||
// This test verifies that the interrupt mechanism is properly set up
|
||||
// The actual interrupt is handled by LangGraph, we just verify the setup
|
||||
const payload: ChatPayload = {
|
||||
message: 'Create workflow to fetch data',
|
||||
workflowContext: {
|
||||
currentWorkflow: { nodes: [] },
|
||||
},
|
||||
};
|
||||
|
||||
// Mock the stream processor to simulate interrupt handling
|
||||
mockCreateStreamProcessor.mockImplementation(() => {
|
||||
return (async function* () {
|
||||
yield {
|
||||
messages: [
|
||||
{
|
||||
role: 'assistant',
|
||||
type: 'message',
|
||||
text: 'Creating plan for review...',
|
||||
},
|
||||
],
|
||||
} as StreamOutput;
|
||||
|
||||
yield {
|
||||
messages: [
|
||||
{
|
||||
role: 'assistant',
|
||||
type: 'message',
|
||||
text: 'Plan created, awaiting approval...',
|
||||
},
|
||||
],
|
||||
} as StreamOutput;
|
||||
})();
|
||||
});
|
||||
|
||||
const generator = agent.chat(payload);
|
||||
const results = [];
|
||||
for await (const result of generator) {
|
||||
results.push(result);
|
||||
}
|
||||
|
||||
// Verify we got results from the stream
|
||||
expect(results.length).toBeGreaterThan(1);
|
||||
expect(results[0]).toHaveProperty('messages');
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import { ChatPromptTemplate } from '@langchain/core/prompts';
|
||||
|
||||
import type { WorkflowPlan } from '../../agents/workflow-planner-agent';
|
||||
import { instanceUrlPrompt } from '../../chains/prompts/instance-url';
|
||||
|
||||
const systemPrompt = `You are an AI assistant specialized in creating and editing n8n workflows. Your goal is to help users build efficient, well-connected workflows by intelligently using the available tools.
|
||||
@@ -408,34 +407,6 @@ const previousConversationSummary = `
|
||||
{previousSummary}
|
||||
</previous_summary>`;
|
||||
|
||||
const workflowPlan = '{workflowPlan}';
|
||||
|
||||
export const planFormatter = (plan?: WorkflowPlan | null) => {
|
||||
if (!plan) return '<workflow_plan>EMPTY</workflow_plan>';
|
||||
|
||||
const nodesPlan = plan.plan.map((node) => {
|
||||
return `
|
||||
<workflow_plan_node>
|
||||
<type>${node.nodeType}</type>
|
||||
<name>${node.nodeName}</name>
|
||||
<reasoning>${node.reasoning}</reasoning>
|
||||
</workflow_plan_node>
|
||||
`;
|
||||
});
|
||||
|
||||
return `
|
||||
<workflow_plan>
|
||||
<workflow_plan_intro>
|
||||
${plan.intro}
|
||||
</workflow_plan_intro>
|
||||
|
||||
<workflow_plan_nodes>
|
||||
${nodesPlan.join('\n')}
|
||||
</workflow_plan_nodes>
|
||||
</workflow_plan>
|
||||
`;
|
||||
};
|
||||
|
||||
export const mainAgentPrompt = ChatPromptTemplate.fromMessages([
|
||||
[
|
||||
'system',
|
||||
@@ -471,11 +442,6 @@ export const mainAgentPrompt = ChatPromptTemplate.fromMessages([
|
||||
text: previousConversationSummary,
|
||||
cache_control: { type: 'ephemeral' },
|
||||
},
|
||||
{
|
||||
type: 'text',
|
||||
text: workflowPlan,
|
||||
cache_control: { type: 'ephemeral' },
|
||||
},
|
||||
],
|
||||
],
|
||||
['placeholder', '{messages}'],
|
||||
|
||||
@@ -35,15 +35,6 @@ export interface ExecutionRequestChunk {
|
||||
reason: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Plan chunk for streaming
|
||||
*/
|
||||
export interface PlanChunk {
|
||||
role: 'assistant';
|
||||
type: 'plan';
|
||||
plan: unknown;
|
||||
}
|
||||
|
||||
/**
|
||||
* Union type for all stream chunks
|
||||
*/
|
||||
@@ -51,8 +42,7 @@ export type StreamChunk =
|
||||
| AgentMessageChunk
|
||||
| ToolProgressChunk
|
||||
| WorkflowUpdateChunk
|
||||
| ExecutionRequestChunk
|
||||
| PlanChunk;
|
||||
| ExecutionRequestChunk;
|
||||
|
||||
/**
|
||||
* Stream output containing messages
|
||||
|
||||
@@ -2,7 +2,6 @@ import { AIMessage, HumanMessage, ToolMessage } from '@langchain/core/messages';
|
||||
import type { ToolCall } from '@langchain/core/messages/tool';
|
||||
import type { DynamicStructuredTool } from '@langchain/core/tools';
|
||||
|
||||
import type { WorkflowPlan } from '../agents/workflow-planner-agent';
|
||||
import type {
|
||||
AgentMessageChunk,
|
||||
ToolProgressChunk,
|
||||
@@ -45,24 +44,6 @@ export function processStreamChunk(streamMode: string, chunk: unknown): StreamOu
|
||||
workflowJSON?: unknown;
|
||||
workflowOperations?: unknown;
|
||||
};
|
||||
create_plan?: {
|
||||
workflowPlan?: unknown;
|
||||
planStatus?: string;
|
||||
messages?: Array<{ content: string | Array<{ type: string; text: string }> }>;
|
||||
};
|
||||
review_plan?: {
|
||||
planStatus?: string;
|
||||
};
|
||||
adjust_plan?: {
|
||||
workflowPlan?: unknown;
|
||||
planStatus?: string;
|
||||
};
|
||||
__interrupt__?: Array<{
|
||||
value: unknown;
|
||||
resumable: boolean;
|
||||
ns: string[];
|
||||
when: string;
|
||||
}>;
|
||||
};
|
||||
|
||||
if ((agentChunk?.delete_messages?.messages ?? []).length > 0) {
|
||||
@@ -117,40 +98,6 @@ export function processStreamChunk(streamMode: string, chunk: unknown): StreamOu
|
||||
}
|
||||
}
|
||||
|
||||
// Handle plan creation
|
||||
if (agentChunk?.create_plan?.workflowPlan) {
|
||||
const workflowPlan = agentChunk.create_plan.workflowPlan as WorkflowPlan;
|
||||
const planChunk = {
|
||||
role: 'assistant' as const,
|
||||
type: 'plan' as const,
|
||||
plan: workflowPlan.plan,
|
||||
message: workflowPlan.intro,
|
||||
};
|
||||
return { messages: [planChunk] };
|
||||
} else if ((agentChunk?.create_plan?.messages ?? []).length > 0) {
|
||||
// When planner didn't create a plan, but responded with a message
|
||||
const lastMessage =
|
||||
agentChunk.create_plan!.messages![agentChunk.create_plan!.messages!.length - 1];
|
||||
const messageChunk: AgentMessageChunk = {
|
||||
role: 'assistant',
|
||||
type: 'message',
|
||||
text: lastMessage.content as string,
|
||||
};
|
||||
|
||||
return { messages: [messageChunk] };
|
||||
}
|
||||
|
||||
if (agentChunk?.adjust_plan?.workflowPlan) {
|
||||
const workflowPlan = agentChunk.adjust_plan.workflowPlan as WorkflowPlan;
|
||||
const planChunk = {
|
||||
role: 'assistant' as const,
|
||||
type: 'plan' as const,
|
||||
plan: workflowPlan.plan,
|
||||
message: workflowPlan.intro,
|
||||
};
|
||||
return { messages: [planChunk] };
|
||||
}
|
||||
|
||||
// Handle process_operations updates - emit workflow update after operations are processed
|
||||
if (agentChunk?.process_operations) {
|
||||
// Check if operations were processed (indicated by cleared operations array)
|
||||
@@ -250,16 +197,6 @@ function createToolCallMessage(
|
||||
toolCall: ToolCall,
|
||||
builderTool?: BuilderTool,
|
||||
): Record<string, unknown> {
|
||||
if (toolCall.name === 'generate_workflow_plan') {
|
||||
const workflowPlan = toolCall.args as WorkflowPlan;
|
||||
return {
|
||||
role: 'assistant',
|
||||
type: 'plan',
|
||||
plan: workflowPlan.plan,
|
||||
message: workflowPlan.intro,
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
id: toolCall.id,
|
||||
toolCallId: toolCall.id,
|
||||
|
||||
@@ -388,9 +388,6 @@ describe('operations-processor', () => {
|
||||
messages: [],
|
||||
workflowContext: {},
|
||||
previousSummary: 'EMPTY',
|
||||
workflowPlan: null,
|
||||
planStatus: null,
|
||||
planFeedback: null,
|
||||
});
|
||||
|
||||
it('should process operations and clear them', () => {
|
||||
|
||||
@@ -49,9 +49,6 @@ describe('tool-executor', () => {
|
||||
messages,
|
||||
workflowContext: {},
|
||||
previousSummary: 'EMPTY',
|
||||
planFeedback: null,
|
||||
planStatus: null,
|
||||
workflowPlan: null,
|
||||
});
|
||||
|
||||
// Helper to create mock tool
|
||||
|
||||
@@ -127,7 +127,7 @@ describe('trimWorkflowJSON', () => {
|
||||
type: 'n8n-nodes-base.test',
|
||||
parameters: {
|
||||
nullValue: null,
|
||||
// eslint-disable-next-line no-undefined
|
||||
|
||||
undefinedValue: undefined,
|
||||
validValue: 'test',
|
||||
},
|
||||
|
||||
@@ -1,15 +1,9 @@
|
||||
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
|
||||
import { ToolMessage, AIMessage, HumanMessage, RemoveMessage } from '@langchain/core/messages';
|
||||
import type { ToolMessage } from '@langchain/core/messages';
|
||||
import { AIMessage, HumanMessage, RemoveMessage } from '@langchain/core/messages';
|
||||
import type { RunnableConfig } from '@langchain/core/runnables';
|
||||
import type { LangChainTracer } from '@langchain/core/tracers/tracer_langchain';
|
||||
import {
|
||||
StateGraph,
|
||||
MemorySaver,
|
||||
END,
|
||||
GraphRecursionError,
|
||||
Command,
|
||||
interrupt,
|
||||
} from '@langchain/langgraph';
|
||||
import { StateGraph, MemorySaver, END, GraphRecursionError } from '@langchain/langgraph';
|
||||
import type { Logger } from '@n8n/backend-common';
|
||||
import {
|
||||
ApplicationError,
|
||||
@@ -23,12 +17,10 @@ import {
|
||||
DEFAULT_AUTO_COMPACT_THRESHOLD_TOKENS,
|
||||
MAX_AI_BUILDER_PROMPT_LENGTH,
|
||||
MAX_INPUT_TOKENS,
|
||||
PLAN_APPROVAL_MESSAGE,
|
||||
} from '@/constants';
|
||||
import { createGetNodeParameterTool } from '@/tools/get-node-parameter.tool';
|
||||
import { trimWorkflowJSON } from '@/utils/trim-workflow-context';
|
||||
|
||||
import { type WorkflowPlanNode, createWorkflowPlannerAgent } from './agents/workflow-planner-agent';
|
||||
import { conversationCompactChain } from './chains/conversation-compact';
|
||||
import { workflowNameChain } from './chains/workflow-name';
|
||||
import { LLMServiceError, ValidationError, WorkflowStateError } from './errors';
|
||||
@@ -36,7 +28,7 @@ import { createAddNodeTool } from './tools/add-node.tool';
|
||||
import { createConnectNodesTool } from './tools/connect-nodes.tool';
|
||||
import { createNodeDetailsTool } from './tools/node-details.tool';
|
||||
import { createNodeSearchTool } from './tools/node-search.tool';
|
||||
import { mainAgentPrompt, planFormatter } from './tools/prompts/main-agent.prompt';
|
||||
import { mainAgentPrompt } from './tools/prompts/main-agent.prompt';
|
||||
import { createRemoveNodeTool } from './tools/remove-node.tool';
|
||||
import { createUpdateNodeParametersTool } from './tools/update-node-parameters.tool';
|
||||
import type { SimpleWorkflow } from './types/workflow';
|
||||
@@ -129,7 +121,6 @@ export class WorkflowBuilderAgent {
|
||||
workflowJSON: trimWorkflowJSON(state.workflowJSON),
|
||||
executionData: state.workflowContext?.executionData ?? {},
|
||||
executionSchema: state.workflowContext?.executionSchema ?? [],
|
||||
workflowPlan: planFormatter(state.workflowPlan),
|
||||
instanceUrl: this.instanceUrl,
|
||||
});
|
||||
|
||||
@@ -165,145 +156,8 @@ export class WorkflowBuilderAgent {
|
||||
return tokensUsed > this.autoCompactThresholdTokens;
|
||||
};
|
||||
|
||||
/**
|
||||
* Creates a plan for the workflow based on user requirements
|
||||
*/
|
||||
const createPlan = async (state: typeof WorkflowState.State) => {
|
||||
const { messages } = state;
|
||||
const lastHumanMessage = messages.findLast((m) => m instanceof HumanMessage)!;
|
||||
|
||||
if (typeof lastHumanMessage.content !== 'string') {
|
||||
throw new ValidationError('Invalid message content for planning');
|
||||
}
|
||||
|
||||
// Create the planner agent with tools
|
||||
const plannerAgent = createWorkflowPlannerAgent(this.llmSimpleTask, this.parsedNodeTypes);
|
||||
|
||||
// Generate the workflow plan
|
||||
const plannerResult = await plannerAgent.plan(lastHumanMessage.content);
|
||||
|
||||
// If we got a structured plan, return it
|
||||
if ('plan' in plannerResult) {
|
||||
const { plan, toolMessages } = plannerResult;
|
||||
this.logger?.debug('Generated workflow plan: ' + JSON.stringify(plan, null, 2));
|
||||
|
||||
return {
|
||||
workflowPlan: plan,
|
||||
planStatus: 'pending' as const,
|
||||
messages: toolMessages,
|
||||
};
|
||||
}
|
||||
|
||||
// If we didn't get a plan, just return the text response
|
||||
this.logger?.debug('Planner returned text response: ' + plannerResult.text);
|
||||
return {
|
||||
messages: [
|
||||
new AIMessage({
|
||||
content: plannerResult.text,
|
||||
}),
|
||||
],
|
||||
};
|
||||
};
|
||||
|
||||
/**
|
||||
* Reviews the plan with the user for approval
|
||||
*/
|
||||
const reviewPlan = async (state: typeof WorkflowState.State) => {
|
||||
const { workflowPlan } = state;
|
||||
|
||||
if (!workflowPlan) {
|
||||
throw new ValidationError('No workflow plan to review');
|
||||
}
|
||||
|
||||
// Use interrupt to pause and show the plan to the user
|
||||
// The frontend will display the plan and wait for user action
|
||||
const userResponse = interrupt<
|
||||
{ plan: WorkflowPlanNode[]; message: string },
|
||||
{ action: 'approve' | 'adjust'; feedback?: string }
|
||||
>({
|
||||
plan: workflowPlan.plan,
|
||||
message: workflowPlan.intro,
|
||||
});
|
||||
|
||||
// Process the user's response
|
||||
if (userResponse.action === 'approve') {
|
||||
// User approved the plan, mark as approved and continue
|
||||
return {
|
||||
planStatus: 'approved' as const,
|
||||
};
|
||||
} else if (userResponse.action === 'adjust') {
|
||||
// User wants adjustments, add feedback and mark for adjustment
|
||||
return {
|
||||
planStatus: 'rejected' as const,
|
||||
planFeedback: userResponse.feedback ?? 'Please adjust the plan',
|
||||
};
|
||||
}
|
||||
|
||||
return {};
|
||||
};
|
||||
|
||||
/**
|
||||
* Adjusts the plan based on user feedback
|
||||
*/
|
||||
const adjustPlan = async (state: typeof WorkflowState.State) => {
|
||||
const { messages, planFeedback, workflowPlan } = state;
|
||||
const lastHumanMessage = messages.findLast((m) => m instanceof HumanMessage)!;
|
||||
|
||||
if (typeof lastHumanMessage.content !== 'string') {
|
||||
throw new ValidationError('Invalid message content for plan adjustment');
|
||||
}
|
||||
|
||||
// Create the planner agent with tools
|
||||
const plannerAgent = createWorkflowPlannerAgent(this.llmSimpleTask, this.parsedNodeTypes);
|
||||
|
||||
// Generate an adjusted plan with feedback
|
||||
const adjustedPlan = await plannerAgent.plan(
|
||||
lastHumanMessage.content,
|
||||
workflowPlan ?? undefined,
|
||||
planFeedback ?? undefined,
|
||||
);
|
||||
|
||||
// If we get a text response instead of a plan, just return that
|
||||
if ('text' in adjustedPlan) {
|
||||
return {
|
||||
messages: [
|
||||
new AIMessage({
|
||||
content: adjustedPlan.text,
|
||||
}),
|
||||
],
|
||||
};
|
||||
}
|
||||
|
||||
// Remove previous plan tool messages to avoid confusion
|
||||
const filteredMessages = messages.map((m) => {
|
||||
if (m instanceof ToolMessage && m.name === 'generate_workflow_plan') {
|
||||
return new RemoveMessage({ id: m.id! });
|
||||
}
|
||||
|
||||
if (m instanceof AIMessage && m.tool_calls && m.tool_calls.length > 0) {
|
||||
const hasPlanCall = m.tool_calls.find((tc) => tc.name === 'generate_workflow_plan');
|
||||
if (hasPlanCall) {
|
||||
return new RemoveMessage({ id: m.id! });
|
||||
}
|
||||
}
|
||||
|
||||
return m;
|
||||
});
|
||||
|
||||
const planAdjustmentMessage = new HumanMessage({ content: planFeedback ?? '' });
|
||||
|
||||
this.logger?.debug('Adjusted workflow plan: ' + JSON.stringify(adjustedPlan, null, 2));
|
||||
|
||||
return {
|
||||
workflowPlan: adjustedPlan.plan,
|
||||
messages: [...filteredMessages, planAdjustmentMessage, ...adjustedPlan.toolMessages],
|
||||
planStatus: 'pending' as const,
|
||||
planFeedback: null,
|
||||
};
|
||||
};
|
||||
|
||||
const shouldModifyState = (state: typeof WorkflowState.State) => {
|
||||
const { messages, workflowContext, planStatus } = state;
|
||||
const { messages, workflowContext } = state;
|
||||
const lastHumanMessage = messages.findLast((m) => m instanceof HumanMessage)!; // There always should be at least one human message in the array
|
||||
|
||||
if (lastHumanMessage.content === '/compact') {
|
||||
@@ -324,11 +178,6 @@ export class WorkflowBuilderAgent {
|
||||
return 'auto_compact_messages';
|
||||
}
|
||||
|
||||
// If we don't have a plan yet, and the workflow is empty, create a plan
|
||||
if (!planStatus && workflowContext?.currentWorkflow?.nodes?.length === 0) {
|
||||
return 'create_plan';
|
||||
}
|
||||
|
||||
return 'agent';
|
||||
};
|
||||
|
||||
@@ -356,9 +205,6 @@ export class WorkflowBuilderAgent {
|
||||
connections: {},
|
||||
name: '',
|
||||
},
|
||||
workflowPlan: null,
|
||||
planStatus: null,
|
||||
planFeedback: null,
|
||||
};
|
||||
|
||||
return stateUpdate;
|
||||
@@ -447,24 +293,11 @@ export class WorkflowBuilderAgent {
|
||||
.addNode('compact_messages', compactSession)
|
||||
.addNode('auto_compact_messages', compactSession)
|
||||
.addNode('create_workflow_name', createWorkflowName)
|
||||
.addNode('create_plan', createPlan)
|
||||
.addNode('review_plan', reviewPlan)
|
||||
.addNode('adjust_plan', adjustPlan)
|
||||
.addConditionalEdges('__start__', shouldModifyState)
|
||||
// .addEdge('create_plan', 'review_plan')
|
||||
.addConditionalEdges('create_plan', (state) => {
|
||||
// If a plan was created, move to review, otherwise back to planning
|
||||
return state.workflowPlan ? 'review_plan' : END;
|
||||
})
|
||||
.addConditionalEdges('review_plan', (state) => {
|
||||
// Route based on the plan status after review
|
||||
return state.planStatus === 'approved' ? 'agent' : 'adjust_plan';
|
||||
})
|
||||
.addEdge('adjust_plan', 'review_plan')
|
||||
.addEdge('tools', 'process_operations')
|
||||
.addEdge('process_operations', 'agent')
|
||||
.addEdge('auto_compact_messages', 'agent')
|
||||
.addEdge('create_workflow_name', 'create_plan')
|
||||
.addEdge('create_workflow_name', 'agent')
|
||||
.addEdge('delete_messages', END)
|
||||
.addEdge('compact_messages', END)
|
||||
.addConditionalEdges('agent', shouldContinue);
|
||||
@@ -505,7 +338,7 @@ export class WorkflowBuilderAgent {
|
||||
);
|
||||
|
||||
try {
|
||||
const stream = await this.createAgentStream(payload, streamConfig, agent, threadConfig);
|
||||
const stream = await this.createAgentStream(payload, streamConfig, agent);
|
||||
yield* this.processAgentStream(stream, agent, threadConfig);
|
||||
} catch (error: unknown) {
|
||||
this.handleStreamError(error);
|
||||
@@ -551,33 +384,7 @@ export class WorkflowBuilderAgent {
|
||||
payload: ChatPayload,
|
||||
streamConfig: RunnableConfig,
|
||||
agent: ReturnType<ReturnType<typeof this.createWorkflow>['compile']>,
|
||||
threadConfig: RunnableConfig,
|
||||
) {
|
||||
const currentState = await agent.getState(threadConfig);
|
||||
|
||||
// Check if there are pending interrupts (e.g., plan review)
|
||||
const interruptedTask = currentState.tasks.find(
|
||||
(task) => task.interrupts && task.interrupts.length > 0,
|
||||
);
|
||||
|
||||
if (interruptedTask) {
|
||||
// We have a pending interrupt - likely a plan review
|
||||
|
||||
// Check if the message is an approval message, right now we only check for exact match
|
||||
// in the future we might want to use a LLM classifier for more flexibility
|
||||
const action = payload.message.trim() === PLAN_APPROVAL_MESSAGE ? 'approve' : 'adjust';
|
||||
|
||||
// Resume with the appropriate command
|
||||
const resumeCommand = new Command({
|
||||
resume: {
|
||||
action,
|
||||
feedback: action === 'adjust' ? payload.message : undefined,
|
||||
},
|
||||
});
|
||||
|
||||
return await agent.stream(resumeCommand, streamConfig);
|
||||
}
|
||||
|
||||
return await agent.stream(
|
||||
{
|
||||
messages: [new HumanMessage({ content: payload.message })],
|
||||
|
||||
@@ -2,7 +2,6 @@ import type { BaseMessage } from '@langchain/core/messages';
|
||||
import { HumanMessage } from '@langchain/core/messages';
|
||||
import { Annotation, messagesStateReducer } from '@langchain/langgraph';
|
||||
|
||||
import type { WorkflowPlan } from './agents/workflow-planner-agent';
|
||||
import type { SimpleWorkflow, WorkflowOperation } from './types/workflow';
|
||||
import type { ChatPayload } from './workflow-builder-agent';
|
||||
|
||||
@@ -76,21 +75,6 @@ export const WorkflowState = Annotation.Root({
|
||||
reducer: operationsReducer,
|
||||
default: () => [],
|
||||
}),
|
||||
// The planned workflow nodes
|
||||
workflowPlan: Annotation<WorkflowPlan | null>({
|
||||
reducer: (x, y) => y ?? x,
|
||||
default: () => null,
|
||||
}),
|
||||
// Status of the workflow plan
|
||||
planStatus: Annotation<'pending' | 'approved' | 'rejected' | null>({
|
||||
reducer: (x, y) => y ?? x,
|
||||
default: () => null,
|
||||
}),
|
||||
// User feedback on the plan
|
||||
planFeedback: Annotation<string | null>({
|
||||
reducer: (x, y) => y ?? x,
|
||||
default: () => null,
|
||||
}),
|
||||
// Latest workflow context
|
||||
workflowContext: Annotation<ChatPayload['workflowContext'] | undefined>({
|
||||
reducer: (x, y) => y ?? x,
|
||||
|
||||
Reference in New Issue
Block a user