From ce98f7c175d1875e84bcfa0681bda6035e386dc6 Mon Sep 17 00:00:00 2001 From: oleg Date: Mon, 4 Aug 2025 09:55:07 +0200 Subject: [PATCH] feat: Abort AI builder requests on chat stop (#17854) --- .../src/ai-workflow-builder-agent.service.ts | 4 +- .../src/workflow-builder-agent.ts | 107 ++++----- .../__tests__/ai.controller.test.ts | 153 +++++++++++++ packages/cli/src/controllers/ai.controller.ts | 11 + .../services/ai-workflow-builder.service.ts | 4 +- .../AskAssistantChat/AskAssistantChat.vue | 24 ++- .../AskAssistantChat.test.ts.snap | 42 +++- .../CanvasThinkingPill/CanvasThinkingPill.vue | 26 ++- .../CanvasThinkingPill.test.ts.snap | 3 +- .../frontend/@n8n/i18n/src/locales/en.json | 1 + .../@n8n/rest-api-client/src/utils.ts | 2 + packages/frontend/editor-ui/src/api/ai.ts | 3 + .../AskAssistant/Agent/AskAssistantBuild.vue | 2 + .../nodes/render-types/CanvasNodeAIPrompt.vue | 6 +- .../src/composables/useBuilderMessages.ts | 11 + .../src/stores/builder.store.test.ts | 204 ++++++++++++++++++ .../editor-ui/src/stores/builder.store.ts | 23 ++ .../frontend/editor-ui/src/views/NodeView.vue | 7 +- pnpm-lock.yaml | 43 ++-- 19 files changed, 585 insertions(+), 91 deletions(-) diff --git a/packages/@n8n/ai-workflow-builder.ee/src/ai-workflow-builder-agent.service.ts b/packages/@n8n/ai-workflow-builder.ee/src/ai-workflow-builder-agent.service.ts index 9cee0dad57..8c88881aed 100644 --- a/packages/@n8n/ai-workflow-builder.ee/src/ai-workflow-builder-agent.service.ts +++ b/packages/@n8n/ai-workflow-builder.ee/src/ai-workflow-builder-agent.service.ts @@ -167,10 +167,10 @@ export class AiWorkflowBuilderService { return this.agent; } - async *chat(payload: ChatPayload, user?: IUser) { + async *chat(payload: ChatPayload, user?: IUser, abortSignal?: AbortSignal) { const agent = await this.getAgent(user); - for await (const output of agent.chat(payload, user?.id?.toString())) { + for await (const output of agent.chat(payload, user?.id?.toString(), abortSignal)) { yield output; } } diff --git a/packages/@n8n/ai-workflow-builder.ee/src/workflow-builder-agent.ts b/packages/@n8n/ai-workflow-builder.ee/src/workflow-builder-agent.ts index a0f9172f54..20cd539fde 100644 --- a/packages/@n8n/ai-workflow-builder.ee/src/workflow-builder-agent.ts +++ b/packages/@n8n/ai-workflow-builder.ee/src/workflow-builder-agent.ts @@ -1,6 +1,7 @@ import type { BaseChatModel } from '@langchain/core/language_models/chat_models'; -import type { AIMessage, ToolMessage } from '@langchain/core/messages'; -import { HumanMessage, RemoveMessage } from '@langchain/core/messages'; +import type { ToolMessage } from '@langchain/core/messages'; +import { AIMessage, HumanMessage, RemoveMessage } from '@langchain/core/messages'; +import type { RunnableConfig } from '@langchain/core/runnables'; import type { LangChainTracer } from '@langchain/core/tracers/tracer_langchain'; import { StateGraph, MemorySaver, END } from '@langchain/langgraph'; import type { Logger } from '@n8n/backend-common'; @@ -180,71 +181,73 @@ export class WorkflowBuilderAgent { : crypto.randomUUID(); } - async *chat(payload: ChatPayload, userId?: string) { + private getDefaultWorkflowJSON(payload: ChatPayload): SimpleWorkflow { + return ( + (payload.workflowContext?.currentWorkflow as SimpleWorkflow) ?? { + nodes: [], + connections: {}, + } + ); + } + + async *chat(payload: ChatPayload, userId?: string, abortSignal?: AbortSignal) { const agent = this.createWorkflow().compile({ checkpointer: this.checkpointer }); const workflowId = payload.workflowContext?.currentWorkflow?.id; // Generate thread ID from workflowId and userId // This ensures one session per workflow per user const threadId = WorkflowBuilderAgent.generateThreadId(workflowId, userId); - - // Configure thread for checkpointing - const threadConfig = { + const threadConfig: RunnableConfig = { configurable: { thread_id: threadId, }, }; + const streamConfig = { + ...threadConfig, + streamMode: ['updates', 'custom'], + recursionLimit: 30, + signal: abortSignal, + callbacks: this.tracer ? [this.tracer] : undefined, + } as RunnableConfig; - // Check if this is a subsequent message - // If so, update the workflowJSON with the current editor state - const existingCheckpoint = await this.checkpointer.getTuple(threadConfig); - - let stream; - - if (!existingCheckpoint?.checkpoint) { - // First message - use initial state - const initialState: typeof WorkflowState.State = { + const stream = await agent.stream( + { messages: [new HumanMessage({ content: payload.message })], - workflowJSON: (payload.workflowContext?.currentWorkflow as SimpleWorkflow) ?? { - nodes: [], - connections: {}, - }, + workflowJSON: this.getDefaultWorkflowJSON(payload), workflowOperations: [], workflowContext: payload.workflowContext, - }; + }, + streamConfig, + ); - stream = await agent.stream(initialState, { - ...threadConfig, - streamMode: ['updates', 'custom'], - recursionLimit: 30, - callbacks: this.tracer ? [this.tracer] : undefined, - }); - } else { - // Subsequent message - update the state with current workflow - const stateUpdate: Partial = { - messages: [new HumanMessage({ content: payload.message })], - workflowOperations: [], // Clear any pending operations from previous message - workflowContext: payload.workflowContext, - workflowJSON: { nodes: [], connections: {} }, // Default to empty workflow - }; - - if (payload.workflowContext?.currentWorkflow) { - stateUpdate.workflowJSON = payload.workflowContext?.currentWorkflow as SimpleWorkflow; + try { + const streamProcessor = createStreamProcessor(stream); + for await (const output of streamProcessor) { + yield output; } + } catch (error) { + if ( + error && + typeof error === 'object' && + 'message' in error && + typeof error.message === 'string' && + // This is naive, but it's all we get from LangGraph AbortError + ['Abort', 'Aborted'].includes(error.message) + ) { + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + const messages = (await agent.getState(threadConfig)).values.messages as Array< + AIMessage | HumanMessage | ToolMessage + >; - // Stream with just the new message - stream = await agent.stream(stateUpdate, { - ...threadConfig, - streamMode: ['updates', 'custom'], - recursionLimit: 80, - callbacks: this.tracer ? [this.tracer] : undefined, - }); - } - - // Use the stream processor utility to handle chunk processing - const streamProcessor = createStreamProcessor(stream); - - for await (const output of streamProcessor) { - yield output; + // Handle abort errors gracefully + const abortedAiMessage = new AIMessage({ + content: '[Task aborted]', + id: crypto.randomUUID(), + }); + // TODO: Should we clear tool calls that are in progress? + await agent.updateState(threadConfig, { messages: [...messages, abortedAiMessage] }); + return; + } + throw error; } } @@ -256,7 +259,7 @@ export class WorkflowBuilderAgent { if (workflowId) { const threadId = WorkflowBuilderAgent.generateThreadId(workflowId, userId); - const threadConfig = { + const threadConfig: RunnableConfig = { configurable: { thread_id: threadId, }, diff --git a/packages/cli/src/controllers/__tests__/ai.controller.test.ts b/packages/cli/src/controllers/__tests__/ai.controller.test.ts index 200f5ddd86..e931b35e44 100644 --- a/packages/cli/src/controllers/__tests__/ai.controller.test.ts +++ b/packages/cli/src/controllers/__tests__/ai.controller.test.ts @@ -152,6 +152,7 @@ describe('AiController', () => { }, }, request.user, + expect.any(AbortSignal), ); expect(response.header).toHaveBeenCalledWith('Content-type', 'application/json-lines'); expect(response.flush).toHaveBeenCalled(); @@ -241,5 +242,157 @@ describe('AiController', () => { expect(response.json).not.toHaveBeenCalled(); expect(response.end).toHaveBeenCalled(); }); + + describe('Abort handling', () => { + it('should create AbortController and handle connection close', async () => { + let abortHandler: (() => void) | undefined; + let abortSignalPassed: AbortSignal | undefined; + + // Mock response.on to capture the close handler + response.on.mockImplementation((event: string, handler: () => void) => { + if (event === 'close') { + abortHandler = handler; + } + return response; + }); + + // Create a generator that yields once then checks for abort + async function* testGenerator() { + yield { + messages: [{ role: 'assistant', type: 'message', text: 'Processing...' } as const], + }; + // Check if aborted and throw if so + if (abortSignalPassed?.aborted) { + throw new Error('Aborted'); + } + } + + workflowBuilderService.chat.mockImplementation((_payload, _user, signal) => { + abortSignalPassed = signal; + return testGenerator(); + }); + + // Start the request (but don't await it) + const buildPromise = controller.build(request, response, payload); + + // Wait a bit to ensure the generator is created and starts processing + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Verify abort signal was passed to the service + expect(abortSignalPassed).toBeDefined(); + expect(abortSignalPassed).toBeInstanceOf(AbortSignal); + expect(abortSignalPassed?.aborted).toBe(false); + + // Verify close handler was registered + expect(response.on).toHaveBeenCalledWith('close', expect.any(Function)); + expect(abortHandler).toBeDefined(); + + // Simulate connection close + abortHandler!(); + + // Verify the signal was aborted + expect(abortSignalPassed?.aborted).toBe(true); + + // Wait for the promise to settle + await buildPromise.catch(() => { + // Expected to throw due to abort + }); + + // Verify response was ended + expect(response.end).toHaveBeenCalled(); + }); + + it('should pass abort signal to workflow builder service', async () => { + let capturedSignal: AbortSignal | undefined; + + async function* mockGenerator() { + yield { messages: [{ role: 'assistant', type: 'message', text: 'Test' } as const] }; + } + + workflowBuilderService.chat.mockImplementation((_payload, _user, signal) => { + capturedSignal = signal; + return mockGenerator(); + }); + + await controller.build(request, response, payload); + + expect(capturedSignal).toBeDefined(); + expect(capturedSignal).toBeInstanceOf(AbortSignal); + expect(workflowBuilderService.chat).toHaveBeenCalledWith( + expect.any(Object), + request.user, + capturedSignal, + ); + }); + + it('should handle stream interruption when connection closes', async () => { + let abortHandler: (() => void) | undefined; + let abortSignalPassed: AbortSignal | undefined; + + response.on.mockImplementation((event: string, handler: () => void) => { + if (event === 'close') { + abortHandler = handler; + } + return response; + }); + + // Create a generator that yields multiple chunks + async function* mockChatGenerator() { + yield { messages: [{ role: 'assistant', type: 'message', text: 'Chunk 1' } as const] }; + + // Check if aborted before yielding next chunk + if (abortSignalPassed?.aborted) { + throw new Error('Aborted'); + } + + // This second chunk should not be reached if aborted + yield { messages: [{ role: 'assistant', type: 'message', text: 'Chunk 2' } as const] }; + } + + workflowBuilderService.chat.mockImplementation((_payload, _user, signal) => { + abortSignalPassed = signal; + return mockChatGenerator(); + }); + + // Start the build process + const buildPromise = controller.build(request, response, payload); + + // Wait for first chunk to be written + await new Promise((resolve) => setTimeout(resolve, 20)); + + // Should have written at least one chunk + expect(response.write).toHaveBeenCalled(); + const writeCallsBeforeAbort = response.write.mock.calls.length; + + // Simulate connection close + abortHandler!(); + + // Wait for the build to complete + await buildPromise.catch(() => { + // Expected to catch abort error + }); + + // Should not have written additional chunks after abort + expect(response.write).toHaveBeenCalledTimes(writeCallsBeforeAbort); + expect(response.end).toHaveBeenCalled(); + }); + + it('should cleanup abort listener on successful completion', async () => { + const onSpy = jest.spyOn(response, 'on'); + const offSpy = jest.spyOn(response, 'off'); + + async function* mockGenerator() { + yield { messages: [{ role: 'assistant', type: 'message', text: 'Complete' } as const] }; + } + + workflowBuilderService.chat.mockReturnValue(mockGenerator()); + + await controller.build(request, response, payload); + + // Verify close handler was registered and then removed + expect(onSpy).toHaveBeenCalledWith('close', expect.any(Function)); + expect(offSpy).toHaveBeenCalledWith('close', expect.any(Function)); + }); + }); }); }); diff --git a/packages/cli/src/controllers/ai.controller.ts b/packages/cli/src/controllers/ai.controller.ts index d4fcf5abb2..1496db5865 100644 --- a/packages/cli/src/controllers/ai.controller.ts +++ b/packages/cli/src/controllers/ai.controller.ts @@ -46,6 +46,13 @@ export class AiController { @Body payload: AiBuilderChatRequestDto, ) { try { + const abortController = new AbortController(); + const { signal } = abortController; + + const handleClose = () => abortController.abort(); + + res.on('close', handleClose); + const { text, workflowContext } = payload.payload; const aiResponse = this.workflowBuilderService.chat( { @@ -57,6 +64,7 @@ export class AiController { }, }, req.user, + signal, ); res.header('Content-type', 'application/json-lines').flush(); @@ -83,6 +91,9 @@ export class AiController { ], }; res.write(JSON.stringify(errorChunk) + '⧉⇋⇋➽⌑⧉§§\n'); + } finally { + // Clean up event listener + res.off('close', handleClose); } res.end(); diff --git a/packages/cli/src/services/ai-workflow-builder.service.ts b/packages/cli/src/services/ai-workflow-builder.service.ts index 5728a9536f..892d1afe9a 100644 --- a/packages/cli/src/services/ai-workflow-builder.service.ts +++ b/packages/cli/src/services/ai-workflow-builder.service.ts @@ -48,9 +48,9 @@ export class WorkflowBuilderService { return this.service; } - async *chat(payload: ChatPayload, user: IUser) { + async *chat(payload: ChatPayload, user: IUser, abortSignal?: AbortSignal) { const service = await this.getService(); - yield* service.chat(payload, user); + yield* service.chat(payload, user, abortSignal); } async getSessions(workflowId: string | undefined, user: IUser) { diff --git a/packages/frontend/@n8n/design-system/src/components/AskAssistantChat/AskAssistantChat.vue b/packages/frontend/@n8n/design-system/src/components/AskAssistantChat/AskAssistantChat.vue index 2e644fbf8b..d7c34063b5 100644 --- a/packages/frontend/@n8n/design-system/src/components/AskAssistantChat/AskAssistantChat.vue +++ b/packages/frontend/@n8n/design-system/src/components/AskAssistantChat/AskAssistantChat.vue @@ -10,7 +10,6 @@ import AssistantText from '../AskAssistantText/AssistantText.vue'; import InlineAskAssistantButton from '../InlineAskAssistantButton/InlineAskAssistantButton.vue'; import N8nButton from '../N8nButton'; import N8nIcon from '../N8nIcon'; -import N8nIconButton from '../N8nIconButton'; const { t } = useI18n(); @@ -28,10 +27,12 @@ interface Props { title?: string; placeholder?: string; scrollOnNewMessage?: boolean; + showStop?: boolean; } const emit = defineEmits<{ close: []; + stop: []; message: [string, string?, boolean?]; codeReplace: [number]; codeUndo: [number]; @@ -253,11 +254,24 @@ watch( @input.prevent="growInput" @keydown.stop /> - + does not render retry button if no error is present rows="1" wrap="hard" /> - @@ -988,15 +992,19 @@ Testing more code rows="1" wrap="hard" /> - @@ -1169,15 +1177,19 @@ exports[`AskAssistantChat > renders default placeholder chat correctly 1`] = ` rows="1" wrap="hard" /> - @@ -1438,15 +1450,19 @@ exports[`AskAssistantChat > renders end of session chat correctly 1`] = ` rows="1" wrap="hard" /> - @@ -1641,15 +1657,19 @@ exports[`AskAssistantChat > renders error message correctly with retry button 1` rows="1" wrap="hard" /> - @@ -1900,15 +1920,19 @@ catch(e) { rows="1" wrap="hard" /> - @@ -2092,15 +2116,19 @@ exports[`AskAssistantChat > renders streaming chat correctly 1`] = ` rows="1" wrap="hard" /> - diff --git a/packages/frontend/@n8n/design-system/src/components/CanvasThinkingPill/CanvasThinkingPill.vue b/packages/frontend/@n8n/design-system/src/components/CanvasThinkingPill/CanvasThinkingPill.vue index 2ca854014f..0fa66412b9 100644 --- a/packages/frontend/@n8n/design-system/src/components/CanvasThinkingPill/CanvasThinkingPill.vue +++ b/packages/frontend/@n8n/design-system/src/components/CanvasThinkingPill/CanvasThinkingPill.vue @@ -3,11 +3,20 @@ import { useCssModule } from 'vue'; import { useI18n } from '../../composables/useI18n'; import AssistantIcon from '../AskAssistantIcon/AssistantIcon.vue'; +import N8nButton from '../N8nButton'; defineOptions({ name: 'CanvasThinkingPill', }); +defineProps<{ + showStop?: boolean; +}>(); + +const emit = defineEmits<{ + stop: []; +}>(); + const { t } = useI18n(); const $style = useCssModule(); @@ -17,7 +26,17 @@ const $style = useCssModule();
- {{ t('aiAssistant.builder.canvas.thinking') }} + {{ t('aiAssistant.builder.canvas.thinking') }} + + @@ -28,7 +47,7 @@ const $style = useCssModule(); padding: 0 var(--spacing-s) 0 var(--spacing-xs); justify-content: center; align-items: center; - gap: var(--spacing-3xs); + gap: var(--spacing-2xs); border-radius: 22px; border: 1px solid var(--prim-gray-740); background: rgba(65, 66, 68, 0.92); @@ -51,6 +70,9 @@ const $style = useCssModule(); justify-content: center; } +.stopButton { + margin-left: var(--spacing-xs); +} .text { color: white; font-size: var(--font-size-s); diff --git a/packages/frontend/@n8n/design-system/src/components/CanvasThinkingPill/__snapshots__/CanvasThinkingPill.test.ts.snap b/packages/frontend/@n8n/design-system/src/components/CanvasThinkingPill/__snapshots__/CanvasThinkingPill.test.ts.snap index b7b3bae710..dbe0e2fc78 100644 --- a/packages/frontend/@n8n/design-system/src/components/CanvasThinkingPill/__snapshots__/CanvasThinkingPill.test.ts.snap +++ b/packages/frontend/@n8n/design-system/src/components/CanvasThinkingPill/__snapshots__/CanvasThinkingPill.test.ts.snap @@ -46,7 +46,8 @@ exports[`CanvasThinkingPill > renders canvas thinking pill correctly 1`] = ` - Working... + Working... + diff --git a/packages/frontend/@n8n/i18n/src/locales/en.json b/packages/frontend/@n8n/i18n/src/locales/en.json index e5f50e23c6..ccc73b2028 100644 --- a/packages/frontend/@n8n/i18n/src/locales/en.json +++ b/packages/frontend/@n8n/i18n/src/locales/en.json @@ -190,6 +190,7 @@ "aiAssistant.builder.canvasPrompt.cancelButton": "Cancel", "aiAssistant.builder.canvasPrompt.startManually.title": "Start manually", "aiAssistant.builder.canvasPrompt.startManually.subTitle": "Add the first node", + "aiAssistant.builder.streamAbortedMessage": "[Task aborted]", "aiAssistant.assistant": "AI Assistant", "aiAssistant.newSessionModal.title.part1": "Start new", "aiAssistant.newSessionModal.title.part2": "session", diff --git a/packages/frontend/@n8n/rest-api-client/src/utils.ts b/packages/frontend/@n8n/rest-api-client/src/utils.ts index f0a9836d69..bfe2577a04 100644 --- a/packages/frontend/@n8n/rest-api-client/src/utils.ts +++ b/packages/frontend/@n8n/rest-api-client/src/utils.ts @@ -219,6 +219,7 @@ export async function streamRequest( onDone?: () => void, onError?: (e: Error) => void, separator = STREAM_SEPERATOR, + abortSignal?: AbortSignal, ): Promise { const headers: Record = { 'browser-id': getBrowserId(), @@ -229,6 +230,7 @@ export async function streamRequest( method: 'POST', credentials: 'include', body: JSON.stringify(payload), + signal: abortSignal, }; try { const response = await fetch(`${context.baseUrl}${apiEndpoint}`, assistantRequest); diff --git a/packages/frontend/editor-ui/src/api/ai.ts b/packages/frontend/editor-ui/src/api/ai.ts index 382a5e4e75..f3f36ce908 100644 --- a/packages/frontend/editor-ui/src/api/ai.ts +++ b/packages/frontend/editor-ui/src/api/ai.ts @@ -13,6 +13,7 @@ export function chatWithBuilder( onMessageUpdated: (data: ChatRequest.ResponsePayload) => void, onDone: () => void, onError: (e: Error) => void, + abortSignal?: AbortSignal, ): void { void streamRequest( ctx, @@ -21,6 +22,8 @@ export function chatWithBuilder( onMessageUpdated, onDone, onError, + undefined, + abortSignal, ); } diff --git a/packages/frontend/editor-ui/src/components/AskAssistant/Agent/AskAssistantBuild.vue b/packages/frontend/editor-ui/src/components/AskAssistant/Agent/AskAssistantBuild.vue index 632e2f3c48..dc67df77a8 100644 --- a/packages/frontend/editor-ui/src/components/AskAssistant/Agent/AskAssistantBuild.vue +++ b/packages/frontend/editor-ui/src/components/AskAssistant/Agent/AskAssistantBuild.vue @@ -132,11 +132,13 @@ watch(currentRoute, () => { :loading-message="loadingMessage" :mode="i18n.baseText('aiAssistant.builder.mode')" :title="'n8n AI'" + :show-stop="true" :scroll-on-new-message="true" :placeholder="i18n.baseText('aiAssistant.builder.placeholder')" @close="emit('close')" @message="onUserMessage" @feedback="onFeedback" + @stop="builderStore.stopStreaming" >