From 3969425925b0f67d6d95d8295c80f0b2315c0465 Mon Sep 17 00:00:00 2001 From: Eugene Date: Mon, 23 Jun 2025 08:59:40 +0200 Subject: [PATCH] feat(MCP Server Trigger Node): Terminate sessions on DELETE request (#16550) --- .../nodes/mcp/McpTrigger/McpServer.ts | 24 ++++++ .../nodes/mcp/McpTrigger/McpTrigger.node.ts | 38 ++++++--- .../mcp/McpTrigger/__test__/McpServer.test.ts | 84 +++++++++++++++++++ .../__test__/McpTrigger.node.test.ts | 32 ++++++- 4 files changed, 167 insertions(+), 11 deletions(-) diff --git a/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/McpServer.ts b/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/McpServer.ts index f1f0f5ba03..676ce65ce1 100644 --- a/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/McpServer.ts +++ b/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/McpServer.ts @@ -219,6 +219,30 @@ export class McpServerManager { return wasToolCall(req.rawBody.toString()); } + async handleDeleteRequest(req: express.Request, resp: CompressionResponse) { + const sessionId = this.getSessionId(req); + + if (!sessionId) { + resp.status(400).send('No sessionId provided'); + return; + } + + const transport = this.getTransport(sessionId); + + if (transport) { + if (transport instanceof FlushingStreamableHTTPTransport) { + await transport.handleRequest(req, resp); + return; + } else { + // For SSE transport, we don't support DELETE requests + resp.status(405).send('Method Not Allowed'); + return; + } + } + + resp.status(404).send('Session not found'); + } + setUpHandlers(server: Server) { server.setRequestHandler( ListToolsRequestSchema, diff --git a/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/McpTrigger.node.ts b/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/McpTrigger.node.ts index bd2383e937..ff3a111171 100644 --- a/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/McpTrigger.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/McpTrigger.node.ts @@ -125,6 +125,16 @@ export class McpTrigger extends Node { ndvHideMethod: true, ndvHideUrl: true, }, + { + name: 'default', + httpMethod: 'DELETE', + responseMode: 'onReceived', + isFullPath: true, + path: '={{$parameter["path"]}}', + nodeType: 'mcp', + ndvHideMethod: true, + ndvHideUrl: true, + }, ], }; @@ -162,22 +172,30 @@ export class McpTrigger extends Node { return { noWebhookResponse: true }; } else if (webhookName === 'default') { - // Here we handle POST requests. These can be either + // Here we handle POST and DELETE requests. + // POST can be either: // 1) Client calls in an established session using the SSE transport, or // 2) Client calls in an established session using the StreamableHTTPServerTransport // 3) Session setup requests using the StreamableHTTPServerTransport + // DELETE is used to terminate the session using the StreamableHTTPServerTransport - // Check if there is a session and a transport is already established - const sessionId = mcpServerManager.getSessionId(req); - if (sessionId && mcpServerManager.getTransport(sessionId)) { - const connectedTools = await getConnectedTools(context, true); - const wasToolCall = await mcpServerManager.handlePostMessage(req, resp, connectedTools); - if (wasToolCall) return { noWebhookResponse: true, workflowData: [[{ json: {} }]] }; + if (req.method === 'DELETE') { + await mcpServerManager.handleDeleteRequest(req, resp); } else { - // If no session is established, this is a setup request - // for the StreamableHTTPServerTransport, so we create a new transport - await mcpServerManager.createServerWithStreamableHTTPTransport(serverName, resp, req); + // Check if there is a session and a transport is already established + const sessionId = mcpServerManager.getSessionId(req); + + if (sessionId && mcpServerManager.getTransport(sessionId)) { + const connectedTools = await getConnectedTools(context, true); + const wasToolCall = await mcpServerManager.handlePostMessage(req, resp, connectedTools); + if (wasToolCall) return { noWebhookResponse: true, workflowData: [[{ json: {} }]] }; + } else { + // If no session is established, this is a setup request + // for the StreamableHTTPServerTransport, so we create a new transport + await mcpServerManager.createServerWithStreamableHTTPTransport(serverName, resp, req); + } } + return { noWebhookResponse: true }; } diff --git a/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/McpServer.test.ts b/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/McpServer.test.ts index ff8dec5e7e..4487704595 100644 --- a/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/McpServer.test.ts +++ b/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/McpServer.test.ts @@ -445,4 +445,88 @@ describe('McpServer', () => { expect(result2).toBe(mockTransport2); }); }); + + describe('handleDeleteRequest', () => { + beforeEach(() => { + // Clear transports and servers before each test + mcpServerManager.transports = {}; + mcpServerManager.servers = {}; + }); + + it('should handle DELETE request for StreamableHTTP transport', async () => { + const deleteSessionId = 'delete-session-id'; + const mockDeleteRequest = mock({ + headers: { 'mcp-session-id': deleteSessionId }, + }); + const mockDeleteResponse = mock(); + mockDeleteResponse.status.mockReturnThis(); + + // Create a mock transport that passes instanceof check + const mockHttpTransport = Object.create(FlushingStreamableHTTPTransport.prototype); + mockHttpTransport.handleRequest = jest.fn(); + + // Set up the transport + mcpServerManager.transports[deleteSessionId] = mockHttpTransport; + + // Call handleDeleteRequest + await mcpServerManager.handleDeleteRequest(mockDeleteRequest, mockDeleteResponse); + + // Verify transport.handleRequest was called + expect(mockHttpTransport.handleRequest).toHaveBeenCalledWith( + mockDeleteRequest, + mockDeleteResponse, + ); + }); + + it('should return 400 when no sessionId provided', async () => { + const mockDeleteRequest = mock({ + query: {}, + headers: {}, + }); + const mockDeleteResponse = mock(); + mockDeleteResponse.status.mockReturnThis(); + + // Mock getSessionId to return undefined + jest.spyOn(mcpServerManager, 'getSessionId').mockReturnValueOnce(undefined); + + // Call handleDeleteRequest without sessionId + await mcpServerManager.handleDeleteRequest(mockDeleteRequest, mockDeleteResponse); + + // Verify 400 response + expect(mockDeleteResponse.status).toHaveBeenCalledWith(400); + }); + + it('should return 404 for non-existent session', async () => { + const mockDeleteRequest = mock({ + headers: { 'mcp-session-id': 'non-existent-session' }, + }); + const mockDeleteResponse = mock(); + mockDeleteResponse.status.mockReturnThis(); + + // Call handleDeleteRequest with non-existent sessionId + await mcpServerManager.handleDeleteRequest(mockDeleteRequest, mockDeleteResponse); + + // Verify 404 response (session not found) + expect(mockDeleteResponse.status).toHaveBeenCalledWith(404); + }); + + it('should return 405 for SSE transport session', async () => { + const sseSessionId = 'sse-session-id'; + const mockDeleteRequest = mock({ + query: { sessionId: sseSessionId }, + }); + const mockDeleteResponse = mock(); + mockDeleteResponse.status.mockReturnThis(); + const mockSSETransport = mock(); + + // Set up SSE transport + mcpServerManager.transports[sseSessionId] = mockSSETransport; + + // Call handleDeleteRequest + await mcpServerManager.handleDeleteRequest(mockDeleteRequest, mockDeleteResponse); + + // Verify 405 response (DELETE not supported for SSE) + expect(mockDeleteResponse.status).toHaveBeenCalledWith(405); + }); + }); }); diff --git a/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/McpTrigger.node.test.ts b/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/McpTrigger.node.test.ts index dd690fa46f..a2ce257733 100644 --- a/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/McpTrigger.node.test.ts +++ b/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/McpTrigger.node.test.ts @@ -5,7 +5,10 @@ import type { INode, IWebhookFunctions } from 'n8n-workflow'; import * as helpers from '@utils/helpers'; -import type { FlushingSSEServerTransport } from '../FlushingTransport'; +import type { + FlushingSSEServerTransport, + FlushingStreamableHTTPTransport, +} from '../FlushingTransport'; import type { McpServerManager } from '../McpServer'; import { McpTrigger } from '../McpTrigger.node'; @@ -137,5 +140,32 @@ describe('McpTrigger Node', () => { mockResponse, ); }); + + it('should handle DELETE webhook for StreamableHTTP session termination', async () => { + // Configure the context for DELETE webhook + mockContext.getWebhookName.mockReturnValue('default'); + const mockDeleteRequest = mock({ + method: 'DELETE', + headers: { 'mcp-session-id': sessionId }, + path: '/custom-path', + }); + mockContext.getRequestObject.mockReturnValueOnce(mockDeleteRequest); + + // Mock existing StreamableHTTP transport + mockServerManager.getSessionId.mockReturnValue(sessionId); + mockServerManager.getTransport.mockReturnValue(mock({})); + + // Call the webhook method + const result = await mcpTrigger.webhook(mockContext); + + // Verify that handleDeleteRequest was called + expect(mockServerManager.handleDeleteRequest).toHaveBeenCalledWith( + mockDeleteRequest, + mockResponse, + ); + + // Verify the returned result + expect(result).toEqual({ noWebhookResponse: true }); + }); }); });