From 4ca9826d7e51092999e36854f299b4caab1c6ed9 Mon Sep 17 00:00:00 2001 From: Eugene Date: Mon, 7 Jul 2025 18:35:13 +0200 Subject: [PATCH] chore(core): Add first-shot evaluation for the T2WF (no-changelog) (#16996) --- .../src/ai-workflow-builder.service.ts | 41 +--- .../@n8n/ai-workflow-builder/src/index.ts | 1 - .../ai-workflow-builder/src/interfaces.ts | 4 - packages/cli/src/commands/ttwf/generate.ts | 227 ++++++++++++++++++ packages/cli/src/commands/ttwf/worker-pool.ts | 47 ++++ .../services/ai-workflow-builder.service.ts | 28 ++- pnpm-lock.yaml | 14 +- pnpm-workspace.yaml | 2 +- 8 files changed, 314 insertions(+), 50 deletions(-) delete mode 100644 packages/@n8n/ai-workflow-builder/src/interfaces.ts create mode 100644 packages/cli/src/commands/ttwf/generate.ts create mode 100644 packages/cli/src/commands/ttwf/worker-pool.ts diff --git a/packages/@n8n/ai-workflow-builder/src/ai-workflow-builder.service.ts b/packages/@n8n/ai-workflow-builder/src/ai-workflow-builder.service.ts index c1686ab5e1..0577ccf14c 100644 --- a/packages/@n8n/ai-workflow-builder/src/ai-workflow-builder.service.ts +++ b/packages/@n8n/ai-workflow-builder/src/ai-workflow-builder.service.ts @@ -2,7 +2,6 @@ import { dispatchCustomEvent } from '@langchain/core/callbacks/dispatch'; import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; import type { RunnableConfig } from '@langchain/core/runnables'; import { StateGraph, END, START } from '@langchain/langgraph'; -import { GlobalConfig } from '@n8n/config'; import { Service } from '@n8n/di'; import { AiAssistantClient } from '@n8n_io/ai-assistant-sdk'; import { OperationalError, assert, INodeTypes } from 'n8n-workflow'; @@ -13,7 +12,6 @@ import { nodesSelectionChain } from './chains/node-selector'; import { nodesComposerChain } from './chains/nodes-composer'; import { plannerChain } from './chains/planner'; import { validatorChain } from './chains/validator'; -import { ILicenseService } from './interfaces'; import { anthropicClaude37Sonnet, gpt41mini } from './llm-config'; import type { MessageResponse } from './types'; import { WorkflowState } from './workflow-state'; @@ -26,42 +24,26 @@ export class AiWorkflowBuilderService { private llmComplexTask: BaseChatModel | undefined; - private client: AiAssistantClient | undefined; - constructor( - private readonly licenseService: ILicenseService, private readonly nodeTypes: INodeTypes, - private readonly globalConfig: GlobalConfig, - private readonly n8nVersion: string, + private readonly client?: AiAssistantClient, ) { this.parsedNodeTypes = this.getNodeTypes(); } - private async setupModels(user: IUser) { + private async setupModels(user?: IUser) { if (this.llmSimpleTask && this.llmComplexTask) { return; } - const baseUrl = this.globalConfig.aiAssistant.baseUrl; - // If base URL is set, use api-proxy to access LLMs - if (baseUrl) { - if (!this.client) { - const licenseCert = await this.licenseService.loadCertStr(); - const consumerId = this.licenseService.getConsumerId(); - - this.client = new AiAssistantClient({ - licenseCert, - consumerId, - baseUrl, - n8nVersion: this.n8nVersion, - }); - } - - assert(this.client, 'Client not setup'); - + // If client is provided, use it for API proxy + if (this.client && user) { const authHeaders = await this.client.generateApiProxyCredentials(user); + // Extract baseUrl from client configuration + const baseUrl = this.client.getApiProxyBaseUrl(); + this.llmSimpleTask = await gpt41mini({ - baseUrl: baseUrl + '/v1/api-proxy/openai', + baseUrl: baseUrl + '/openai', // When using api-proxy the key will be populated automatically, we just need to pass a placeholder apiKey: '-', headers: { @@ -69,7 +51,7 @@ export class AiWorkflowBuilderService { }, }); this.llmComplexTask = await anthropicClaude37Sonnet({ - baseUrl: baseUrl + '/v1/api-proxy/anthropic', + baseUrl: baseUrl + '/anthropic', apiKey: '-', headers: { Authorization: authHeaders.apiKey, @@ -77,7 +59,8 @@ export class AiWorkflowBuilderService { }); return; } - // If base URL is not set, use environment variables + + // If no client provided, use environment variables this.llmSimpleTask = await gpt41mini({ apiKey: process.env.N8N_AI_OPENAI_API_KEY ?? '', }); @@ -349,7 +332,7 @@ export class AiWorkflowBuilderService { return workflowGraph; } - async *chat(payload: { question: string }, user: IUser) { + async *chat(payload: { question: string }, user?: IUser) { if (!this.llmComplexTask || !this.llmSimpleTask) { await this.setupModels(user); } diff --git a/packages/@n8n/ai-workflow-builder/src/index.ts b/packages/@n8n/ai-workflow-builder/src/index.ts index 898f70df9d..e542f3459e 100644 --- a/packages/@n8n/ai-workflow-builder/src/index.ts +++ b/packages/@n8n/ai-workflow-builder/src/index.ts @@ -1,4 +1,3 @@ export * from './ai-workflow-builder.service'; export * from './types'; export * from './workflow-state'; -export * from './interfaces'; diff --git a/packages/@n8n/ai-workflow-builder/src/interfaces.ts b/packages/@n8n/ai-workflow-builder/src/interfaces.ts deleted file mode 100644 index 90e1a7f770..0000000000 --- a/packages/@n8n/ai-workflow-builder/src/interfaces.ts +++ /dev/null @@ -1,4 +0,0 @@ -export interface ILicenseService { - loadCertStr(): Promise; - getConsumerId(): string; -} diff --git a/packages/cli/src/commands/ttwf/generate.ts b/packages/cli/src/commands/ttwf/generate.ts new file mode 100644 index 0000000000..d46cdb7b1c --- /dev/null +++ b/packages/cli/src/commands/ttwf/generate.ts @@ -0,0 +1,227 @@ +import { AiWorkflowBuilderService } from '@n8n/ai-workflow-builder'; +import { Container } from '@n8n/di'; +import { Command } from '@n8n/decorators'; + +import fs from 'fs'; +import { jsonParse, UserError } from 'n8n-workflow'; +import { z } from 'zod'; + +import { NodeTypes } from '@/node-types'; + +import { WorkerPool } from './worker-pool'; +import { BaseCommand } from '../base-command'; + +interface WorkflowGeneratedMessage { + role: 'assistant'; + type: 'workflow-generated'; + codeSnippet: string; +} + +interface WorkflowGenerationDatasetItem { + prompt: string; + referenceWorkflow: string; +} + +async function waitForWorkflowGenerated(aiResponse: AsyncGenerator<{ messages: any[] }>) { + let workflowJson: string | undefined; + + for await (const chunk of aiResponse) { + const wfGeneratedMessage = chunk.messages.find( + (m): m is WorkflowGeneratedMessage => + 'type' in m && (m as { type?: string }).type === 'workflow-generated', + ); + + if (wfGeneratedMessage?.codeSnippet) { + workflowJson = wfGeneratedMessage.codeSnippet; + } + } + + if (!workflowJson) { + // FIXME: Use proper error class + throw new UserError('No workflow generated message found in AI response'); + } + + return workflowJson; +} + +const flagsSchema = z.object({ + prompt: z + .string() + .alias('p') + .describe('Prompt to generate a workflow from. Mutually exclusive with --input.') + .optional(), + input: z + .string() + .alias('i') + .describe('Input dataset file name. Mutually exclusive with --prompt.') + .optional(), + output: z + .string() + .alias('o') + .describe('Output file name to save the results. Default is ttwf-results.jsonl') + .default('ttwf-results.jsonl'), + limit: z + .number() + .int() + .alias('l') + .describe('Number of items from the dataset to process. Only valid with --input.') + .default(-1), + concurrency: z + .number() + .int() + .alias('c') + .describe('Number of items to process in parallel. Only valid with --input.') + .default(1), +}); + +@Command({ + name: 'ttwf:generate', + description: 'Create a workflow(s) using AI Text-to-Workflow builder', + examples: [ + '$ n8n ttwf:generate --prompt "Create a telegram chatbot that can tell current weather in Berlin" --output result.json', + '$ n8n ttwf:generate --input dataset.jsonl --output results.jsonl', + ], + flagsSchema, +}) +export class TTWFGenerateCommand extends BaseCommand> { + /** + * Reads the dataset file in JSONL format + */ + private async readDataset(filePath: string): Promise { + try { + const data = await fs.promises.readFile(filePath, { encoding: 'utf-8' }); + + const lines = data.split('\n').filter((line) => line.trim() !== ''); + + if (lines.length === 0) { + throw new UserError('Dataset file is empty or contains no valid lines'); + } + + return lines.map((line, index) => { + try { + return jsonParse(line); + } catch (error) { + throw new UserError(`Invalid JSON line on index: ${index}`); + } + }); + } catch (error) { + throw new UserError(`Failed to read dataset file: ${error}`); + } + } + + async run() { + const { flags } = this; + + if (!flags.input && !flags.prompt) { + throw new UserError('Either --input or --prompt must be provided.'); + } + + if (flags.input && flags.prompt) { + throw new UserError('You cannot use --input and --prompt together. Use one or the other.'); + } + + const nodeTypes = Container.get(NodeTypes); + const wfBuilder = new AiWorkflowBuilderService(nodeTypes); + + if (flags.prompt) { + // Single prompt mode + if (flags.output && fs.existsSync(flags.output)) { + if (fs.lstatSync(flags.output).isDirectory()) { + this.logger.info('The parameter --output must be a writeable file'); + return; + } + + this.logger.warn('The output file already exists. It will be overwritten.'); + fs.unlinkSync(flags.output); + } + + try { + this.logger.info(`Processing prompt: ${flags.prompt}`); + + const aiResponse = wfBuilder.chat({ question: flags.prompt }); + + const generatedWorkflow = await waitForWorkflowGenerated(aiResponse); + + this.logger.info(`Generated workflow for prompt: ${flags.prompt}`); + + if (flags.output) { + fs.writeFileSync(flags.output, generatedWorkflow); + this.logger.info(`Workflow saved to ${flags.output}`); + } else { + this.logger.info('Generated Workflow:'); + // Pretty print JSON + this.logger.info(JSON.stringify(JSON.parse(generatedWorkflow), null, 2)); + } + } catch (e) { + const errorMessage = e instanceof Error ? e.message : 'An error occurred'; + this.logger.error(`Error processing prompt "${flags.prompt}": ${errorMessage}`); + } + } else if (flags.input) { + // Batch mode + const output = flags.output ?? 'ttwf-results.jsonl'; + if (fs.existsSync(output)) { + if (fs.lstatSync(output).isDirectory()) { + this.logger.info('The parameter --output must be a writeable file'); + return; + } + + this.logger.warn('The output file already exists. It will be overwritten.'); + fs.unlinkSync(output); + } + + const pool = new WorkerPool(flags.concurrency ?? 1); + + const dataset = await this.readDataset(flags.input); + + // Open file for writing results + const outputStream = fs.createWriteStream(output, { flags: 'a' }); + + const datasetWithLimit = (flags.limit ?? -1) > 0 ? dataset.slice(0, flags.limit) : dataset; + + await Promise.allSettled( + datasetWithLimit.map(async (item) => { + try { + const generatedWorkflow = await pool.execute(async () => { + this.logger.info(`Processing prompt: ${item.prompt}`); + + const aiResponse = wfBuilder.chat({ question: item.prompt }); + + return await waitForWorkflowGenerated(aiResponse); + }); + + this.logger.info(`Generated workflow for prompt: ${item.prompt}`); + + // Write the generated workflow to the output file + outputStream.write( + JSON.stringify({ + prompt: item.prompt, + generatedWorkflow, + referenceWorkflow: item.referenceWorkflow, + }) + '\n', + ); + } catch (e) { + const errorMessage = e instanceof Error ? e.message : 'An error occurred'; + this.logger.error(`Error processing prompt "${item.prompt}": ${errorMessage}`); + // Optionally write the error to the output file + outputStream.write( + JSON.stringify({ + prompt: item.prompt, + referenceWorkflow: item.referenceWorkflow, + errorMessage, + }) + '\n', + ); + } + }), + ); + + outputStream.end(); + } + } + + async catch(error: Error) { + this.logger.error('\nGOT ERROR'); + this.logger.error('===================================='); + this.logger.error(error.message); + this.logger.error(error.stack!); + } +} diff --git a/packages/cli/src/commands/ttwf/worker-pool.ts b/packages/cli/src/commands/ttwf/worker-pool.ts new file mode 100644 index 0000000000..2f634bea9a --- /dev/null +++ b/packages/cli/src/commands/ttwf/worker-pool.ts @@ -0,0 +1,47 @@ +export class WorkerPool { + private queue: Array<() => Promise> = []; + + private activeWorkers = 0; + + constructor(private maxWorkers: number) {} + + async execute(task: () => Promise): Promise { + // If under limit, execute immediately + if (this.activeWorkers < this.maxWorkers) { + this.activeWorkers++; + try { + const result = await task(); + this.activeWorkers--; + this.processQueue(); + + return result; + } catch (error) { + this.activeWorkers--; + this.processQueue(); + + throw error; + } + } + + // Otherwise queue the task + return await new Promise((resolve, reject) => { + this.queue.push(async () => { + try { + const result = await task(); + resolve(result); + return result; + } catch (error) { + reject(error); + throw error; + } + }); + }); + } + + private processQueue() { + if (this.queue.length > 0 && this.activeWorkers < this.maxWorkers) { + const task = this.queue.shift()!; + void this.execute(task); + } + } +} diff --git a/packages/cli/src/services/ai-workflow-builder.service.ts b/packages/cli/src/services/ai-workflow-builder.service.ts index 145f19b2ac..1f044797bb 100644 --- a/packages/cli/src/services/ai-workflow-builder.service.ts +++ b/packages/cli/src/services/ai-workflow-builder.service.ts @@ -1,6 +1,7 @@ import { AiWorkflowBuilderService } from '@n8n/ai-workflow-builder'; import { GlobalConfig } from '@n8n/config'; import { Service } from '@n8n/di'; +import { AiAssistantClient } from '@n8n_io/ai-assistant-sdk'; import type { IUser } from 'n8n-workflow'; import { N8N_VERSION } from '@/constants'; @@ -21,20 +22,31 @@ export class WorkflowBuilderService { private readonly config: GlobalConfig, ) {} - private getService(): AiWorkflowBuilderService { + private async getService(): Promise { if (!this.service) { - this.service = new AiWorkflowBuilderService( - this.license, - this.nodeTypes, - this.config, - N8N_VERSION, - ); + let client: AiAssistantClient | undefined; + + // Create AiAssistantClient if baseUrl is configured + const baseUrl = this.config.aiAssistant.baseUrl; + if (baseUrl) { + const licenseCert = await this.license.loadCertStr(); + const consumerId = this.license.getConsumerId(); + + client = new AiAssistantClient({ + licenseCert, + consumerId, + baseUrl, + n8nVersion: N8N_VERSION, + }); + } + + this.service = new AiWorkflowBuilderService(this.nodeTypes, client); } return this.service; } async *chat(payload: { question: string }, user: IUser) { - const service = this.getService(); + const service = await this.getService(); yield* service.chat(payload, user); } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a4f1d61a38..72c487ce72 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -22,8 +22,8 @@ catalogs: specifier: 0.3.20-12 version: 0.3.20-12 '@n8n_io/ai-assistant-sdk': - specifier: 1.14.1 - version: 1.14.1 + specifier: 1.15.0 + version: 1.15.0 '@sentry/node': specifier: 8.52.1 version: 8.52.1 @@ -401,7 +401,7 @@ importers: version: link:../di '@n8n_io/ai-assistant-sdk': specifier: 'catalog:' - version: 1.14.1 + version: 1.15.0 n8n-workflow: specifier: workspace:* version: link:../../workflow @@ -1302,7 +1302,7 @@ importers: version: 0.3.20-12(@sentry/node@8.52.1)(ioredis@5.3.2)(mssql@10.0.2)(mysql2@3.11.0)(pg@8.12.0)(redis@4.6.14)(sqlite3@5.1.7)(ts-node@10.9.2(@types/node@20.19.1)(typescript@5.8.3)) '@n8n_io/ai-assistant-sdk': specifier: 'catalog:' - version: 1.14.1 + version: 1.15.0 '@n8n_io/license-sdk': specifier: 2.22.0 version: 2.22.0 @@ -5508,8 +5508,8 @@ packages: engines: {node: '>=18.10', pnpm: '>=9.6'} hasBin: true - '@n8n_io/ai-assistant-sdk@1.14.1': - resolution: {integrity: sha512-I2WXfNnDltrSqaMTXFJUZKq/uff6wuHBhFv0oiCyi0NK+CNwFkU1FCcmPWLQrQlj9llda4urwv5MuXygH0zUVw==} + '@n8n_io/ai-assistant-sdk@1.15.0': + resolution: {integrity: sha512-M/bNnxyVGxwLGU/mzQrZOkZK4NkR9x8cUMZHfVJlv1z6YTlHX56BYH+0jSlb2c15DEwPkku9l0RFVLTTt0ExQQ==} engines: {node: '>=20.15', pnpm: '>=8.14'} '@n8n_io/license-sdk@2.22.0': @@ -19071,7 +19071,7 @@ snapshots: acorn: 8.12.1 acorn-walk: 8.3.4 - '@n8n_io/ai-assistant-sdk@1.14.1': {} + '@n8n_io/ai-assistant-sdk@1.15.0': {} '@n8n_io/license-sdk@2.22.0': dependencies: diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index 32410291e5..6cd9ed49a8 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -8,7 +8,7 @@ packages: catalog: '@n8n/typeorm': 0.3.20-12 - '@n8n_io/ai-assistant-sdk': 1.14.1 + '@n8n_io/ai-assistant-sdk': 1.15.0 '@langchain/core': 0.3.61 '@langchain/openai': 0.5.16 '@langchain/anthropic': 0.3.23