feat(MCP Client Tool Node): Add MCP Client Tool Node to connect to MCP servers over SSE (#14464)

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
Co-authored-by: JP van Oosten <jp@n8n.io>
This commit is contained in:
Elias Meire
2025-04-09 17:31:53 +02:00
committed by GitHub
parent b52f9f0f6c
commit 34252f53f9
24 changed files with 926 additions and 35 deletions

View File

@@ -0,0 +1,26 @@
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
import type { JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js';
import type { Response } from 'express';
export type CompressionResponse = Response & {
/**
* `flush()` is defined in the compression middleware.
* This is necessary because the compression middleware sometimes waits
* for a certain amount of data before sending the data to the client
*/
flush: () => void;
};
export class FlushingSSEServerTransport extends SSEServerTransport {
constructor(
_endpoint: string,
private response: CompressionResponse,
) {
super(_endpoint, response);
}
async send(message: JSONRPCMessage): Promise<void> {
await super.send(message);
this.response.flush();
}
}

View File

@@ -0,0 +1,200 @@
import type { Tool } from '@langchain/core/tools';
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import type { RequestHandlerExtra } from '@modelcontextprotocol/sdk/shared/protocol.js';
import type { JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js';
import {
JSONRPCMessageSchema,
ListToolsRequestSchema,
CallToolRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
import type * as express from 'express';
import { OperationalError, type Logger } from 'n8n-workflow';
import { zodToJsonSchema } from 'zod-to-json-schema';
import { FlushingSSEServerTransport } from './FlushingSSEServerTransport';
import type { CompressionResponse } from './FlushingSSEServerTransport';
/**
* Parses the JSONRPC message and checks whether the method used was a tool
* call. This is necessary in order to not have executions for listing tools
* and other commands sent by the MCP client
*/
function wasToolCall(body: string) {
try {
const message: unknown = JSON.parse(body);
const parsedMessage: JSONRPCMessage = JSONRPCMessageSchema.parse(message);
return (
'method' in parsedMessage &&
'id' in parsedMessage &&
parsedMessage?.method === CallToolRequestSchema.shape.method.value
);
} catch {
return false;
}
}
export class McpServer {
servers: { [sessionId: string]: Server } = {};
transports: { [sessionId: string]: FlushingSSEServerTransport } = {};
logger: Logger;
private tools: { [sessionId: string]: Tool[] } = {};
private resolveFunctions: { [sessionId: string]: CallableFunction } = {};
constructor(logger: Logger) {
this.logger = logger;
this.logger.debug('MCP Server created');
}
async connectTransport(postUrl: string, resp: CompressionResponse): Promise<void> {
const transport = new FlushingSSEServerTransport(postUrl, resp);
const server = this.setUpServer();
const { sessionId } = transport;
this.transports[sessionId] = transport;
this.servers[sessionId] = server;
resp.on('close', async () => {
this.logger.debug(`Deleting transport for ${sessionId}`);
delete this.tools[sessionId];
delete this.resolveFunctions[sessionId];
delete this.transports[sessionId];
delete this.servers[sessionId];
});
await server.connect(transport);
// Make sure we flush the compression middleware, so that it's not waiting for more content to be added to the buffer
if (resp.flush) {
resp.flush();
}
}
async handlePostMessage(req: express.Request, resp: CompressionResponse, connectedTools: Tool[]) {
const sessionId = req.query.sessionId as string;
const transport = this.transports[sessionId];
this.tools[sessionId] = connectedTools;
if (transport) {
// We need to add a promise here because the `handlePostMessage` will send something to the
// MCP Server, that will run in a different context. This means that the return will happen
// almost immediately, and will lead to marking the sub-node as "running" in the final execution
await new Promise(async (resolve) => {
this.resolveFunctions[sessionId] = resolve;
await transport.handlePostMessage(req, resp, req.rawBody.toString());
});
delete this.resolveFunctions[sessionId];
} else {
this.logger.warn(`No transport found for session ${sessionId}`);
resp.status(401).send('No transport found for sessionId');
}
if (resp.flush) {
resp.flush();
}
delete this.tools[sessionId]; // Clean up to avoid keeping all tools in memory
return wasToolCall(req.rawBody.toString());
}
setUpServer(): Server {
const server = new Server(
{
name: 'n8n-mcp-server',
version: '0.1.0',
},
{
capabilities: { tools: {} },
},
);
server.setRequestHandler(ListToolsRequestSchema, async (_, extra: RequestHandlerExtra) => {
if (!extra.sessionId) {
throw new OperationalError('Require a sessionId for the listing of tools');
}
return {
tools: this.tools[extra.sessionId].map((tool) => {
return {
name: tool.name,
description: tool.description,
inputSchema: zodToJsonSchema(tool.schema),
};
}),
};
});
server.setRequestHandler(CallToolRequestSchema, async (request, extra: RequestHandlerExtra) => {
if (!request.params?.name || !request.params?.arguments) {
throw new OperationalError('Require a name and arguments for the tool call');
}
if (!extra.sessionId) {
throw new OperationalError('Require a sessionId for the tool call');
}
const requestedTool: Tool | undefined = this.tools[extra.sessionId].find(
(tool) => tool.name === request.params.name,
);
if (!requestedTool) {
throw new OperationalError('Tool not found');
}
try {
const result = await requestedTool.invoke(request.params.arguments);
this.resolveFunctions[extra.sessionId]();
this.logger.debug(`Got request for ${requestedTool.name}, and executed it.`);
if (typeof result === 'object') {
return { content: [{ type: 'text', text: JSON.stringify(result) }] };
}
if (typeof result === 'string') {
return { content: [{ type: 'text', text: result }] };
}
return { content: [{ type: 'text', text: String(result) }] };
} catch (error) {
this.logger.error(`Error while executing Tool ${requestedTool.name}: ${error}`);
return { isError: true, content: [{ type: 'text', text: `Error: ${error.message}` }] };
}
});
server.onclose = () => {
this.logger.debug('Closing MCP Server');
};
server.onerror = (error: unknown) => {
this.logger.error(`MCP Error: ${error}`);
};
return server;
}
}
/**
* This singleton is shared across the instance, making sure we only have one server to worry about.
* It needs to stay in memory to keep track of the long-lived connections.
* It requires a logger at first creation to set everything up.
*/
export class McpServerSingleton {
static #instance: McpServerSingleton;
private _serverData: McpServer;
private constructor(logger: Logger) {
this._serverData = new McpServer(logger);
}
static instance(logger: Logger): McpServer {
if (!McpServerSingleton.#instance) {
McpServerSingleton.#instance = new McpServerSingleton(logger);
logger.debug('Created singleton for MCP Servers');
}
return McpServerSingleton.#instance.serverData;
}
get serverData() {
return this._serverData;
}
}

View File

@@ -0,0 +1,173 @@
import { WebhookAuthorizationError } from 'n8n-nodes-base/dist/nodes/Webhook/error';
import { validateWebhookAuthentication } from 'n8n-nodes-base/dist/nodes/Webhook/utils';
import type { INodeTypeDescription, IWebhookFunctions, IWebhookResponseData } from 'n8n-workflow';
import { NodeConnectionTypes, Node } from 'n8n-workflow';
import { getConnectedTools } from '@utils/helpers';
import type { CompressionResponse } from './FlushingSSEServerTransport';
import { McpServerSingleton } from './McpServer';
import type { McpServer } from './McpServer';
const MCP_SSE_SETUP_PATH = 'sse';
const MCP_SSE_MESSAGES_PATH = 'messages';
export class McpTrigger extends Node {
description: INodeTypeDescription = {
displayName: 'MCP Server Trigger',
name: 'mcpTrigger',
icon: {
light: 'file:../mcp.svg',
dark: 'file:../mcp.dark.svg',
},
group: ['trigger'],
version: 1,
description: 'Expose n8n tools as an MCP Server endpoint',
activationMessage: 'You can now connect your MCP Clients to the SSE URL.',
defaults: {
name: 'MCP Server Trigger',
},
codex: {
categories: ['AI', 'Core Nodes'],
subcategories: {
AI: ['Root Nodes', 'Model Context Protocol'],
'Core Nodes': ['Other Trigger Nodes'],
},
alias: ['Model Context Protocol', 'MCP Server'],
resources: {
primaryDocumentation: [
{
url: 'https://docs.n8n.io/integrations/builtin/core-nodes/n8n-nodes-langchain.mcptrigger/',
},
],
},
},
triggerPanel: {
header: 'Listen for MCP events',
executionsHelp: {
inactive:
"This trigger has two modes: test and production.<br /><br /><b>Use test mode while you build your workflow</b>. Click the 'test step' button, then make an MCP request to the test URL. The executions will show up in the editor.<br /><br /><b>Use production mode to run your workflow automatically</b>. <a data-key='activate'>Activate</a> the workflow, then make requests to the production URL. These executions will show up in the <a data-key='executions'>executions list</a>, but not the editor.",
active:
"This trigger has two modes: test and production.<br /><br /><b>Use test mode while you build your workflow</b>. Click the 'test step' button, then make an MCP request to the test URL. The executions will show up in the editor.<br /><br /><b>Use production mode to run your workflow automatically</b>. Since your workflow is activated, you can make requests to the production URL. These executions will show up in the <a data-key='executions'>executions list</a>, but not the editor.",
},
activationHint:
'Once youve finished building your workflow, run it without having to click this button by using the production URL.',
},
inputs: [
{
type: NodeConnectionTypes.AiTool,
displayName: 'Tools',
},
],
outputs: [],
credentials: [
{
// eslint-disable-next-line n8n-nodes-base/node-class-description-credentials-name-unsuffixed
name: 'httpBearerAuth',
required: true,
displayOptions: {
show: {
authentication: ['bearerAuth'],
},
},
},
{
name: 'httpHeaderAuth',
required: true,
displayOptions: {
show: {
authentication: ['headerAuth'],
},
},
},
],
properties: [
{
displayName: 'Authentication',
name: 'authentication',
type: 'options',
options: [
{ name: 'None', value: 'none' },
{ name: 'Bearer Auth', value: 'bearerAuth' },
{ name: 'Header Auth', value: 'headerAuth' },
],
default: 'none',
description: 'The way to authenticate',
},
{
displayName: 'Path',
name: 'path',
type: 'string',
default: '',
placeholder: 'webhook',
required: true,
description: 'The base path for this MCP server',
},
],
webhooks: [
{
name: 'setup',
httpMethod: 'GET',
responseMode: 'onReceived',
isFullPath: true,
path: `={{$parameter["path"]}}/${MCP_SSE_SETUP_PATH}`,
nodeType: 'mcp',
ndvHideMethod: true,
ndvHideUrl: false,
},
{
name: 'default',
httpMethod: 'POST',
responseMode: 'onReceived',
isFullPath: true,
path: `={{$parameter["path"]}}/${MCP_SSE_MESSAGES_PATH}`,
nodeType: 'mcp',
ndvHideMethod: true,
ndvHideUrl: true,
},
],
};
async webhook(context: IWebhookFunctions): Promise<IWebhookResponseData> {
const webhookName = context.getWebhookName();
const req = context.getRequestObject();
const resp = context.getResponseObject() as unknown as CompressionResponse;
try {
await validateWebhookAuthentication(context, 'authentication');
} catch (error) {
if (error instanceof WebhookAuthorizationError) {
resp.writeHead(error.responseCode);
resp.end(error.message);
return { noWebhookResponse: true };
}
throw error;
}
const mcpServer: McpServer = McpServerSingleton.instance(context.logger);
if (webhookName === 'setup') {
// Sets up the transport and opens the long-lived connection. This resp
// will stay streaming, and is the channel that sends the events
const postUrl = req.path.replace(
new RegExp(`/${MCP_SSE_SETUP_PATH}$`),
`/${MCP_SSE_MESSAGES_PATH}`,
);
await mcpServer.connectTransport(postUrl, resp);
return { noWebhookResponse: true };
} else if (webhookName === 'default') {
// This is the command-channel, and is actually executing the tools. This
// sends the response back through the long-lived connection setup in the
// 'setup' call
const connectedTools = await getConnectedTools(context, true);
const wasToolCall = await mcpServer.handlePostMessage(req, resp, connectedTools);
if (wasToolCall) return { noWebhookResponse: true, workflowData: [[{ json: {} }]] };
return { noWebhookResponse: true };
}
return { workflowData: [[{ json: {} }]] };
}
}

View File

@@ -0,0 +1,45 @@
import { jest } from '@jest/globals';
import type { JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js';
import { mock } from 'jest-mock-extended';
import { FlushingSSEServerTransport } from '../FlushingSSEServerTransport';
import type { CompressionResponse } from '../FlushingSSEServerTransport';
describe('FlushingSSEServerTransport', () => {
const mockResponse = mock<CompressionResponse>();
let transport: FlushingSSEServerTransport;
const endpoint = '/test/endpoint';
beforeEach(() => {
jest.resetAllMocks();
mockResponse.status.mockReturnThis();
transport = new FlushingSSEServerTransport(endpoint, mockResponse);
});
it('should call flush after sending a message', async () => {
// Create a sample JSONRPC message
const message: JSONRPCMessage = {
jsonrpc: '2.0',
id: '123',
result: { success: true },
};
// Send a message through the transport
await transport.start();
await transport.send(message);
expect(mockResponse.writeHead).toHaveBeenCalledWith(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache, no-transform',
Connection: 'keep-alive',
});
expect(mockResponse.write).toHaveBeenCalledWith(
// @ts-expect-error `_sessionId` is private
`event: endpoint\ndata: /test/endpoint?sessionId=${transport._sessionId}\n\n`,
);
expect(mockResponse.write).toHaveBeenCalledWith(
`event: message\ndata: ${JSON.stringify(message)}\n\n`,
);
expect(mockResponse.flush).toHaveBeenCalled();
});
});

View File

@@ -0,0 +1,122 @@
import { jest } from '@jest/globals';
import type { Tool } from '@langchain/core/tools';
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import type { Request } from 'express';
import { captor, mock } from 'jest-mock-extended';
import type { CompressionResponse } from '../FlushingSSEServerTransport';
import { FlushingSSEServerTransport } from '../FlushingSSEServerTransport';
import { McpServer } from '../McpServer';
const sessionId = 'mock-session-id';
const mockServer = mock<Server>();
jest.mock('@modelcontextprotocol/sdk/server/index.js', () => {
return {
Server: jest.fn().mockImplementation(() => mockServer),
};
});
const mockTransport = mock<FlushingSSEServerTransport>({ sessionId });
jest.mock('../FlushingSSEServerTransport', () => {
return {
FlushingSSEServerTransport: jest.fn().mockImplementation(() => mockTransport),
};
});
describe('McpServer', () => {
const mockRequest = mock<Request>({ query: { sessionId }, path: '/sse' });
const mockResponse = mock<CompressionResponse>();
const mockTool = mock<Tool>({ name: 'mockTool' });
let mcpServer: McpServer;
beforeEach(() => {
jest.clearAllMocks();
mockResponse.status.mockReturnThis();
mcpServer = new McpServer(mock());
});
describe('connectTransport', () => {
const postUrl = '/post-url';
it('should set up a transport and server', async () => {
await mcpServer.connectTransport(postUrl, mockResponse);
// Check that FlushingSSEServerTransport was initialized with correct params
expect(FlushingSSEServerTransport).toHaveBeenCalledWith(postUrl, mockResponse);
// Check that Server was initialized
expect(Server).toHaveBeenCalled();
// Check that transport and server are stored
expect(mcpServer.transports[sessionId]).toBeDefined();
expect(mcpServer.servers[sessionId]).toBeDefined();
// Check that connect was called on the server
expect(mcpServer.servers[sessionId].connect).toHaveBeenCalled();
// Check that flush was called if available
expect(mockResponse.flush).toHaveBeenCalled();
});
it('should set up close handler that cleans up resources', async () => {
await mcpServer.connectTransport(postUrl, mockResponse);
// Get the close callback and execute it
const closeCallbackCaptor = captor<() => Promise<void>>();
expect(mockResponse.on).toHaveBeenCalledWith('close', closeCallbackCaptor);
await closeCallbackCaptor.value();
// Check that resources were cleaned up
expect(mcpServer.transports[sessionId]).toBeUndefined();
expect(mcpServer.servers[sessionId]).toBeUndefined();
});
});
describe('handlePostMessage', () => {
it('should call transport.handlePostMessage when transport exists', async () => {
mockTransport.handlePostMessage.mockImplementation(async () => {
// @ts-expect-error private property `resolveFunctions`
mcpServer.resolveFunctions[sessionId]();
});
// Add the transport directly
mcpServer.transports[sessionId] = mockTransport;
mockRequest.rawBody = Buffer.from(
JSON.stringify({
jsonrpc: '2.0',
method: 'tools/call',
id: 123,
params: { name: 'mockTool' },
}),
);
// Call the method
const result = await mcpServer.handlePostMessage(mockRequest, mockResponse, [mockTool]);
// Verify that transport's handlePostMessage was called
expect(mockTransport.handlePostMessage).toHaveBeenCalledWith(
mockRequest,
mockResponse,
expect.any(String),
);
// Verify that we check if it was a tool call
expect(result).toBe(true);
// Verify flush was called
expect(mockResponse.flush).toHaveBeenCalled();
});
it('should return 401 when transport does not exist', async () => {
// Call without setting up transport
await mcpServer.handlePostMessage(mockRequest, mockResponse, [mockTool]);
// Verify error status was set
expect(mockResponse.status).toHaveBeenCalledWith(401);
expect(mockResponse.send).toHaveBeenCalledWith(expect.stringContaining('No transport found'));
});
});
});

View File

@@ -0,0 +1,92 @@
import { jest } from '@jest/globals';
import type { Tool } from '@langchain/core/tools';
import type { Request, Response } from 'express';
import { mock } from 'jest-mock-extended';
import type { IWebhookFunctions } from 'n8n-workflow';
import type { McpServer } from '../McpServer';
import { McpTrigger } from '../McpTrigger.node';
const mockTool = mock<Tool>({ name: 'mockTool' });
jest.mock('@utils/helpers', () => ({
getConnectedTools: jest.fn().mockImplementation(() => [mockTool]),
}));
const mockServer = mock<McpServer>();
jest.mock('../McpServer', () => ({
McpServerSingleton: {
instance: jest.fn().mockImplementation(() => mockServer),
},
}));
describe('McpTrigger Node', () => {
const sessionId = 'mock-session-id';
const mockContext = mock<IWebhookFunctions>();
const mockRequest = mock<Request>({ query: { sessionId }, path: '/custom-path/sse' });
const mockResponse = mock<Response>();
let mcpTrigger: McpTrigger;
beforeEach(() => {
jest.clearAllMocks();
mcpTrigger = new McpTrigger();
mockContext.getRequestObject.mockReturnValue(mockRequest);
mockContext.getResponseObject.mockReturnValue(mockResponse);
});
describe('webhook method', () => {
it('should handle setup webhook', async () => {
// Configure the context for setup webhook
mockContext.getWebhookName.mockReturnValue('setup');
// Call the webhook method
const result = await mcpTrigger.webhook(mockContext);
// Verify that the connectTransport method was called with correct URL
expect(mockServer.connectTransport).toHaveBeenCalledWith(
'/custom-path/messages',
mockResponse,
);
// Verify the returned result has noWebhookResponse: true
expect(result).toEqual({ noWebhookResponse: true });
});
it('should handle default webhook for tool execution', async () => {
// Configure the context for default webhook (tool execution)
mockContext.getWebhookName.mockReturnValue('default');
// Mock that the server executes a tool and returns true
mockServer.handlePostMessage.mockResolvedValueOnce(true);
// Call the webhook method
const result = await mcpTrigger.webhook(mockContext);
// Verify that handlePostMessage was called with request, response and tools
expect(mockServer.handlePostMessage).toHaveBeenCalledWith(mockRequest, mockResponse, [
mockTool,
]);
// Verify the returned result when a tool was called
expect(result).toEqual({
noWebhookResponse: true,
workflowData: [[{ json: {} }]],
});
});
it('should handle default webhook when no tool was executed', async () => {
// Configure the context for default webhook
mockContext.getWebhookName.mockReturnValue('default');
// Mock that the server doesn't execute a tool and returns false
mockServer.handlePostMessage.mockResolvedValueOnce(false);
// Call the webhook method
const result = await mcpTrigger.webhook(mockContext);
// Verify the returned result when no tool was called
expect(result).toEqual({ noWebhookResponse: true });
});
});
});