diff --git a/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/FlushingSSEServerTransport.ts b/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/FlushingSSEServerTransport.ts deleted file mode 100644 index fe7cac525a..0000000000 --- a/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/FlushingSSEServerTransport.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js'; -import type { JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js'; -import type { Response } from 'express'; - -export type CompressionResponse = Response & { - /** - * `flush()` is defined in the compression middleware. - * This is necessary because the compression middleware sometimes waits - * for a certain amount of data before sending the data to the client - */ - flush: () => void; -}; - -export class FlushingSSEServerTransport extends SSEServerTransport { - constructor( - _endpoint: string, - private response: CompressionResponse, - ) { - super(_endpoint, response); - } - - async send(message: JSONRPCMessage): Promise { - await super.send(message); - this.response.flush(); - } -} diff --git a/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/FlushingTransport.ts b/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/FlushingTransport.ts new file mode 100644 index 0000000000..beb5495c47 --- /dev/null +++ b/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/FlushingTransport.ts @@ -0,0 +1,61 @@ +import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js'; +import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; +import type { StreamableHTTPServerTransportOptions } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; +import type { JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js'; +import type { Response } from 'express'; +import type { IncomingMessage, ServerResponse } from 'http'; + +export type CompressionResponse = Response & { + /** + * `flush()` is defined in the compression middleware. + * This is necessary because the compression middleware sometimes waits + * for a certain amount of data before sending the data to the client + */ + flush: () => void; +}; + +export class FlushingSSEServerTransport extends SSEServerTransport { + constructor( + _endpoint: string, + private response: CompressionResponse, + ) { + super(_endpoint, response); + } + + async send(message: JSONRPCMessage): Promise { + await super.send(message); + this.response.flush(); + } + + async handleRequest( + req: IncomingMessage, + resp: ServerResponse, + message: IncomingMessage, + ): Promise { + await super.handlePostMessage(req, resp, message); + this.response.flush(); + } +} + +export class FlushingStreamableHTTPTransport extends StreamableHTTPServerTransport { + private response: CompressionResponse; + + constructor(options: StreamableHTTPServerTransportOptions, response: CompressionResponse) { + super(options); + this.response = response; + } + + async send(message: JSONRPCMessage): Promise { + await super.send(message); + this.response.flush(); + } + + async handleRequest( + req: IncomingMessage, + resp: ServerResponse, + parsedBody?: unknown, + ): Promise { + await super.handleRequest(req, resp, parsedBody); + this.response.flush(); + } +} diff --git a/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/McpServer.ts b/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/McpServer.ts index 961d2aa5e2..f1f0f5ba03 100644 --- a/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/McpServer.ts +++ b/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/McpServer.ts @@ -11,12 +11,14 @@ import { ListToolsRequestSchema, CallToolRequestSchema, } from '@modelcontextprotocol/sdk/types.js'; +import { randomUUID } from 'crypto'; import type * as express from 'express'; -import { OperationalError, type Logger } from 'n8n-workflow'; +import type { IncomingMessage } from 'http'; +import { jsonParse, OperationalError, type Logger } from 'n8n-workflow'; import { zodToJsonSchema } from 'zod-to-json-schema'; -import { FlushingSSEServerTransport } from './FlushingSSEServerTransport'; -import type { CompressionResponse } from './FlushingSSEServerTransport'; +import { FlushingSSEServerTransport, FlushingStreamableHTTPTransport } from './FlushingTransport'; +import type { CompressionResponse } from './FlushingTransport'; /** * Parses the JSONRPC message and checks whether the method used was a tool @@ -42,9 +44,8 @@ function wasToolCall(body: string) { * Returns undefined if the message doesn't have an ID (for example on a tool list request) * */ -function getRequestId(body: string): string | undefined { +function getRequestId(message: unknown): string | undefined { try { - const message: unknown = JSON.parse(body); const parsedMessage: JSONRPCMessage = JSONRPCMessageSchema.parse(message); return 'id' in parsedMessage ? String(parsedMessage.id) : undefined; } catch { @@ -63,7 +64,9 @@ export class McpServerManager { servers: { [sessionId: string]: Server } = {}; - transports: { [sessionId: string]: FlushingSSEServerTransport } = {}; + transports: { + [sessionId: string]: FlushingSSEServerTransport | FlushingStreamableHTTPTransport; + } = {}; private tools: { [sessionId: string]: Tool[] } = {}; @@ -85,24 +88,28 @@ export class McpServerManager { return McpServerManager.#instance; } - async createServerAndTransport( + async createServerWithSSETransport( serverName: string, postUrl: string, resp: CompressionResponse, ): Promise { - const transport = new FlushingSSEServerTransport(postUrl, resp); const server = new Server( { name: serverName, version: '0.1.0', }, { - capabilities: { tools: {} }, + capabilities: { + tools: {}, + }, }, ); + const transport = new FlushingSSEServerTransport(postUrl, resp); + this.setUpHandlers(server); - const { sessionId } = transport; + + const sessionId = transport.sessionId; this.transports[sessionId] = transport; this.servers[sessionId] = server; @@ -112,7 +119,6 @@ export class McpServerManager { delete this.transports[sessionId]; delete this.servers[sessionId]; }); - await server.connect(transport); // Make sure we flush the compression middleware, so that it's not waiting for more content to be added to the buffer @@ -121,16 +127,74 @@ export class McpServerManager { } } + getSessionId(req: express.Request): string | undefined { + // Session ID can be passed either as a query parameter (SSE transport) + // or in the header (StreamableHTTP transport). + return (req.query.sessionId ?? req.headers['mcp-session-id']) as string | undefined; + } + + getTransport( + sessionId: string, + ): FlushingSSEServerTransport | FlushingStreamableHTTPTransport | undefined { + return this.transports[sessionId]; + } + + async createServerWithStreamableHTTPTransport( + serverName: string, + resp: CompressionResponse, + req?: express.Request, + ): Promise { + const server = new Server( + { + name: serverName, + version: '0.1.0', + }, + { + capabilities: { + tools: {}, + }, + }, + ); + + const transport = new FlushingStreamableHTTPTransport( + { + sessionIdGenerator: () => randomUUID(), + onsessioninitialized: (sessionId) => { + this.logger.debug(`New session initialized: ${sessionId}`); + transport.onclose = () => { + this.logger.debug(`Deleting transport for ${sessionId}`); + delete this.tools[sessionId]; + delete this.transports[sessionId]; + delete this.servers[sessionId]; + }; + this.transports[sessionId] = transport; + this.servers[sessionId] = server; + }, + }, + resp, + ); + + this.setUpHandlers(server); + + await server.connect(transport); + + await transport.handleRequest(req as IncomingMessage, resp, req?.body); + if (resp.flush) { + resp.flush(); + } + } + async handlePostMessage(req: express.Request, resp: CompressionResponse, connectedTools: Tool[]) { - const sessionId = req.query.sessionId as string; - const transport = this.transports[sessionId]; - if (transport) { + // Session ID can be passed either as a query parameter (SSE transport) + // or in the header (StreamableHTTP transport). + const sessionId = this.getSessionId(req); + const transport = this.getTransport(sessionId as string); + if (sessionId && transport) { // We need to add a promise here because the `handlePostMessage` will send something to the // MCP Server, that will run in a different context. This means that the return will happen // almost immediately, and will lead to marking the sub-node as "running" in the final execution - const bodyString = req.rawBody.toString(); - const messageId = getRequestId(bodyString); - + const message = jsonParse(req.rawBody.toString()); + const messageId = getRequestId(message); // Use session & message ID if available, otherwise fall back to sessionId const callId = messageId ? `${sessionId}_${messageId}` : sessionId; this.tools[sessionId] = connectedTools; @@ -138,7 +202,7 @@ export class McpServerManager { try { await new Promise(async (resolve) => { this.resolveFunctions[callId] = resolve; - await transport.handlePostMessage(req, resp, bodyString); + await transport.handleRequest(req, resp, message as IncomingMessage); }); } finally { delete this.resolveFunctions[callId]; diff --git a/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/McpTrigger.node.ts b/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/McpTrigger.node.ts index 7719098e48..bd2383e937 100644 --- a/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/McpTrigger.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/McpTrigger.node.ts @@ -5,7 +5,7 @@ import { NodeConnectionTypes, Node } from 'n8n-workflow'; import { getConnectedTools, nodeNameToToolName } from '@utils/helpers'; -import type { CompressionResponse } from './FlushingSSEServerTransport'; +import type { CompressionResponse } from './FlushingTransport'; import { McpServerManager } from './McpServer'; const MCP_SSE_SETUP_PATH = 'sse'; @@ -20,9 +20,10 @@ export class McpTrigger extends Node { dark: 'file:../mcp.dark.svg', }, group: ['trigger'], - version: [1, 1.1], + version: [1, 1.1, 2], description: 'Expose n8n tools as an MCP Server endpoint', - activationMessage: 'You can now connect your MCP Clients to the SSE URL.', + activationMessage: + 'You can now connect your MCP Clients to the URL, using SSE or Streamable HTTP transports.', defaults: { name: 'MCP Server Trigger', }, @@ -109,7 +110,7 @@ export class McpTrigger extends Node { httpMethod: 'GET', responseMode: 'onReceived', isFullPath: true, - path: `={{$parameter["path"]}}/${MCP_SSE_SETUP_PATH}`, + path: `={{$parameter["path"]}}{{parseFloat($nodeVersion)<2 ? '/${MCP_SSE_SETUP_PATH}' : ''}}`, nodeType: 'mcp', ndvHideMethod: true, ndvHideUrl: false, @@ -119,7 +120,7 @@ export class McpTrigger extends Node { httpMethod: 'POST', responseMode: 'onReceived', isFullPath: true, - path: `={{$parameter["path"]}}/${MCP_SSE_MESSAGES_PATH}`, + path: `={{$parameter["path"]}}{{parseFloat($nodeVersion)<2 ? '/${MCP_SSE_MESSAGES_PATH}' : ''}}`, nodeType: 'mcp', ndvHideMethod: true, ndvHideUrl: true, @@ -151,22 +152,32 @@ export class McpTrigger extends Node { if (webhookName === 'setup') { // Sets up the transport and opens the long-lived connection. This resp // will stay streaming, and is the channel that sends the events - const postUrl = req.path.replace( - new RegExp(`/${MCP_SSE_SETUP_PATH}$`), - `/${MCP_SSE_MESSAGES_PATH}`, - ); - await mcpServerManager.createServerAndTransport(serverName, postUrl, resp); + + // Prior to version 2.0, we use different paths for the setup and messages. + const postUrl = + node.typeVersion < 2 + ? req.path.replace(new RegExp(`/${MCP_SSE_SETUP_PATH}$`), `/${MCP_SSE_MESSAGES_PATH}`) + : req.path; + await mcpServerManager.createServerWithSSETransport(serverName, postUrl, resp); return { noWebhookResponse: true }; } else if (webhookName === 'default') { - // This is the command-channel, and is actually executing the tools. This - // sends the response back through the long-lived connection setup in the - // 'setup' call - const connectedTools = await getConnectedTools(context, true); + // Here we handle POST requests. These can be either + // 1) Client calls in an established session using the SSE transport, or + // 2) Client calls in an established session using the StreamableHTTPServerTransport + // 3) Session setup requests using the StreamableHTTPServerTransport - const wasToolCall = await mcpServerManager.handlePostMessage(req, resp, connectedTools); - - if (wasToolCall) return { noWebhookResponse: true, workflowData: [[{ json: {} }]] }; + // Check if there is a session and a transport is already established + const sessionId = mcpServerManager.getSessionId(req); + if (sessionId && mcpServerManager.getTransport(sessionId)) { + const connectedTools = await getConnectedTools(context, true); + const wasToolCall = await mcpServerManager.handlePostMessage(req, resp, connectedTools); + if (wasToolCall) return { noWebhookResponse: true, workflowData: [[{ json: {} }]] }; + } else { + // If no session is established, this is a setup request + // for the StreamableHTTPServerTransport, so we create a new transport + await mcpServerManager.createServerWithStreamableHTTPTransport(serverName, resp, req); + } return { noWebhookResponse: true }; } diff --git a/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/FlushingSSEServerTransport.test.ts b/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/FlushingSSEServerTransport.test.ts deleted file mode 100644 index b3471a4b0c..0000000000 --- a/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/FlushingSSEServerTransport.test.ts +++ /dev/null @@ -1,45 +0,0 @@ -import { jest } from '@jest/globals'; -import type { JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js'; -import { mock } from 'jest-mock-extended'; - -import { FlushingSSEServerTransport } from '../FlushingSSEServerTransport'; -import type { CompressionResponse } from '../FlushingSSEServerTransport'; - -describe('FlushingSSEServerTransport', () => { - const mockResponse = mock(); - let transport: FlushingSSEServerTransport; - const endpoint = '/test/endpoint'; - - beforeEach(() => { - jest.resetAllMocks(); - mockResponse.status.mockReturnThis(); - transport = new FlushingSSEServerTransport(endpoint, mockResponse); - }); - - it('should call flush after sending a message', async () => { - // Create a sample JSONRPC message - const message: JSONRPCMessage = { - jsonrpc: '2.0', - id: '123', - result: { success: true }, - }; - - // Send a message through the transport - await transport.start(); - await transport.send(message); - - expect(mockResponse.writeHead).toHaveBeenCalledWith(200, { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache, no-transform', - Connection: 'keep-alive', - }); - expect(mockResponse.write).toHaveBeenCalledWith( - // @ts-expect-error `_sessionId` is private - `event: endpoint\ndata: /test/endpoint?sessionId=${transport._sessionId}\n\n`, - ); - expect(mockResponse.write).toHaveBeenCalledWith( - `event: message\ndata: ${JSON.stringify(message)}\n\n`, - ); - expect(mockResponse.flush).toHaveBeenCalled(); - }); -}); diff --git a/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/FlushingTransport.test.ts b/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/FlushingTransport.test.ts new file mode 100644 index 0000000000..fd77aed54b --- /dev/null +++ b/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/FlushingTransport.test.ts @@ -0,0 +1,102 @@ +import { jest } from '@jest/globals'; +import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; +import type { JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js'; +import type { IncomingMessage, ServerResponse } from 'http'; +import { mock } from 'jest-mock-extended'; + +import { FlushingSSEServerTransport, FlushingStreamableHTTPTransport } from '../FlushingTransport'; +import type { CompressionResponse } from '../FlushingTransport'; + +describe('FlushingSSEServerTransport', () => { + const mockResponse = mock(); + let transport: FlushingSSEServerTransport; + const endpoint = '/test/endpoint'; + + beforeEach(() => { + jest.resetAllMocks(); + mockResponse.status.mockReturnThis(); + transport = new FlushingSSEServerTransport(endpoint, mockResponse); + }); + + it('should call flush after sending a message', async () => { + // Create a sample JSONRPC message + const message: JSONRPCMessage = { + jsonrpc: '2.0', + id: '123', + result: { success: true }, + }; + + // Send a message through the transport + await transport.start(); + await transport.send(message); + + expect(mockResponse.writeHead).toHaveBeenCalledWith(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache, no-transform', + Connection: 'keep-alive', + }); + expect(mockResponse.write).toHaveBeenCalledWith( + // @ts-expect-error `_sessionId` is private + `event: endpoint\ndata: /test/endpoint?sessionId=${transport._sessionId}\n\n`, + ); + expect(mockResponse.write).toHaveBeenCalledWith( + `event: message\ndata: ${JSON.stringify(message)}\n\n`, + ); + expect(mockResponse.flush).toHaveBeenCalled(); + }); +}); + +describe('FlushingStreamableHTTPTransport', () => { + const mockResponse = mock(); + let transport: FlushingStreamableHTTPTransport; + const options = { + sessionIdGenerator: () => 'test-session-id', + onsessioninitialized: jest.fn(), + }; + + beforeEach(() => { + jest.resetAllMocks(); + mockResponse.status.mockReturnThis(); + + // Mock the parent class methods before creating the instance + jest.spyOn(StreamableHTTPServerTransport.prototype, 'send').mockResolvedValue(); + jest.spyOn(StreamableHTTPServerTransport.prototype, 'handleRequest').mockResolvedValue(); + + transport = new FlushingStreamableHTTPTransport(options, mockResponse); + }); + + it('should call flush after sending a message', async () => { + const message: JSONRPCMessage = { + jsonrpc: '2.0', + id: '123', + result: { success: true }, + }; + + await transport.send(message); + + expect(StreamableHTTPServerTransport.prototype.send).toHaveBeenCalledWith(message); + expect(mockResponse.flush).toHaveBeenCalled(); + }); + + it('should call flush after handling a request', async () => { + const mockRequest = mock(); + const mockServerResponse = mock(); + const parsedBody = { jsonrpc: '2.0', method: 'test', id: '123' }; + + await transport.handleRequest(mockRequest, mockServerResponse, parsedBody); + + expect(StreamableHTTPServerTransport.prototype.handleRequest).toHaveBeenCalledWith( + mockRequest, + mockServerResponse, + parsedBody, + ); + expect(mockResponse.flush).toHaveBeenCalled(); + }); + + it('should pass options correctly to parent constructor', () => { + expect(transport).toBeInstanceOf(FlushingStreamableHTTPTransport); + expect(transport).toBeInstanceOf(StreamableHTTPServerTransport); + expect(typeof transport.send).toBe('function'); + expect(typeof transport.handleRequest).toBe('function'); + }); +}); diff --git a/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/McpServer.test.ts b/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/McpServer.test.ts index 3e64029e47..ff8dec5e7e 100644 --- a/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/McpServer.test.ts +++ b/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/McpServer.test.ts @@ -1,11 +1,11 @@ -import { jest } from '@jest/globals'; import type { Tool } from '@langchain/core/tools'; import { Server } from '@modelcontextprotocol/sdk/server/index.js'; +import type { StreamableHTTPServerTransportOptions } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; import type { Request } from 'express'; import { captor, mock } from 'jest-mock-extended'; -import type { CompressionResponse } from '../FlushingSSEServerTransport'; -import { FlushingSSEServerTransport } from '../FlushingSSEServerTransport'; +import type { CompressionResponse } from '../FlushingTransport'; +import { FlushingSSEServerTransport, FlushingStreamableHTTPTransport } from '../FlushingTransport'; import { McpServerManager } from '../McpServer'; const sessionId = 'mock-session-id'; @@ -17,9 +17,14 @@ jest.mock('@modelcontextprotocol/sdk/server/index.js', () => { }); const mockTransport = mock({ sessionId }); -jest.mock('../FlushingSSEServerTransport', () => { +mockTransport.handleRequest.mockImplementation(jest.fn()); +const mockStreamableTransport = mock(); +mockStreamableTransport.onclose = jest.fn(); + +jest.mock('../FlushingTransport', () => { return { FlushingSSEServerTransport: jest.fn().mockImplementation(() => mockTransport), + FlushingStreamableHTTPTransport: jest.fn().mockImplementation(() => mockStreamableTransport), }; }); @@ -39,7 +44,7 @@ describe('McpServer', () => { const postUrl = '/post-url'; it('should set up a transport and server', async () => { - await mcpServerManager.createServerAndTransport('mcpServer', postUrl, mockResponse); + await mcpServerManager.createServerWithSSETransport('mcpServer', postUrl, mockResponse); // Check that FlushingSSEServerTransport was initialized with correct params expect(FlushingSSEServerTransport).toHaveBeenCalledWith(postUrl, mockResponse); @@ -59,7 +64,7 @@ describe('McpServer', () => { }); it('should set up close handler that cleans up resources', async () => { - await mcpServerManager.createServerAndTransport('mcpServer', postUrl, mockResponse); + await mcpServerManager.createServerWithSSETransport('mcpServer', postUrl, mockResponse); // Get the close callback and execute it const closeCallbackCaptor = captor<() => Promise>(); @@ -73,8 +78,8 @@ describe('McpServer', () => { }); describe('handlePostMessage', () => { - it('should call transport.handlePostMessage when transport exists', async () => { - mockTransport.handlePostMessage.mockImplementation(async () => { + it('should call transport.handleRequest when transport exists', async () => { + mockTransport.handleRequest.mockImplementation(async () => { // @ts-expect-error private property `resolveFunctions` mcpServerManager.resolveFunctions[`${sessionId}_123`](); }); @@ -96,11 +101,11 @@ describe('McpServer', () => { mockTool, ]); - // Verify that transport's handlePostMessage was called - expect(mockTransport.handlePostMessage).toHaveBeenCalledWith( + // Verify that transport's handleRequest was called + expect(mockTransport.handleRequest).toHaveBeenCalledWith( mockRequest, mockResponse, - expect.any(String), + expect.any(Object), ); // Verify that we check if it was a tool call @@ -114,7 +119,7 @@ describe('McpServer', () => { const firstId = 123; const secondId = 456; - mockTransport.handlePostMessage.mockImplementation(async () => { + mockTransport.handleRequest.mockImplementation(async () => { const requestKey = mockRequest.rawBody?.toString().includes(`"id":${firstId}`) ? `${sessionId}_${firstId}` : `${sessionId}_${secondId}`; @@ -140,10 +145,10 @@ describe('McpServer', () => { mockTool, ]); expect(firstResult).toBe(true); - expect(mockTransport.handlePostMessage).toHaveBeenCalledWith( + expect(mockTransport.handleRequest).toHaveBeenCalledWith( mockRequest, mockResponse, - expect.any(String), + expect.any(Object), ); // Second tool call with different id @@ -162,8 +167,8 @@ describe('McpServer', () => { ]); expect(secondResult).toBe(true); - // Verify transport's handlePostMessage was called twice - expect(mockTransport.handlePostMessage).toHaveBeenCalledTimes(2); + // Verify transport's handleRequest was called twice + expect(mockTransport.handleRequest).toHaveBeenCalledTimes(2); // Verify flush was called for both requests expect(mockResponse.flush).toHaveBeenCalledTimes(2); @@ -192,4 +197,252 @@ describe('McpServer', () => { expect(mockResponse.send).toHaveBeenCalledWith(expect.stringContaining('No transport found')); }); }); + + describe('createServerWithStreamableHTTPTransport', () => { + it('should set up a transport and server with StreamableHTTPServerTransport', async () => { + const mockStreamableRequest = mock({ + headers: { 'mcp-session-id': sessionId }, + path: '/mcp', + body: {}, + }); + + mockStreamableTransport.handleRequest.mockResolvedValue(undefined); + + await mcpServerManager.createServerWithStreamableHTTPTransport( + 'mcpServer', + mockResponse, + mockStreamableRequest, + ); + + // Check that FlushingStreamableHTTPTransport was initialized with correct params + expect(FlushingStreamableHTTPTransport).toHaveBeenCalledWith( + { + sessionIdGenerator: expect.any(Function), + onsessioninitialized: expect.any(Function), + }, + mockResponse, + ); + + // Check that Server was initialized + expect(Server).toHaveBeenCalled(); + + // Check that handleRequest was called + expect(mockStreamableTransport.handleRequest).toHaveBeenCalled(); + }); + + it('should handle session initialization callback', async () => { + const mockStreamableRequest = mock({ + headers: { 'mcp-session-id': sessionId }, + path: '/mcp', + body: {}, + }); + + // Set up the mock to simulate session initialization + mockStreamableTransport.onclose = jest.fn(); + mockStreamableTransport.handleRequest.mockResolvedValue(undefined); + + jest + .mocked(FlushingStreamableHTTPTransport) + .mockImplementationOnce((options: StreamableHTTPServerTransportOptions) => { + // Simulate session initialization asynchronously using queueMicrotask instead of setTimeout + queueMicrotask(() => { + if (options.onsessioninitialized) { + options.onsessioninitialized(sessionId); + } + }); + return mockStreamableTransport; + }); + + await mcpServerManager.createServerWithStreamableHTTPTransport( + 'mcpServer', + mockResponse, + mockStreamableRequest, + ); + + // Wait for microtask to complete + await Promise.resolve(); + + // Check that transport and server are stored after session init + expect(mcpServerManager.transports[sessionId]).toBeDefined(); + expect(mcpServerManager.servers[sessionId]).toBeDefined(); + }); + + it('should handle transport close callback for StreamableHTTPServerTransport', async () => { + const mockStreamableRequest = mock({ + headers: { 'mcp-session-id': sessionId }, + path: '/mcp', + body: {}, + }); + + let onCloseCallback: (() => void) | undefined; + mockStreamableTransport.handleRequest.mockResolvedValue(undefined); + + jest + .mocked(FlushingStreamableHTTPTransport) + .mockImplementationOnce((options: StreamableHTTPServerTransportOptions) => { + // Simulate session initialization and capture onclose callback asynchronously using queueMicrotask + queueMicrotask(() => { + if (options.onsessioninitialized) { + options.onsessioninitialized(sessionId); + onCloseCallback = mockStreamableTransport.onclose; + } + }); + return mockStreamableTransport; + }); + + await mcpServerManager.createServerWithStreamableHTTPTransport( + 'mcpServer', + mockResponse, + mockStreamableRequest, + ); + + // Wait for microtask to complete + await Promise.resolve(); + + // Simulate transport close + if (onCloseCallback) { + onCloseCallback(); + } + + // Check that resources were cleaned up + expect(mcpServerManager.transports[sessionId]).toBeUndefined(); + expect(mcpServerManager.servers[sessionId]).toBeUndefined(); + }); + }); + + describe('handlePostMessage with StreamableHTTPServerTransport', () => { + it('should handle StreamableHTTPServerTransport with session ID in header', async () => { + const mockStreamableRequest = mock({ + headers: { 'mcp-session-id': sessionId }, + path: '/mcp', + }); + + mockStreamableTransport.handleRequest.mockImplementation(async () => { + // @ts-expect-error private property `resolveFunctions` + mcpServerManager.resolveFunctions[`${sessionId}_123`](); + }); + + // Add the transport directly + mcpServerManager.transports[sessionId] = mockStreamableTransport; + + mockStreamableRequest.rawBody = Buffer.from( + JSON.stringify({ + jsonrpc: '2.0', + method: 'tools/call', + id: 123, + params: { name: 'mockTool' }, + }), + ); + + // Call the method + const result = await mcpServerManager.handlePostMessage(mockStreamableRequest, mockResponse, [ + mockTool, + ]); + + // Verify that transport's handleRequest was called + expect(mockStreamableTransport.handleRequest).toHaveBeenCalledWith( + mockStreamableRequest, + mockResponse, + expect.any(Object), + ); + + // Verify that we check if it was a tool call + expect(result).toBe(true); + + // Verify flush was called + expect(mockResponse.flush).toHaveBeenCalled(); + }); + + it('should return 401 when StreamableHTTPServerTransport does not exist', async () => { + const testRequest = mock({ + headers: { 'mcp-session-id': 'non-existent-session' }, + path: '/mcp', + }); + testRequest.rawBody = Buffer.from( + JSON.stringify({ + jsonrpc: '2.0', + method: 'tools/call', + id: 123, + params: { name: 'mockTool' }, + }), + ); + + // Call without setting up transport for this sessionId + await mcpServerManager.handlePostMessage(testRequest, mockResponse, [mockTool]); + + // Verify error status was set + expect(mockResponse.status).toHaveBeenCalledWith(401); + expect(mockResponse.send).toHaveBeenCalledWith(expect.stringContaining('No transport found')); + }); + }); + + describe('getSessionId', () => { + it('should return session ID from query parameter', () => { + const request = mock(); + request.query = { sessionId: 'test-session-query' }; + request.headers = {}; + + const result = mcpServerManager.getSessionId(request); + + expect(result).toBe('test-session-query'); + }); + + it('should return session ID from header when query is not present', () => { + const request = mock(); + request.query = {}; + request.headers = { 'mcp-session-id': 'test-session-header' }; + + const result = mcpServerManager.getSessionId(request); + + expect(result).toBe('test-session-header'); + }); + + it('should return undefined when neither query parameter nor header is present', () => { + const request = mock(); + request.query = {}; + request.headers = {}; + + const result = mcpServerManager.getSessionId(request); + + expect(result).toBeUndefined(); + }); + }); + + describe('getTransport', () => { + const testSessionId = 'test-session-transport'; + + beforeEach(() => { + // Clear transports before each test + mcpServerManager.transports = {}; + }); + + it('should return transport when it exists for the session', () => { + const mockTransportInstance = mock(); + mcpServerManager.transports[testSessionId] = mockTransportInstance; + + const result = mcpServerManager.getTransport(testSessionId); + + expect(result).toBe(mockTransportInstance); + }); + + it('should return undefined when transport does not exist for the session', () => { + const result = mcpServerManager.getTransport('non-existent-session'); + + expect(result).toBeUndefined(); + }); + + it('should return correct transport when multiple transports exist', () => { + const mockTransport1 = mock(); + const mockTransport2 = mock(); + + mcpServerManager.transports['session-1'] = mockTransport1; + mcpServerManager.transports['session-2'] = mockTransport2; + + const result1 = mcpServerManager.getTransport('session-1'); + const result2 = mcpServerManager.getTransport('session-2'); + + expect(result1).toBe(mockTransport1); + expect(result2).toBe(mockTransport2); + }); + }); }); diff --git a/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/McpTrigger.node.test.ts b/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/McpTrigger.node.test.ts index 3cfd39180f..dd690fa46f 100644 --- a/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/McpTrigger.node.test.ts +++ b/packages/@n8n/nodes-langchain/nodes/mcp/McpTrigger/__test__/McpTrigger.node.test.ts @@ -1,4 +1,3 @@ -import { jest } from '@jest/globals'; import type { Tool } from '@langchain/core/tools'; import type { Request, Response } from 'express'; import { mock } from 'jest-mock-extended'; @@ -6,6 +5,7 @@ import type { INode, IWebhookFunctions } from 'n8n-workflow'; import * as helpers from '@utils/helpers'; +import type { FlushingSSEServerTransport } from '../FlushingTransport'; import type { McpServerManager } from '../McpServer'; import { McpTrigger } from '../McpTrigger.node'; @@ -22,7 +22,7 @@ jest.mock('../McpServer', () => ({ describe('McpTrigger Node', () => { const sessionId = 'mock-session-id'; const mockContext = mock(); - const mockRequest = mock({ query: { sessionId }, path: '/custom-path/sse' }); + const mockRequest = mock({ query: { sessionId }, path: '/custom-path' }); const mockResponse = mock(); let mcpTrigger: McpTrigger; @@ -34,8 +34,9 @@ describe('McpTrigger Node', () => { mockContext.getResponseObject.mockReturnValue(mockResponse); mockContext.getNode.mockReturnValue({ name: 'McpTrigger', - typeVersion: 1.1, + typeVersion: 2, } as INode); + mockServerManager.transports = {}; }); describe('webhook method', () => { @@ -47,9 +48,9 @@ describe('McpTrigger Node', () => { const result = await mcpTrigger.webhook(mockContext); // Verify that the connectTransport method was called with correct URL - expect(mockServerManager.createServerAndTransport).toHaveBeenCalledWith( + expect(mockServerManager.createServerWithSSETransport).toHaveBeenCalledWith( 'McpTrigger', - '/custom-path/messages', + '/custom-path', mockResponse, ); @@ -61,6 +62,10 @@ describe('McpTrigger Node', () => { // Configure the context for default webhook (tool execution) mockContext.getWebhookName.mockReturnValue('default'); + // Mock the session ID retrieval and transport existence + mockServerManager.getSessionId.mockReturnValue(sessionId); + mockServerManager.getTransport.mockReturnValue(mock({})); + // Mock that the server executes a tool and returns true mockServerManager.handlePostMessage.mockResolvedValueOnce(true); @@ -83,6 +88,10 @@ describe('McpTrigger Node', () => { // Configure the context for default webhook mockContext.getWebhookName.mockReturnValue('default'); + // Mock the session ID retrieval and transport existence + mockServerManager.getSessionId.mockReturnValue(sessionId); + mockServerManager.getTransport.mockReturnValue(mock({})); + // Mock that the server doesn't execute a tool and returns false mockServerManager.handlePostMessage.mockResolvedValueOnce(false); @@ -100,14 +109,13 @@ describe('McpTrigger Node', () => { typeVersion: 1.1, } as INode); mockContext.getWebhookName.mockReturnValue('setup'); - // Call the webhook method await mcpTrigger.webhook(mockContext); // Verify that connectTransport was called with the sanitized server name - expect(mockServerManager.createServerAndTransport).toHaveBeenCalledWith( + expect(mockServerManager.createServerWithSSETransport).toHaveBeenCalledWith( 'My_custom_MCP_server_', - '/custom-path/messages', + '/custom-path', mockResponse, ); }); @@ -123,9 +131,9 @@ describe('McpTrigger Node', () => { await mcpTrigger.webhook(mockContext); // Verify that connectTransport was called with the default server name - expect(mockServerManager.createServerAndTransport).toHaveBeenCalledWith( + expect(mockServerManager.createServerWithSSETransport).toHaveBeenCalledWith( 'n8n-mcp-server', - '/custom-path/messages', + '/custom-path', mockResponse, ); }); diff --git a/packages/@n8n/nodes-langchain/package.json b/packages/@n8n/nodes-langchain/package.json index fdec8fc6b4..a3668e1b8e 100644 --- a/packages/@n8n/nodes-langchain/package.json +++ b/packages/@n8n/nodes-langchain/package.json @@ -175,7 +175,7 @@ "@langchain/qdrant": "0.1.2", "@langchain/redis": "0.1.0", "@langchain/textsplitters": "0.1.0", - "@modelcontextprotocol/sdk": "1.11.0", + "@modelcontextprotocol/sdk": "1.12.0", "@mozilla/readability": "0.6.0", "@n8n/client-oauth2": "workspace:*", "@n8n/json-schema-to-zod": "workspace:*", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 11ad45a523..e946787808 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -832,8 +832,8 @@ importers: specifier: 0.1.0 version: 0.1.0(@langchain/core@0.3.48(openai@4.78.1(encoding@0.1.13)(zod@3.24.1))) '@modelcontextprotocol/sdk': - specifier: 1.11.0 - version: 1.11.0 + specifier: 1.12.0 + version: 1.12.0 '@mozilla/readability': specifier: 0.6.0 version: 0.6.0 @@ -5066,8 +5066,8 @@ packages: peerDependencies: zod: '>= 3' - '@modelcontextprotocol/sdk@1.11.0': - resolution: {integrity: sha512-k/1pb70eD638anoi0e8wUGAlbMJXyvdV4p62Ko+EZ7eBe1xMx8Uhak1R5DgfoofsK5IBBnRwsYGTaLZl+6/+RQ==} + '@modelcontextprotocol/sdk@1.12.0': + resolution: {integrity: sha512-m//7RlINx1F3sz3KqwY1WWzVgTcYX52HYk4bJ1hkBXV3zccAEth+jRvG8DBRrdaQuRsPAJOx2MH3zaHNCKL7Zg==} engines: {node: '>=18'} '@mongodb-js/saslprep@1.1.9': @@ -17706,8 +17706,9 @@ snapshots: dependencies: zod: 3.24.1 - '@modelcontextprotocol/sdk@1.11.0': + '@modelcontextprotocol/sdk@1.12.0': dependencies: + ajv: 6.12.6 content-type: 1.0.5 cors: 2.8.5 cross-spawn: 7.0.6