diff --git a/packages/@n8n/ai-workflow-builder.ee/evaluations/core/test-runner.ts b/packages/@n8n/ai-workflow-builder.ee/evaluations/core/test-runner.ts index 9929d875fe..0864573316 100644 --- a/packages/@n8n/ai-workflow-builder.ee/evaluations/core/test-runner.ts +++ b/packages/@n8n/ai-workflow-builder.ee/evaluations/core/test-runner.ts @@ -1,11 +1,13 @@ import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; -import type { SimpleWorkflow } from '../../src/types/workflow.js'; -import type { WorkflowBuilderAgent, ChatPayload } from '../../src/workflow-builder-agent.js'; -import { evaluateWorkflow } from '../chains/workflow-evaluator.js'; -import type { EvaluationInput, EvaluationResult, TestCase } from '../types/evaluation.js'; -import { isWorkflowStateValues } from '../types/langsmith.js'; -import type { TestResult } from '../types/test-result.js'; +import { PLAN_APPROVAL_MESSAGE } from '../../src/constants'; +import type { SimpleWorkflow } from '../../src/types/workflow'; +import type { WorkflowBuilderAgent } from '../../src/workflow-builder-agent'; +import { evaluateWorkflow } from '../chains/workflow-evaluator'; +import type { EvaluationInput, EvaluationResult, TestCase } from '../types/evaluation'; +import { isWorkflowStateValues } from '../types/langsmith'; +import type { TestResult } from '../types/test-result'; +import { consumeGenerator, getChatPayload } from '../utils/evaluation-helpers'; /** * Creates an error result for a failed test @@ -48,19 +50,12 @@ export async function runSingleTest( userId: string = 'test-user', ): Promise { try { - const chatPayload: ChatPayload = { - message: testCase.prompt, - workflowContext: { - currentWorkflow: { id: testCase.id, nodes: [], connections: {} }, - }, - }; - // Generate workflow const startTime = Date.now(); - let messageCount = 0; - for await (const _output of agent.chat(chatPayload, userId)) { - messageCount++; - } + // First generate plan + await consumeGenerator(agent.chat(getChatPayload(testCase.prompt, testCase.id), userId)); + // Confirm plan + await consumeGenerator(agent.chat(getChatPayload(PLAN_APPROVAL_MESSAGE, testCase.id), userId)); const generationTime = Date.now() - startTime; // Get generated workflow with validation diff --git a/packages/@n8n/ai-workflow-builder.ee/evaluations/langsmith/evaluator.ts b/packages/@n8n/ai-workflow-builder.ee/evaluations/langsmith/evaluator.ts index c5ab6406f1..773b9e74ef 100644 --- a/packages/@n8n/ai-workflow-builder.ee/evaluations/langsmith/evaluator.ts +++ b/packages/@n8n/ai-workflow-builder.ee/evaluations/langsmith/evaluator.ts @@ -138,7 +138,8 @@ export function createLangsmithEvaluator( for (const metric of usageMetrics) { if (metric.value !== undefined) { - results.push({ key: metric.key, score: metric.value }); + // Langsmith has a limitation on large scores (>99999) so we track in thousands + results.push({ key: metric.key, score: metric.value / 1000 }); } } diff --git a/packages/@n8n/ai-workflow-builder.ee/evaluations/langsmith/runner.ts b/packages/@n8n/ai-workflow-builder.ee/evaluations/langsmith/runner.ts index 01da15d219..7ede6eb13d 100644 --- a/packages/@n8n/ai-workflow-builder.ee/evaluations/langsmith/runner.ts +++ b/packages/@n8n/ai-workflow-builder.ee/evaluations/langsmith/runner.ts @@ -1,20 +1,20 @@ -import type { BaseChatModel } from '@langchain/core/language_models/chat_models.js'; -import type { LangChainTracer } from '@langchain/core/tracers/tracer_langchain.js'; +import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; +import type { LangChainTracer } from '@langchain/core/tracers/tracer_langchain'; import { evaluate } from 'langsmith/evaluation'; import type { INodeTypeDescription } from 'n8n-workflow'; import pc from 'picocolors'; -import { createLangsmithEvaluator } from './evaluator.js'; -import type { ChatPayload } from '../../src/workflow-builder-agent.js'; -import type { WorkflowState } from '../../src/workflow-state.js'; -import { setupTestEnvironment, createAgent } from '../core/environment.js'; +import { createLangsmithEvaluator } from './evaluator'; +import { PLAN_APPROVAL_MESSAGE } from '../../src/constants'; +import type { WorkflowState } from '../../src/workflow-state'; +import { setupTestEnvironment, createAgent } from '../core/environment'; import { generateRunId, safeExtractUsage, isWorkflowStateValues, extractMessageContent, -} from '../types/langsmith.js'; -import { formatHeader } from '../utils/evaluation-helpers.js'; +} from '../types/langsmith'; +import { consumeGenerator, formatHeader, getChatPayload } from '../utils/evaluation-helpers'; /** * Creates a workflow generation function for Langsmith evaluation @@ -44,18 +44,14 @@ function createWorkflowGenerator( // Create agent for this run const agent = createAgent(parsedNodeTypes, llm, tracer); - const chatPayload: ChatPayload = { - message: messageContent, - workflowContext: { - currentWorkflow: { id: runId, nodes: [], connections: {} }, - }, - }; - - // Generate workflow - let messageCount = 0; - for await (const _output of agent.chat(chatPayload, 'langsmith-eval-user')) { - messageCount++; - } + // First generate plan + await consumeGenerator( + agent.chat(getChatPayload(messageContent, runId), 'langsmith-eval-user'), + ); + // Confirm plan + await consumeGenerator( + agent.chat(getChatPayload(PLAN_APPROVAL_MESSAGE, runId), 'langsmith-eval-user'), + ); // Get generated workflow with validation const state = await agent.getState(runId, 'langsmith-eval-user'); @@ -77,7 +73,7 @@ function createWorkflowGenerator( return { workflow: generatedWorkflow, - prompt: chatPayload.message, + prompt: messageContent, usage, }; }; diff --git a/packages/@n8n/ai-workflow-builder.ee/evaluations/utils/evaluation-helpers.ts b/packages/@n8n/ai-workflow-builder.ee/evaluations/utils/evaluation-helpers.ts index f9caf1208e..a4e754e356 100644 --- a/packages/@n8n/ai-workflow-builder.ee/evaluations/utils/evaluation-helpers.ts +++ b/packages/@n8n/ai-workflow-builder.ee/evaluations/utils/evaluation-helpers.ts @@ -7,10 +7,11 @@ import type { INodeTypeDescription } from 'n8n-workflow'; import { join } from 'path'; import pc from 'picocolors'; -import { anthropicClaudeSonnet4 } from '../../src/llm-config.js'; -import { WorkflowBuilderAgent } from '../../src/workflow-builder-agent.js'; -import type { Violation } from '../types/evaluation.js'; -import type { TestResult } from '../types/test-result.js'; +import { anthropicClaudeSonnet4 } from '../../src/llm-config'; +import type { ChatPayload } from '../../src/workflow-builder-agent'; +import { WorkflowBuilderAgent } from '../../src/workflow-builder-agent'; +import type { Violation } from '../types/evaluation'; +import type { TestResult } from '../types/test-result'; /** * Sets up the LLM with proper configuration @@ -268,3 +269,18 @@ export function saveEvaluationResults( return { reportPath, resultsPath }; } + +export async function consumeGenerator(gen: AsyncGenerator) { + for await (const _ of gen) { + /* consume all */ + } +} + +export function getChatPayload(message: string, id: string): ChatPayload { + return { + message, + workflowContext: { + currentWorkflow: { id, nodes: [], connections: {} }, + }, + }; +} diff --git a/packages/@n8n/ai-workflow-builder.ee/src/agents/workflow-planner-agent.ts b/packages/@n8n/ai-workflow-builder.ee/src/agents/workflow-planner-agent.ts new file mode 100644 index 0000000000..725bb1b0b2 --- /dev/null +++ b/packages/@n8n/ai-workflow-builder.ee/src/agents/workflow-planner-agent.ts @@ -0,0 +1,333 @@ +/* eslint-disable @typescript-eslint/no-unsafe-member-access */ + +import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; +import type { BaseMessage, ToolMessage } from '@langchain/core/messages'; +import { HumanMessage, SystemMessage } from '@langchain/core/messages'; +import { DynamicStructuredTool } from '@langchain/core/tools'; +import { StateGraph, MessagesAnnotation, END, START } from '@langchain/langgraph'; +import { ToolNode } from '@langchain/langgraph/prebuilt'; +import { jsonParse, type INodeTypeDescription } from 'n8n-workflow'; +import { z } from 'zod'; + +import { isAIMessage } from '@/types/langchain'; + +import { LLMServiceError, ToolExecutionError } from '../errors'; +import { createNodeDetailsTool } from '../tools/node-details.tool'; +import { createNodeSearchTool } from '../tools/node-search.tool'; + +const planNodeSchema = z.object({ + nodeType: z + .string() + .describe('The exact n8n node type identifier (e.g., "n8n-nodes-base.httpRequest")'), + nodeName: z + .string() + .describe('A descriptive name for this node instance (e.g., "Get Weather Data")'), + reasoning: z + .string() + .describe('Brief explanation of why this node is needed and what it will do'), +}); + +const workflowPlanSchema = z.object({ + intro: z.string().describe('A concise summary of the workflow plan'), + plan: z + .array(planNodeSchema) + .min(1) + .describe('Ordered list of nodes that will be used to build the workflow'), +}); + +const generateWorkflowPlanTool = new DynamicStructuredTool({ + name: 'generate_workflow_plan', + description: + 'Create a structured plan of n8n nodes based on user requirements and available node information', + schema: workflowPlanSchema, + func: async (input) => { + return input; + }, +}); + +export type WorkflowPlanNode = z.infer; +export type WorkflowPlan = z.infer; + +const SYSTEM_PROMPT = `You are a Workflow Planning Assistant for n8n. Your task is to analyze user requests and create a detailed plan of n8n nodes that will be used to build the workflow. + +## Your Process +1. Analyze the user's request to understand what they want to automate +2. Use the search_nodes tool to find relevant n8n nodes for each part of the workflow +3. Use the get_node_details tool to understand specific nodes' capabilities +4. Use the generate_workflow_plan tool to create the final structured plan + +## Guidelines +- Be thorough in your search - search for different keywords and concepts +- Use exact node type identifiers (e.g., "n8n-nodes-base.httpRequest") +- Order nodes logically from trigger/start to final output +- Place sub-nodes (e.g., AI tools) immediately after their root node (e.g. AI Agent) +- Only include nodes that directly fulfill the user's requirements +- Consider data transformation needs between nodes +- For AI workflows, search for AI-related nodes and sub-nodes +- ALWAYS start with a trigger node and workflow configuration node (see Workflow Structure Requirements) + +## Workflow Structure Requirements +CRITICAL: Every workflow MUST follow this structure: + +1. **First Node - Trigger (MANDATORY)** + - Every workflow MUST start with a trigger node + - Choose the appropriate trigger based on user intent (see Trigger Selection Logic) + - If no trigger is specified, intelligently select the most appropriate one + +2. **Second Node - Workflow Configuration (MANDATORY)** + - ALWAYS add an Edit Fields (Set) node immediately after the trigger + - Name it "Workflow Configuration" + - This node serves as the main configuration point for the workflow + - Downstream nodes will reference values from this node via expressions + - This centralizes key workflow parameters and makes configuration easier + +## Trigger Selection Logic +Choose the trigger based on the workflow's purpose: + +### Manual Trigger (n8n-nodes-base.manualTrigger) +Use when: +- User wants to test or debug the workflow +- It's a one-time or ad-hoc process +- User hasn't specified any trigger requirements +- The workflow is for data processing or transformation tasks + +### Chat Trigger (n8n-nodes-langchain.chatTrigger) +Use when: +- Building conversational AI or chatbot workflows +- User mentions chat, conversation, or interactive communication +- The workflow needs to respond to user messages +- Building AI agents that interact with users + +### Webhook Trigger (n8n-nodes-base.webhook) +Use when: +- Integrating with external systems or APIs +- User mentions webhooks, API calls, or external events +- The workflow needs to be triggered by external applications +- Building automated responses to system events + +## Search Strategy +- Search for nodes by functionality (e.g., "email", "database", "api") +- Search for specific service names mentioned by the user +- For AI workflows, search for sub-nodes using connection types: + - NodeConnectionTypes.AiLanguageModel for LLM providers + - NodeConnectionTypes.AiTool for AI tools + - NodeConnectionTypes.AiMemory for memory nodes + - NodeConnectionTypes.AiEmbedding for embeddings + - NodeConnectionTypes.AiVectorStore for vector stores + +## Connection Parameter Rules +When planning nodes, consider their connection requirements: + +### Static vs Dynamic Nodes +- **Static nodes** (standard inputs/outputs): HTTP Request, Set, Code +- **Dynamic nodes** (parameter-dependent connections): AI nodes, Vector Stores, Document Loaders + +### Dynamic Node Parameters That Affect Connections +- AI Agent: hasOutputParser creates additional input for schema +- Vector Store: mode parameter affects available connections (insert vs retrieve-as-tool) +- Document Loader: textSplittingMode and dataType affect input structure + +## AI Node Connection Patterns +CRITICAL: AI sub-nodes PROVIDE capabilities, making them the SOURCE in connections: + +### Main AI Connections +- OpenAI Chat Model → AI Agent [ai_languageModel] +- Calculator Tool → AI Agent [ai_tool] +- Window Buffer Memory → AI Agent [ai_memory] +- Token Splitter → Default Data Loader [ai_textSplitter] +- Default Data Loader → Vector Store [ai_document] +- Embeddings OpenAI → Vector Store [ai_embedding] + +Why: Sub-nodes enhance main nodes with their capabilities + +## RAG Workflow Pattern +CRITICAL: For RAG (Retrieval-Augmented Generation) workflows, follow this specific pattern: + +Main data flow: +- Data source (e.g., HTTP Request) → Vector Store [main connection] +- The Vector Store receives the actual data through its main input + +AI capability connections: +- Document Loader → Vector Store [ai_document] - provides document processing +- Embeddings → Vector Store [ai_embedding] - provides embedding generation +- Text Splitter → Document Loader [ai_textSplitter] - provides text chunking + +Common mistake to avoid: +- NEVER connect Document Loader to main data outputs +- Document Loader is NOT a data processor in the main flow +- Document Loader is an AI sub-node that gives Vector Store the ability to process documents + +## Agent Node Distinction +CRITICAL: Distinguish between two different agent node types: + +1. **AI Agent** (n8n-nodes-langchain.agent) + - Main workflow node that orchestrates AI tasks + - Accepts inputs: trigger data, memory, tools, language models + - Use for: Primary AI logic, chatbots, autonomous workflows + +2. **AI Agent Tool** (n8n-nodes-langchain.agentTool) + - Sub-node that acts as a tool for another AI Agent + - Provides agent-as-a-tool capability to parent agents + - Use for: Multi-agent systems where one agent calls another + +Default assumption: When users ask for "an agent" or "AI agent", they mean the main AI Agent node unless they explicitly mention "tool", "sub-agent", or "agent for another agent". + +## Output Format +After searching and analyzing available nodes, use the generate_workflow_plan tool to create a structured plan with: +- The exact node type to use +- A descriptive name for the node instance +- Clear reasoning for why this node is needed AND how it connects to other nodes +- Consider connection requirements in your reasoning + +Your plan MUST always include: +1. An appropriate trigger node as the first node +2. An Edit Fields (Set) node named "Workflow Configuration" as the second node +3. All other nodes needed to fulfill the user's requirements + +After using the generate_workflow_plan tool, only respond with a single word "DONE" to indicate the plan is complete. +Remember: Be precise about node types, understand connection patterns, and always include trigger and configuration nodes.`; + +function formatPlanFeedback(previousPlan: WorkflowPlan, feedback: string) { + return `Previous plan: ${JSON.stringify(previousPlan, null, 2)}\n\nUser feedback: ${feedback}\n\nPlease adjust the plan based on the feedback.`; +} + +/** + * Creates a workflow planner agent that can search for and analyze nodes + */ +export function createWorkflowPlannerAgent(llm: BaseChatModel, nodeTypes: INodeTypeDescription[]) { + if (!llm.bindTools) { + throw new LLMServiceError("LLM doesn't support binding tools", { llmModel: llm._llmType() }); + } + + // Create the tools for the planner + const tools = [ + createNodeSearchTool(nodeTypes).tool, + createNodeDetailsTool(nodeTypes).tool, + generateWorkflowPlanTool, + ]; + + // Create a ToolNode with our tools + const toolNode = new ToolNode(tools); + + // Bind tools to the LLM + const modelWithTools = llm.bindTools(tools); + + // Define the function that determines whether to continue + const shouldContinue = (state: typeof MessagesAnnotation.State) => { + const { messages } = state; + const lastMessage = messages[messages.length - 1]; + + // Check if the last message has tool calls + if ( + 'tool_calls' in lastMessage && + Array.isArray(lastMessage.tool_calls) && + lastMessage.tool_calls?.length + ) { + // Check if one of the tool calls is the final plan generation + const hasPlanTool = lastMessage.tool_calls.some((tc) => tc.name === 'generate_workflow_plan'); + + if (hasPlanTool) { + // If we're generating the plan, still need to execute the tool + return 'tools'; + } + + return 'tools'; + } + return END; + }; + + // Define the function that calls the model + const callModel = async (state: typeof MessagesAnnotation.State) => { + const { messages } = state; + const response = await modelWithTools.invoke(messages); + return { messages: [response] }; + }; + + // Build the graph + const workflow = new StateGraph(MessagesAnnotation) + .addNode('agent', callModel) + .addNode('tools', toolNode) + .addEdge(START, 'agent') + .addConditionalEdges('agent', shouldContinue, ['tools', END]) + .addEdge('tools', 'agent'); + + const app = workflow.compile(); + + return { + async plan( + userRequest: string, + previousPlan?: WorkflowPlan, + feedback?: string, + ): Promise< + | { + plan: WorkflowPlan; + toolMessages: BaseMessage[]; + } + | { text: string } + > { + // Prepare the initial messages + const systemMessage = new SystemMessage(SYSTEM_PROMPT); + + let userMessage = userRequest; + if (previousPlan && feedback) { + userMessage += '\n\n'; + userMessage += formatPlanFeedback(previousPlan, feedback); + } + + const humanMessage = new HumanMessage(userMessage); + + // Invoke the graph + const result = await app.invoke({ + messages: [systemMessage, humanMessage], + }); + + // Extract tools messages + const toolMessages = result.messages.filter((msg) => { + if (['system', 'human'].includes(msg.getType())) { + return false; + } + + // Do not include final AI message + if (isAIMessage(msg) && (msg.tool_calls ?? []).length === 0) { + return false; + } + + return true; + }); + + const workflowPlanToolCall = result.messages.findLast((msg): msg is ToolMessage => { + return msg.name === 'generate_workflow_plan'; + }); + + if (!workflowPlanToolCall) { + const lastAiMessage = result.messages.findLast((msg) => { + return isAIMessage(msg) && (msg.tool_calls ?? []).length === 0; + }); + + if (lastAiMessage) { + return { + text: lastAiMessage.text, + }; + } + + throw new ToolExecutionError('Invalid response from agent - no plan generated'); + } + + try { + if (typeof workflowPlanToolCall.content !== 'string') { + throw new ToolExecutionError('Workflow plan tool call content is not a string'); + } + + const workflowPlan = jsonParse(workflowPlanToolCall.content); + return { + plan: workflowPlan, + toolMessages, + }; + } catch (error) { + throw new ToolExecutionError( + `Failed to parse workflow plan: ${error instanceof Error ? error.message : 'Unknown error'}`, + ); + } + }, + }; +} diff --git a/packages/@n8n/ai-workflow-builder.ee/src/chains/planner.ts b/packages/@n8n/ai-workflow-builder.ee/src/chains/planner.ts deleted file mode 100644 index a0b375d243..0000000000 --- a/packages/@n8n/ai-workflow-builder.ee/src/chains/planner.ts +++ /dev/null @@ -1,102 +0,0 @@ -import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; -import type { AIMessageChunk } from '@langchain/core/messages'; -import { SystemMessage } from '@langchain/core/messages'; -import { ChatPromptTemplate, HumanMessagePromptTemplate } from '@langchain/core/prompts'; -import { DynamicStructuredTool } from '@langchain/core/tools'; -import { z } from 'zod'; - -import { LLMServiceError } from '../errors'; - -export const plannerPrompt = new SystemMessage( - `You are a Workflow Planner for n8n, a platform that helps users automate processes across different services and APIs. - -## Your Task -Convert user requests into clear, sequential workflow steps that can be implemented with n8n nodes. ONLY include steps that are explicitly stated or directly implied in the user request. - -## Guidelines -1. Analyze the user request to understand their end goal and required process -2. Break down the automation into logical steps based on complexity - simpler workflows need fewer steps, complex ones may need more -3. Focus ONLY on actions mentioned directly in the user prompt -4. Create steps that can be mapped to n8n nodes later -5. Order steps sequentially from trigger to final action -6. Be specific about data transformations needed ONLY if mentioned in the request -7. NEVER add extra steps like storing data or sending notifications unless explicitly requested -8. Only recommend raw HTTP requests if you think there isn't a suitable n8n node - -## CRITICAL REQUIREMENTS -- DO NOT add any steps not directly mentioned or implied in the user request -- DO NOT assume the user wants to store data in a database unless explicitly stated -- DO NOT assume the user wants to send notifications or emails unless explicitly stated -- DO NOT add any "nice to have" steps that aren't clearly part of the user's request -- Keep the workflow EXACTLY focused on what was requested, nothing more - -## Output Format -Return ONLY a JSON object with this structure: -\`\`\`json -{ - "steps": [ - "[Brief action-oriented description]", - "[Brief action-oriented description]", - ... - ] -} -\`\`\` - -## Examples of Good Step Descriptions -- "Trigger when a new email arrives in Gmail inbox" -- "Filter emails to only include those with attachments" -- "Extract data from CSV attachments" -- "Transform data to required format for the API" -- "Send HTTP request to external API with extracted data" -- "Post success message to Slack channel" - -IMPORTANT: Do not include HTML tags, markdown formatting, or explanations outside the JSON.`, -); - -const planSchema = z.object({ - steps: z - .array( - z - .string() - .describe( - 'A clear, action-oriented description of a single workflow step. Do not include "Step N" or similar, just the action', - ), - ) - .min(1) - .describe( - 'An ordered list of workflow steps that, when implemented, will fulfill the user request. Each step should be concise, action-oriented, and implementable with n8n nodes.', - ), -}); - -const generatePlanTool = new DynamicStructuredTool({ - name: 'generate_plan', - description: - 'Convert a user workflow request into a logical sequence of clear, achievable steps that can be implemented with n8n nodes.', - schema: planSchema, - func: async (input) => { - return { steps: input.steps }; - }, -}); - -const humanTemplate = '{prompt}'; -const chatPrompt = ChatPromptTemplate.fromMessages([ - plannerPrompt, - HumanMessagePromptTemplate.fromTemplate(humanTemplate), -]); - -export const plannerChain = (llm: BaseChatModel) => { - if (!llm.bindTools) { - throw new LLMServiceError("LLM doesn't support binding tools", { llmModel: llm._llmType() }); - } - - return chatPrompt - .pipe( - llm.bindTools([generatePlanTool], { - tool_choice: generatePlanTool.name, - }), - ) - .pipe((x: AIMessageChunk) => { - const toolCall = x.tool_calls?.[0]; - return (toolCall?.args as z.infer).steps; - }); -}; diff --git a/packages/@n8n/ai-workflow-builder.ee/src/constants.ts b/packages/@n8n/ai-workflow-builder.ee/src/constants.ts index 66b7f7c258..7725dbfe0a 100644 --- a/packages/@n8n/ai-workflow-builder.ee/src/constants.ts +++ b/packages/@n8n/ai-workflow-builder.ee/src/constants.ts @@ -34,3 +34,5 @@ export const MAX_WORKFLOW_LENGTH_TOKENS = 30_000; // Tokens * Used for rough token count estimation from character counts. */ export const AVG_CHARS_PER_TOKEN_ANTHROPIC = 2.5; + +export const PLAN_APPROVAL_MESSAGE = 'Proceed with the plan'; diff --git a/packages/@n8n/ai-workflow-builder.ee/src/test/workflow-builder-agent.test.ts b/packages/@n8n/ai-workflow-builder.ee/src/test/workflow-builder-agent.test.ts index 8e385582ce..c7c3cb49af 100644 --- a/packages/@n8n/ai-workflow-builder.ee/src/test/workflow-builder-agent.test.ts +++ b/packages/@n8n/ai-workflow-builder.ee/src/test/workflow-builder-agent.test.ts @@ -1,14 +1,17 @@ +/* eslint-disable @typescript-eslint/require-await */ import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; import type { ToolMessage } from '@langchain/core/messages'; import { AIMessage, HumanMessage } from '@langchain/core/messages'; import type { MemorySaver } from '@langchain/langgraph'; -import { GraphRecursionError } from '@langchain/langgraph'; +import { GraphRecursionError, Command } from '@langchain/langgraph'; import type { Logger } from '@n8n/backend-common'; import { mock } from 'jest-mock-extended'; import type { INodeTypeDescription } from 'n8n-workflow'; import { ApplicationError } from 'n8n-workflow'; -import { MAX_AI_BUILDER_PROMPT_LENGTH } from '@/constants'; +import type { WorkflowPlanNode } from '@/agents/workflow-planner-agent'; +import { createWorkflowPlannerAgent } from '@/agents/workflow-planner-agent'; +import { MAX_AI_BUILDER_PROMPT_LENGTH, PLAN_APPROVAL_MESSAGE } from '@/constants'; import { ValidationError } from '@/errors'; import type { StreamOutput } from '@/types/streaming'; import { createStreamProcessor, formatMessages } from '@/utils/stream-processor'; @@ -57,6 +60,12 @@ jest.mock('@/utils/tool-executor', () => ({ jest.mock('@/chains/conversation-compact', () => ({ conversationCompactChain: jest.fn(), })); +jest.mock('@/chains/workflow-name', () => ({ + workflowNameChain: jest.fn(), +})); +jest.mock('@/agents/workflow-planner-agent', () => ({ + createWorkflowPlannerAgent: jest.fn(), +})); const mockRandomUUID = jest.fn(); Object.defineProperty(global, 'crypto', { @@ -361,4 +370,420 @@ describe('WorkflowBuilderAgent', () => { expect(result.sessions[0].messages).toHaveLength(0); }); }); + + describe('Workflow Planning', () => { + let mockPlannerAgent: ReturnType; + const mockCreateWorkflowPlannerAgent = createWorkflowPlannerAgent as jest.MockedFunction< + typeof createWorkflowPlannerAgent + >; + + // Helper function to mock stream processor with custom output + const mockStreamProcessor = (output: StreamOutput | Error) => { + if (output instanceof Error) { + mockCreateStreamProcessor.mockImplementation(() => { + // eslint-disable-next-line require-yield + return (async function* () { + throw output; + })(); + }); + } else { + mockCreateStreamProcessor.mockImplementation(() => { + return (async function* () { + yield output; + })(); + }); + } + }; + + // Helper function to run chat and collect results + const runChatAndCollectResults = async (payload: ChatPayload) => { + const generator = agent.chat(payload); + const results = []; + for await (const result of generator) { + results.push(result); + } + return results; + }; + + beforeEach(() => { + // Reset the mock stream processor for planning tests + mockCreateStreamProcessor.mockReset(); + + mockPlannerAgent = { + plan: jest.fn(), + }; + mockCreateWorkflowPlannerAgent.mockReturnValue(mockPlannerAgent); + }); + + describe('create_plan', () => { + it('should create a workflow plan from user message', async () => { + const payload: ChatPayload = { + message: 'Create a workflow to process data', + workflowContext: { + currentWorkflow: { nodes: [] }, + }, + }; + + mockStreamProcessor({ + messages: [ + { + role: 'assistant', + type: 'message', + text: 'Creating workflow plan...', + }, + ], + } as StreamOutput); + + const results = await runChatAndCollectResults(payload); + + expect(results).toHaveLength(1); + expect(results[0]).toHaveProperty('messages'); + }); + + it('should handle planning errors gracefully', async () => { + const payload: ChatPayload = { + message: 'Create invalid workflow', + workflowContext: { + currentWorkflow: { nodes: [] }, + }, + }; + + mockStreamProcessor(new ValidationError('Invalid plan request')); + + await expect(runChatAndCollectResults(payload)).rejects.toThrow(ValidationError); + }); + }); + + describe('reviewPlan', () => { + it('should handle plan approval via interrupt', async () => { + const mockPlan: WorkflowPlanNode[] = [ + { + nodeType: 'n8n-nodes-base.manualTrigger', + nodeName: 'Manual Trigger', + reasoning: 'Start the workflow manually', + }, + ]; + + const payload: ChatPayload = { + message: PLAN_APPROVAL_MESSAGE, + workflowContext: { + currentWorkflow: { nodes: [] }, + }, + }; + + const testAgent = new WorkflowBuilderAgent(config); + + // Mock the agent with a pending interrupt + const mockCompiledAgent = { + stream: jest.fn().mockImplementation(async function* (input: unknown) { + // If it's a Command with resume, it means plan was approved + if (input instanceof Command && input.resume) { + yield [ + 'agent', + { + planStatus: 'approved', + messages: [new AIMessage('Plan approved, executing...')], + }, + ]; + } + }), + getState: jest.fn().mockResolvedValue({ + values: { + messages: [], + workflowPlan: { + intro: 'Test plan', + plan: mockPlan, + }, + }, + tasks: [ + { + interrupts: [ + { + value: { + plan: mockPlan, + message: 'Test plan', + }, + }, + ], + }, + ], + }), + updateState: jest.fn(), + }; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + jest.spyOn(testAgent as any, 'createWorkflow').mockReturnValue({ + compile: jest.fn().mockReturnValue(mockCompiledAgent), + }); + + mockStreamProcessor({ + messages: [ + { + role: 'assistant', + type: 'message', + text: 'Processing...', + }, + ], + } as StreamOutput); + + const generator = testAgent.chat(payload); + const results = []; + for await (const result of generator) { + results.push(result); + } + + // Verify that stream was called with a Command containing approval + expect(mockCompiledAgent.stream).toHaveBeenCalledWith( + expect.objectContaining({ + resume: { + action: 'approve', + feedback: undefined, + }, + }), + expect.any(Object), + ); + }); + + it('should handle plan rejection with feedback', async () => { + const mockPlan: WorkflowPlanNode[] = [ + { + nodeType: 'n8n-nodes-base.manualTrigger', + nodeName: 'Manual Trigger', + reasoning: 'Start the workflow manually', + }, + ]; + + const feedback = 'Please add error handling'; + const payload: ChatPayload = { + message: feedback, + workflowContext: { + currentWorkflow: { nodes: [] }, + }, + }; + + const testAgent = new WorkflowBuilderAgent(config); + + // Mock the agent with a pending interrupt + const mockCompiledAgent = { + stream: jest.fn().mockImplementation(async function* (input: unknown) { + // If it's a Command with resume and feedback, it means plan was rejected + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + if (input instanceof Command && input.resume?.action === 'adjust') { + yield [ + 'adjustPlan', + { + planStatus: 'rejected', + planFeedback: feedback, + messages: [new HumanMessage(feedback)], + }, + ]; + } + }), + getState: jest.fn().mockResolvedValue({ + values: { + messages: [], + workflowPlan: { + intro: 'Test plan', + plan: mockPlan, + }, + }, + tasks: [ + { + interrupts: [ + { + value: { + plan: mockPlan, + message: 'Test plan', + }, + }, + ], + }, + ], + }), + updateState: jest.fn(), + }; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + jest.spyOn(testAgent as any, 'createWorkflow').mockReturnValue({ + compile: jest.fn().mockReturnValue(mockCompiledAgent), + }); + + mockStreamProcessor({ + messages: [ + { + role: 'assistant', + type: 'message', + text: 'Processing...', + }, + ], + } as StreamOutput); + + const generator = testAgent.chat(payload); + const results = []; + for await (const result of generator) { + results.push(result); + } + + // Verify that stream was called with a Command containing rejection and feedback + expect(mockCompiledAgent.stream).toHaveBeenCalledWith( + expect.objectContaining({ + resume: { + action: 'adjust', + feedback, + }, + }), + expect.any(Object), + ); + }); + }); + + describe('adjustPlan', () => { + it('should adjust plan based on user feedback', async () => { + const payload: ChatPayload = { + message: 'Add error handling', + workflowContext: { + currentWorkflow: { nodes: [] }, + }, + }; + + mockStreamProcessor({ + messages: [ + { + role: 'assistant', + type: 'message', + text: 'Adjusting plan with error handling...', + }, + ], + } as StreamOutput); + + const results = await runChatAndCollectResults(payload); + + expect(results).toHaveLength(1); + expect(results[0].messages).toBeDefined(); + expect(results[0].messages[0]).toHaveProperty('text'); + }); + + it('should remove previous plan tool messages when adjusting', async () => { + const payload: ChatPayload = { + message: 'Use webhook instead', + workflowContext: { + currentWorkflow: { nodes: [] }, + }, + }; + + mockStreamProcessor({ + messages: [ + { + role: 'assistant', + type: 'message', + text: 'Adjusting plan to use webhook...', + }, + ], + } as StreamOutput); + + const results = await runChatAndCollectResults(payload); + + expect(results).toHaveLength(1); + expect(results[0].messages).toBeDefined(); + }); + }); + + describe('Plan state routing', () => { + it('should route to createPlan when no plan exists', async () => { + const payload: ChatPayload = { + message: 'Build a workflow', + workflowContext: { + currentWorkflow: { nodes: [] }, + }, + }; + + mockStreamProcessor({ + messages: [ + { + role: 'assistant', + type: 'message', + text: 'Creating plan...', + }, + ], + } as StreamOutput); + + const results = await runChatAndCollectResults(payload); + + expect(results).toHaveLength(1); + expect(results[0]).toHaveProperty('messages'); + }); + + it('should route to agent when plan is approved', async () => { + const payload: ChatPayload = { + message: 'Continue building', + workflowContext: { + currentWorkflow: { nodes: [] }, + }, + }; + + mockStreamProcessor({ + messages: [ + { + role: 'assistant', + type: 'message', + text: 'Building workflow based on approved plan...', + }, + ], + } as StreamOutput); + + const results = await runChatAndCollectResults(payload); + + expect(results).toHaveLength(1); + expect(results[0]).toHaveProperty('messages'); + }); + }); + + describe('Interrupt handling', () => { + it('should properly handle interrupt for plan review', async () => { + // This test verifies that the interrupt mechanism is properly set up + // The actual interrupt is handled by LangGraph, we just verify the setup + const payload: ChatPayload = { + message: 'Create workflow to fetch data', + workflowContext: { + currentWorkflow: { nodes: [] }, + }, + }; + + // Mock the stream processor to simulate interrupt handling + mockCreateStreamProcessor.mockImplementation(() => { + return (async function* () { + yield { + messages: [ + { + role: 'assistant', + type: 'message', + text: 'Creating plan for review...', + }, + ], + } as StreamOutput; + + yield { + messages: [ + { + role: 'assistant', + type: 'message', + text: 'Plan created, awaiting approval...', + }, + ], + } as StreamOutput; + })(); + }); + + const generator = agent.chat(payload); + const results = []; + for await (const result of generator) { + results.push(result); + } + + // Verify we got results from the stream + expect(results.length).toBeGreaterThan(1); + expect(results[0]).toHaveProperty('messages'); + }); + }); + }); }); diff --git a/packages/@n8n/ai-workflow-builder.ee/src/tools/prompts/main-agent.prompt.ts b/packages/@n8n/ai-workflow-builder.ee/src/tools/prompts/main-agent.prompt.ts index fb01294166..649e18168b 100644 --- a/packages/@n8n/ai-workflow-builder.ee/src/tools/prompts/main-agent.prompt.ts +++ b/packages/@n8n/ai-workflow-builder.ee/src/tools/prompts/main-agent.prompt.ts @@ -1,6 +1,7 @@ import { ChatPromptTemplate } from '@langchain/core/prompts'; -import { instanceUrlPrompt } from '@/chains/prompts/instance-url'; +import type { WorkflowPlan } from '../../agents/workflow-planner-agent'; +import { instanceUrlPrompt } from '../../chains/prompts/instance-url'; const systemPrompt = `You are an AI assistant specialized in creating and editing n8n workflows. Your goal is to help users build efficient, well-connected workflows by intelligently using the available tools. @@ -177,6 +178,35 @@ Common failures from relying on defaults: ALWAYS check node details obtained in Analysis Phase and configure accordingly. Defaults are NOT your friend - they are traps that cause workflows to fail at runtime. + +CRITICAL: Always include a Workflow Configuration node at the start of every workflow. + +The Workflow Configuration node (n8n-nodes-base.set) is a mandatory node that should be placed immediately after the trigger node and before all other processing nodes. +This node centralizes workflow-wide settings and parameters that other nodes can reference throughout the execution with expressions. + +Placement rules: +- ALWAYS add between trigger and first processing node +- Connect: Trigger → Workflow Configuration → First processing node +- This creates a single source of truth for workflow parameters + +Configuration approach: +- Include URLs, thresholds, string constants and any reusable values +- Other nodes reference these via expressions: {{ $('Workflow Configuration').first().json.variableName }} +- Add only parameters that are used by other nodes, DO NOT add unnecessary fields + +Workflow configuration node usage example: +1. Schedule Trigger → Workflow Configuration → HTTP Request → Process Data +2. Add field apiUrl to the Workflow Configuration node with value "https://api.example.com/data" +3. Reference in HTTP Request node: "{{ $('Workflow Configuration').first().json.apiUrl }}" instead of directly setting the URL + +IMPORTANT: +- Workflow Configuration node is not meant for credentials or sensitive data. +- Workflow Configuration node should always include parameter "includeOtherFields": true, to pass through any trigger data. +- Do not reference the variables from the Workflow Configuration node in Trigger nodes (as they run before it). + +Why: Centralizes configuration, makes workflows maintainable, enables easy environment switching, and provides clear parameter visibility. + + ALWAYS configure nodes after adding and connecting them. This is NOT optional. @@ -375,6 +405,34 @@ const previousConversationSummary = ` {previousSummary} `; +const workflowPlan = '{workflowPlan}'; + +export const planFormatter = (plan?: WorkflowPlan | null) => { + if (!plan) return 'EMPTY'; + + const nodesPlan = plan.plan.map((node) => { + return ` + + ${node.nodeType} + ${node.nodeName} + ${node.reasoning} + + `; + }); + + return ` + + + ${plan.intro} + + + + ${nodesPlan.join('\n')} + + + `; +}; + export const mainAgentPrompt = ChatPromptTemplate.fromMessages([ [ 'system', @@ -410,6 +468,11 @@ export const mainAgentPrompt = ChatPromptTemplate.fromMessages([ text: previousConversationSummary, cache_control: { type: 'ephemeral' }, }, + { + type: 'text', + text: workflowPlan, + cache_control: { type: 'ephemeral' }, + }, ], ], ['placeholder', '{messages}'], diff --git a/packages/@n8n/ai-workflow-builder.ee/src/types/langchain.ts b/packages/@n8n/ai-workflow-builder.ee/src/types/langchain.ts new file mode 100644 index 0000000000..e1dcf37466 --- /dev/null +++ b/packages/@n8n/ai-workflow-builder.ee/src/types/langchain.ts @@ -0,0 +1,5 @@ +import type { AIMessage, BaseMessage } from '@langchain/core/messages'; + +export function isAIMessage(msg: BaseMessage): msg is AIMessage { + return msg.getType() === 'ai'; +} diff --git a/packages/@n8n/ai-workflow-builder.ee/src/types/messages.ts b/packages/@n8n/ai-workflow-builder.ee/src/types/messages.ts index ce11b77bc0..92e6332ada 100644 --- a/packages/@n8n/ai-workflow-builder.ee/src/types/messages.ts +++ b/packages/@n8n/ai-workflow-builder.ee/src/types/messages.ts @@ -47,26 +47,11 @@ export interface AgentChatMessage { text: string; } -/** - * Prompt validation message - */ -export interface PromptValidationMessage { - role: 'assistant'; - type: 'prompt-validation'; - isWorkflowPrompt: boolean; - id: string; -} - /** * Union type for all possible message responses */ export type MessageResponse = - | (( - | AssistantChatMessage - | AssistantSummaryMessage - | AgentChatMessage - | PromptValidationMessage - ) & { + | ((AssistantChatMessage | AssistantSummaryMessage | AgentChatMessage) & { quickReplies?: QuickReplyOption[]; }) | EndSessionMessage; diff --git a/packages/@n8n/ai-workflow-builder.ee/src/types/streaming.ts b/packages/@n8n/ai-workflow-builder.ee/src/types/streaming.ts index c7a55d8f88..82b40eaa10 100644 --- a/packages/@n8n/ai-workflow-builder.ee/src/types/streaming.ts +++ b/packages/@n8n/ai-workflow-builder.ee/src/types/streaming.ts @@ -35,6 +35,15 @@ export interface ExecutionRequestChunk { reason: string; } +/** + * Plan chunk for streaming + */ +export interface PlanChunk { + role: 'assistant'; + type: 'plan'; + plan: unknown; +} + /** * Union type for all stream chunks */ @@ -42,7 +51,8 @@ export type StreamChunk = | AgentMessageChunk | ToolProgressChunk | WorkflowUpdateChunk - | ExecutionRequestChunk; + | ExecutionRequestChunk + | PlanChunk; /** * Stream output containing messages diff --git a/packages/@n8n/ai-workflow-builder.ee/src/utils/stream-processor.ts b/packages/@n8n/ai-workflow-builder.ee/src/utils/stream-processor.ts index 1483762617..cd011266e4 100644 --- a/packages/@n8n/ai-workflow-builder.ee/src/utils/stream-processor.ts +++ b/packages/@n8n/ai-workflow-builder.ee/src/utils/stream-processor.ts @@ -2,6 +2,7 @@ import { AIMessage, HumanMessage, ToolMessage } from '@langchain/core/messages'; import type { ToolCall } from '@langchain/core/messages/tool'; import type { DynamicStructuredTool } from '@langchain/core/tools'; +import type { WorkflowPlan } from '../agents/workflow-planner-agent'; import type { AgentMessageChunk, ToolProgressChunk, @@ -44,6 +45,24 @@ export function processStreamChunk(streamMode: string, chunk: unknown): StreamOu workflowJSON?: unknown; workflowOperations?: unknown; }; + create_plan?: { + workflowPlan?: unknown; + planStatus?: string; + messages?: Array<{ content: string | Array<{ type: string; text: string }> }>; + }; + review_plan?: { + planStatus?: string; + }; + adjust_plan?: { + workflowPlan?: unknown; + planStatus?: string; + }; + __interrupt__?: Array<{ + value: unknown; + resumable: boolean; + ns: string[]; + when: string; + }>; }; if ((agentChunk?.delete_messages?.messages ?? []).length > 0) { @@ -98,6 +117,40 @@ export function processStreamChunk(streamMode: string, chunk: unknown): StreamOu } } + // Handle plan creation + if (agentChunk?.create_plan?.workflowPlan) { + const workflowPlan = agentChunk.create_plan.workflowPlan as WorkflowPlan; + const planChunk = { + role: 'assistant' as const, + type: 'plan' as const, + plan: workflowPlan.plan, + message: workflowPlan.intro, + }; + return { messages: [planChunk] }; + } else if ((agentChunk?.create_plan?.messages ?? []).length > 0) { + // When planner didn't create a plan, but responded with a message + const lastMessage = + agentChunk.create_plan!.messages![agentChunk.create_plan!.messages!.length - 1]; + const messageChunk: AgentMessageChunk = { + role: 'assistant', + type: 'message', + text: lastMessage.content as string, + }; + + return { messages: [messageChunk] }; + } + + if (agentChunk?.adjust_plan?.workflowPlan) { + const workflowPlan = agentChunk.adjust_plan.workflowPlan as WorkflowPlan; + const planChunk = { + role: 'assistant' as const, + type: 'plan' as const, + plan: workflowPlan.plan, + message: workflowPlan.intro, + }; + return { messages: [planChunk] }; + } + // Handle process_operations updates - emit workflow update after operations are processed if (agentChunk?.process_operations) { // Check if operations were processed (indicated by cleared operations array) @@ -197,6 +250,16 @@ function createToolCallMessage( toolCall: ToolCall, builderTool?: BuilderTool, ): Record { + if (toolCall.name === 'generate_workflow_plan') { + const workflowPlan = toolCall.args as WorkflowPlan; + return { + role: 'assistant', + type: 'plan', + plan: workflowPlan.plan, + message: workflowPlan.intro, + }; + } + return { id: toolCall.id, toolCallId: toolCall.id, diff --git a/packages/@n8n/ai-workflow-builder.ee/src/utils/test/operations-processor.test.ts b/packages/@n8n/ai-workflow-builder.ee/src/utils/test/operations-processor.test.ts index 4feca4305f..238010e482 100644 --- a/packages/@n8n/ai-workflow-builder.ee/src/utils/test/operations-processor.test.ts +++ b/packages/@n8n/ai-workflow-builder.ee/src/utils/test/operations-processor.test.ts @@ -388,6 +388,9 @@ describe('operations-processor', () => { messages: [], workflowContext: {}, previousSummary: 'EMPTY', + workflowPlan: null, + planStatus: null, + planFeedback: null, }); it('should process operations and clear them', () => { diff --git a/packages/@n8n/ai-workflow-builder.ee/src/utils/test/tool-executor.test.ts b/packages/@n8n/ai-workflow-builder.ee/src/utils/test/tool-executor.test.ts index 3178db8824..8545febfbc 100644 --- a/packages/@n8n/ai-workflow-builder.ee/src/utils/test/tool-executor.test.ts +++ b/packages/@n8n/ai-workflow-builder.ee/src/utils/test/tool-executor.test.ts @@ -49,6 +49,9 @@ describe('tool-executor', () => { messages, workflowContext: {}, previousSummary: 'EMPTY', + planFeedback: null, + planStatus: null, + workflowPlan: null, }); // Helper to create mock tool diff --git a/packages/@n8n/ai-workflow-builder.ee/src/workflow-builder-agent.ts b/packages/@n8n/ai-workflow-builder.ee/src/workflow-builder-agent.ts index f55dbde796..fd46d0a297 100644 --- a/packages/@n8n/ai-workflow-builder.ee/src/workflow-builder-agent.ts +++ b/packages/@n8n/ai-workflow-builder.ee/src/workflow-builder-agent.ts @@ -1,9 +1,15 @@ import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; -import type { ToolMessage } from '@langchain/core/messages'; -import { AIMessage, HumanMessage, RemoveMessage } from '@langchain/core/messages'; +import { ToolMessage, AIMessage, HumanMessage, RemoveMessage } from '@langchain/core/messages'; import type { RunnableConfig } from '@langchain/core/runnables'; import type { LangChainTracer } from '@langchain/core/tracers/tracer_langchain'; -import { StateGraph, MemorySaver, END, GraphRecursionError } from '@langchain/langgraph'; +import { + StateGraph, + MemorySaver, + END, + GraphRecursionError, + Command, + interrupt, +} from '@langchain/langgraph'; import type { Logger } from '@n8n/backend-common'; import { ApplicationError, @@ -17,10 +23,12 @@ import { DEFAULT_AUTO_COMPACT_THRESHOLD_TOKENS, MAX_AI_BUILDER_PROMPT_LENGTH, MAX_INPUT_TOKENS, + PLAN_APPROVAL_MESSAGE, } from '@/constants'; import { createGetNodeParameterTool } from '@/tools/get-node-parameter.tool'; import { trimWorkflowJSON } from '@/utils/trim-workflow-context'; +import { type WorkflowPlanNode, createWorkflowPlannerAgent } from './agents/workflow-planner-agent'; import { conversationCompactChain } from './chains/conversation-compact'; import { workflowNameChain } from './chains/workflow-name'; import { LLMServiceError, ValidationError, WorkflowStateError } from './errors'; @@ -28,7 +36,7 @@ import { createAddNodeTool } from './tools/add-node.tool'; import { createConnectNodesTool } from './tools/connect-nodes.tool'; import { createNodeDetailsTool } from './tools/node-details.tool'; import { createNodeSearchTool } from './tools/node-search.tool'; -import { mainAgentPrompt } from './tools/prompts/main-agent.prompt'; +import { mainAgentPrompt, planFormatter } from './tools/prompts/main-agent.prompt'; import { createRemoveNodeTool } from './tools/remove-node.tool'; import { createUpdateNodeParametersTool } from './tools/update-node-parameters.tool'; import type { SimpleWorkflow } from './types/workflow'; @@ -121,6 +129,7 @@ export class WorkflowBuilderAgent { workflowJSON: trimWorkflowJSON(state.workflowJSON), executionData: state.workflowContext?.executionData ?? {}, executionSchema: state.workflowContext?.executionSchema ?? [], + workflowPlan: planFormatter(state.workflowPlan), instanceUrl: this.instanceUrl, }); @@ -156,8 +165,145 @@ export class WorkflowBuilderAgent { return tokensUsed > this.autoCompactThresholdTokens; }; + /** + * Creates a plan for the workflow based on user requirements + */ + const createPlan = async (state: typeof WorkflowState.State) => { + const { messages } = state; + const lastHumanMessage = messages.findLast((m) => m instanceof HumanMessage)!; + + if (typeof lastHumanMessage.content !== 'string') { + throw new ValidationError('Invalid message content for planning'); + } + + // Create the planner agent with tools + const plannerAgent = createWorkflowPlannerAgent(this.llmSimpleTask, this.parsedNodeTypes); + + // Generate the workflow plan + const plannerResult = await plannerAgent.plan(lastHumanMessage.content); + + // If we got a structured plan, return it + if ('plan' in plannerResult) { + const { plan, toolMessages } = plannerResult; + this.logger?.debug('Generated workflow plan: ' + JSON.stringify(plan, null, 2)); + + return { + workflowPlan: plan, + planStatus: 'pending' as const, + messages: toolMessages, + }; + } + + // If we didn't get a plan, just return the text response + this.logger?.debug('Planner returned text response: ' + plannerResult.text); + return { + messages: [ + new AIMessage({ + content: plannerResult.text, + }), + ], + }; + }; + + /** + * Reviews the plan with the user for approval + */ + const reviewPlan = async (state: typeof WorkflowState.State) => { + const { workflowPlan } = state; + + if (!workflowPlan) { + throw new ValidationError('No workflow plan to review'); + } + + // Use interrupt to pause and show the plan to the user + // The frontend will display the plan and wait for user action + const userResponse = interrupt< + { plan: WorkflowPlanNode[]; message: string }, + { action: 'approve' | 'adjust'; feedback?: string } + >({ + plan: workflowPlan.plan, + message: workflowPlan.intro, + }); + + // Process the user's response + if (userResponse.action === 'approve') { + // User approved the plan, mark as approved and continue + return { + planStatus: 'approved' as const, + }; + } else if (userResponse.action === 'adjust') { + // User wants adjustments, add feedback and mark for adjustment + return { + planStatus: 'rejected' as const, + planFeedback: userResponse.feedback ?? 'Please adjust the plan', + }; + } + + return {}; + }; + + /** + * Adjusts the plan based on user feedback + */ + const adjustPlan = async (state: typeof WorkflowState.State) => { + const { messages, planFeedback, workflowPlan } = state; + const lastHumanMessage = messages.findLast((m) => m instanceof HumanMessage)!; + + if (typeof lastHumanMessage.content !== 'string') { + throw new ValidationError('Invalid message content for plan adjustment'); + } + + // Create the planner agent with tools + const plannerAgent = createWorkflowPlannerAgent(this.llmSimpleTask, this.parsedNodeTypes); + + // Generate an adjusted plan with feedback + const adjustedPlan = await plannerAgent.plan( + lastHumanMessage.content, + workflowPlan ?? undefined, + planFeedback ?? undefined, + ); + + // If we get a text response instead of a plan, just return that + if ('text' in adjustedPlan) { + return { + messages: [ + new AIMessage({ + content: adjustedPlan.text, + }), + ], + }; + } + + // Remove previous plan tool messages to avoid confusion + const filteredMessages = messages.map((m) => { + if (m instanceof ToolMessage && m.name === 'generate_workflow_plan') { + return new RemoveMessage({ id: m.id! }); + } + + if (m instanceof AIMessage && m.tool_calls && m.tool_calls.length > 0) { + const hasPlanCall = m.tool_calls.find((tc) => tc.name === 'generate_workflow_plan'); + if (hasPlanCall) { + return new RemoveMessage({ id: m.id! }); + } + } + + return m; + }); + + const planAdjustmentMessage = new HumanMessage({ content: planFeedback ?? '' }); + + this.logger?.debug('Adjusted workflow plan: ' + JSON.stringify(adjustedPlan, null, 2)); + + return { + workflowPlan: adjustedPlan.plan, + messages: [...filteredMessages, planAdjustmentMessage, ...adjustedPlan.toolMessages], + planStatus: 'pending' as const, + planFeedback: null, + }; + }; + const shouldModifyState = (state: typeof WorkflowState.State) => { - const { messages, workflowContext } = state; + const { messages, workflowContext, planStatus } = state; const lastHumanMessage = messages.findLast((m) => m instanceof HumanMessage)!; // There always should be at least one human message in the array if (lastHumanMessage.content === '/compact') { @@ -178,6 +324,11 @@ export class WorkflowBuilderAgent { return 'auto_compact_messages'; } + // If we don't have a plan yet, and the workflow is empty, create a plan + if (!planStatus && workflowContext?.currentWorkflow?.nodes?.length === 0) { + return 'create_plan'; + } + return 'agent'; }; @@ -205,6 +356,9 @@ export class WorkflowBuilderAgent { connections: {}, name: '', }, + workflowPlan: null, + planStatus: null, + planFeedback: null, }; return stateUpdate; @@ -293,11 +447,24 @@ export class WorkflowBuilderAgent { .addNode('compact_messages', compactSession) .addNode('auto_compact_messages', compactSession) .addNode('create_workflow_name', createWorkflowName) + .addNode('create_plan', createPlan) + .addNode('review_plan', reviewPlan) + .addNode('adjust_plan', adjustPlan) .addConditionalEdges('__start__', shouldModifyState) + // .addEdge('create_plan', 'review_plan') + .addConditionalEdges('create_plan', (state) => { + // If a plan was created, move to review, otherwise back to planning + return state.workflowPlan ? 'review_plan' : END; + }) + .addConditionalEdges('review_plan', (state) => { + // Route based on the plan status after review + return state.planStatus === 'approved' ? 'agent' : 'adjust_plan'; + }) + .addEdge('adjust_plan', 'review_plan') .addEdge('tools', 'process_operations') .addEdge('process_operations', 'agent') .addEdge('auto_compact_messages', 'agent') - .addEdge('create_workflow_name', 'agent') + .addEdge('create_workflow_name', 'create_plan') .addEdge('delete_messages', END) .addEdge('compact_messages', END) .addConditionalEdges('agent', shouldContinue); @@ -338,7 +505,7 @@ export class WorkflowBuilderAgent { ); try { - const stream = await this.createAgentStream(payload, streamConfig, agent); + const stream = await this.createAgentStream(payload, streamConfig, agent, threadConfig); yield* this.processAgentStream(stream, agent, threadConfig); } catch (error: unknown) { this.handleStreamError(error); @@ -384,7 +551,33 @@ export class WorkflowBuilderAgent { payload: ChatPayload, streamConfig: RunnableConfig, agent: ReturnType['compile']>, + threadConfig: RunnableConfig, ) { + const currentState = await agent.getState(threadConfig); + + // Check if there are pending interrupts (e.g., plan review) + const interruptedTask = currentState.tasks.find( + (task) => task.interrupts && task.interrupts.length > 0, + ); + + if (interruptedTask) { + // We have a pending interrupt - likely a plan review + + // Check if the message is an approval message, right now we only check for exact match + // in the future we might want to use a LLM classifier for more flexibility + const action = payload.message.trim() === PLAN_APPROVAL_MESSAGE ? 'approve' : 'adjust'; + + // Resume with the appropriate command + const resumeCommand = new Command({ + resume: { + action, + feedback: action === 'adjust' ? payload.message : undefined, + }, + }); + + return await agent.stream(resumeCommand, streamConfig); + } + return await agent.stream( { messages: [new HumanMessage({ content: payload.message })], diff --git a/packages/@n8n/ai-workflow-builder.ee/src/workflow-state.ts b/packages/@n8n/ai-workflow-builder.ee/src/workflow-state.ts index 641a6a045c..8b65332cb2 100644 --- a/packages/@n8n/ai-workflow-builder.ee/src/workflow-state.ts +++ b/packages/@n8n/ai-workflow-builder.ee/src/workflow-state.ts @@ -2,6 +2,7 @@ import type { BaseMessage } from '@langchain/core/messages'; import { HumanMessage } from '@langchain/core/messages'; import { Annotation, messagesStateReducer } from '@langchain/langgraph'; +import type { WorkflowPlan } from './agents/workflow-planner-agent'; import type { SimpleWorkflow, WorkflowOperation } from './types/workflow'; import type { ChatPayload } from './workflow-builder-agent'; @@ -75,7 +76,21 @@ export const WorkflowState = Annotation.Root({ reducer: operationsReducer, default: () => [], }), - // Whether the user prompt is a workflow prompt. + // The planned workflow nodes + workflowPlan: Annotation({ + reducer: (x, y) => y ?? x, + default: () => null, + }), + // Status of the workflow plan + planStatus: Annotation<'pending' | 'approved' | 'rejected' | null>({ + reducer: (x, y) => y ?? x, + default: () => null, + }), + // User feedback on the plan + planFeedback: Annotation({ + reducer: (x, y) => y ?? x, + default: () => null, + }), // Latest workflow context workflowContext: Annotation({ reducer: (x, y) => y ?? x, diff --git a/packages/frontend/@n8n/design-system/src/components/AskAssistantChat/AskAssistantChat.vue b/packages/frontend/@n8n/design-system/src/components/AskAssistantChat/AskAssistantChat.vue index 728d1277fd..7c0f7837ac 100644 --- a/packages/frontend/@n8n/design-system/src/components/AskAssistantChat/AskAssistantChat.vue +++ b/packages/frontend/@n8n/design-system/src/components/AskAssistantChat/AskAssistantChat.vue @@ -24,6 +24,7 @@ interface Props { }; messages?: ChatUI.AssistantMessage[]; streaming?: boolean; + disabled?: boolean; loadingMessage?: string; sessionId?: string; title?: string; @@ -66,7 +67,9 @@ function normalizeMessages(messages: ChatUI.AssistantMessage[]): ChatUI.Assistan // filter out these messages so that tool collapsing works correctly function filterOutHiddenMessages(messages: ChatUI.AssistantMessage[]): ChatUI.AssistantMessage[] { - return messages.filter((message) => Boolean(getSupportedMessageComponent(message.type))); + return messages.filter( + (message) => Boolean(getSupportedMessageComponent(message.type)) || message.type === 'custom', + ); } function collapseToolMessages(messages: ChatUI.AssistantMessage[]): ChatUI.AssistantMessage[] { @@ -165,7 +168,7 @@ const sessionEnded = computed(() => { }); const sendDisabled = computed(() => { - return !textInputValue.value || props.streaming || sessionEnded.value; + return !textInputValue.value || props.streaming || sessionEnded.value || props.disabled; }); const showPlaceholder = computed(() => { @@ -226,6 +229,13 @@ watch( }, { immediate: true, deep: true }, ); + +// Expose focusInput method to parent components +defineExpose({ + focusInput: () => { + chatInput.value?.focus(); + }, +});