feat: Abort AI builder requests on chat stop (#17854)

This commit is contained in:
oleg
2025-08-04 09:55:07 +02:00
committed by GitHub
parent 1554e76500
commit ce98f7c175
19 changed files with 585 additions and 91 deletions

View File

@@ -167,10 +167,10 @@ export class AiWorkflowBuilderService {
return this.agent;
}
async *chat(payload: ChatPayload, user?: IUser) {
async *chat(payload: ChatPayload, user?: IUser, abortSignal?: AbortSignal) {
const agent = await this.getAgent(user);
for await (const output of agent.chat(payload, user?.id?.toString())) {
for await (const output of agent.chat(payload, user?.id?.toString(), abortSignal)) {
yield output;
}
}

View File

@@ -1,6 +1,7 @@
import type { BaseChatModel } from '@langchain/core/language_models/chat_models';
import type { AIMessage, ToolMessage } from '@langchain/core/messages';
import { HumanMessage, RemoveMessage } from '@langchain/core/messages';
import type { ToolMessage } from '@langchain/core/messages';
import { AIMessage, HumanMessage, RemoveMessage } from '@langchain/core/messages';
import type { RunnableConfig } from '@langchain/core/runnables';
import type { LangChainTracer } from '@langchain/core/tracers/tracer_langchain';
import { StateGraph, MemorySaver, END } from '@langchain/langgraph';
import type { Logger } from '@n8n/backend-common';
@@ -180,71 +181,73 @@ export class WorkflowBuilderAgent {
: crypto.randomUUID();
}
async *chat(payload: ChatPayload, userId?: string) {
private getDefaultWorkflowJSON(payload: ChatPayload): SimpleWorkflow {
return (
(payload.workflowContext?.currentWorkflow as SimpleWorkflow) ?? {
nodes: [],
connections: {},
}
);
}
async *chat(payload: ChatPayload, userId?: string, abortSignal?: AbortSignal) {
const agent = this.createWorkflow().compile({ checkpointer: this.checkpointer });
const workflowId = payload.workflowContext?.currentWorkflow?.id;
// Generate thread ID from workflowId and userId
// This ensures one session per workflow per user
const threadId = WorkflowBuilderAgent.generateThreadId(workflowId, userId);
// Configure thread for checkpointing
const threadConfig = {
const threadConfig: RunnableConfig = {
configurable: {
thread_id: threadId,
},
};
const streamConfig = {
...threadConfig,
streamMode: ['updates', 'custom'],
recursionLimit: 30,
signal: abortSignal,
callbacks: this.tracer ? [this.tracer] : undefined,
} as RunnableConfig;
// Check if this is a subsequent message
// If so, update the workflowJSON with the current editor state
const existingCheckpoint = await this.checkpointer.getTuple(threadConfig);
let stream;
if (!existingCheckpoint?.checkpoint) {
// First message - use initial state
const initialState: typeof WorkflowState.State = {
const stream = await agent.stream(
{
messages: [new HumanMessage({ content: payload.message })],
workflowJSON: (payload.workflowContext?.currentWorkflow as SimpleWorkflow) ?? {
nodes: [],
connections: {},
},
workflowJSON: this.getDefaultWorkflowJSON(payload),
workflowOperations: [],
workflowContext: payload.workflowContext,
};
},
streamConfig,
);
stream = await agent.stream(initialState, {
...threadConfig,
streamMode: ['updates', 'custom'],
recursionLimit: 30,
callbacks: this.tracer ? [this.tracer] : undefined,
});
} else {
// Subsequent message - update the state with current workflow
const stateUpdate: Partial<typeof WorkflowState.State> = {
messages: [new HumanMessage({ content: payload.message })],
workflowOperations: [], // Clear any pending operations from previous message
workflowContext: payload.workflowContext,
workflowJSON: { nodes: [], connections: {} }, // Default to empty workflow
};
if (payload.workflowContext?.currentWorkflow) {
stateUpdate.workflowJSON = payload.workflowContext?.currentWorkflow as SimpleWorkflow;
try {
const streamProcessor = createStreamProcessor(stream);
for await (const output of streamProcessor) {
yield output;
}
} catch (error) {
if (
error &&
typeof error === 'object' &&
'message' in error &&
typeof error.message === 'string' &&
// This is naive, but it's all we get from LangGraph AbortError
['Abort', 'Aborted'].includes(error.message)
) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
const messages = (await agent.getState(threadConfig)).values.messages as Array<
AIMessage | HumanMessage | ToolMessage
>;
// Stream with just the new message
stream = await agent.stream(stateUpdate, {
...threadConfig,
streamMode: ['updates', 'custom'],
recursionLimit: 80,
callbacks: this.tracer ? [this.tracer] : undefined,
});
}
// Use the stream processor utility to handle chunk processing
const streamProcessor = createStreamProcessor(stream);
for await (const output of streamProcessor) {
yield output;
// Handle abort errors gracefully
const abortedAiMessage = new AIMessage({
content: '[Task aborted]',
id: crypto.randomUUID(),
});
// TODO: Should we clear tool calls that are in progress?
await agent.updateState(threadConfig, { messages: [...messages, abortedAiMessage] });
return;
}
throw error;
}
}
@@ -256,7 +259,7 @@ export class WorkflowBuilderAgent {
if (workflowId) {
const threadId = WorkflowBuilderAgent.generateThreadId(workflowId, userId);
const threadConfig = {
const threadConfig: RunnableConfig = {
configurable: {
thread_id: threadId,
},