mirror of https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git (synced 2025-12-16 17:46:45 +00:00)
import type { Tool } from '@langchain/core/tools';
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import type { RequestHandlerExtra } from '@modelcontextprotocol/sdk/shared/protocol.js';
import type { JSONRPCMessage } from '@modelcontextprotocol/sdk/types.js';
import {
	JSONRPCMessageSchema,
	ListToolsRequestSchema,
	CallToolRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
import type * as express from 'express';
import { OperationalError, type Logger } from 'n8n-workflow';
import { zodToJsonSchema } from 'zod-to-json-schema';

import { FlushingSSEServerTransport } from './FlushingSSEServerTransport';
import type { CompressionResponse } from './FlushingSSEServerTransport';
/**
 * Parses the JSONRPC message and checks whether the method used was a tool
 * call. This is necessary in order to not have executions for listing tools
 * and other commands sent by the MCP client
 */
function wasToolCall(body: string) {
	try {
		const message: unknown = JSON.parse(body);
		const parsedMessage: JSONRPCMessage = JSONRPCMessageSchema.parse(message);
		return (
			'method' in parsedMessage &&
			'id' in parsedMessage &&
			parsedMessage?.method === CallToolRequestSchema.shape.method.value
		);
	} catch {
		return false;
	}
}
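/**
 * Keeps one MCP Server instance and one SSE transport per client session,
 * keyed by the transport's sessionId, so concurrent MCP clients stay isolated.
 */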
export class McpServer {
	servers: { [sessionId: string]: Server } = {};

	transports: { [sessionId: string]: FlushingSSEServerTransport } = {};

	logger: Logger;

	private tools: { [sessionId: string]: Tool[] } = {};
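	// Resolvers registered in `handlePostMessage` and called from the CallTool
	// handler once a tool finishes, so the pending POST request can complete.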
	private resolveFunctions: { [sessionId: string]: CallableFunction } = {};

	constructor(logger: Logger) {
		this.logger = logger;
		this.logger.debug('MCP Server created');
	}
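	/**
	 * Opens a new SSE session: creates a FlushingSSEServerTransport that points
	 * clients at `postUrl`, sets up a dedicated MCP Server for the session, and
	 * removes all per-session state once the response stream closes.
	 */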
	async connectTransport(postUrl: string, resp: CompressionResponse): Promise<void> {
		const transport = new FlushingSSEServerTransport(postUrl, resp);
		const server = this.setUpServer();
		const { sessionId } = transport;
		this.transports[sessionId] = transport;
		this.servers[sessionId] = server;

		resp.on('close', async () => {
			this.logger.debug(`Deleting transport for ${sessionId}`);
			delete this.tools[sessionId];
			delete this.resolveFunctions[sessionId];
			delete this.transports[sessionId];
			delete this.servers[sessionId];
		});

		await server.connect(transport);

		// Make sure we flush the compression middleware, so that it's not waiting for more content to be added to the buffer
		if (resp.flush) {
			resp.flush();
		}
	}
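	/**
	 * Forwards an incoming client POST to the transport of the matching session
	 * and returns whether the message was a tool call, so the caller can decide
	 * if the request should count as an execution.
	 */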
	async handlePostMessage(req: express.Request, resp: CompressionResponse, connectedTools: Tool[]) {
		const sessionId = req.query.sessionId as string;
		const transport = this.transports[sessionId];
		this.tools[sessionId] = connectedTools;
		if (transport) {
			// We need a promise here because `handlePostMessage` sends the message to the MCP Server,
			// which runs in a different context. Without it, the return would happen almost immediately
			// and the sub-node would be marked as "running" in the final execution.
			await new Promise(async (resolve) => {
				this.resolveFunctions[sessionId] = resolve;
				await transport.handlePostMessage(req, resp, req.rawBody.toString());
			});
			delete this.resolveFunctions[sessionId];
		} else {
			this.logger.warn(`No transport found for session ${sessionId}`);
			resp.status(401).send('No transport found for sessionId');
		}

		if (resp.flush) {
			resp.flush();
		}

		delete this.tools[sessionId]; // Clean up to avoid keeping all tools in memory

		return wasToolCall(req.rawBody.toString());
	}
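	/** Creates the MCP Server for a session and registers its list-tools and call-tool handlers. */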
	setUpServer(): Server {
		const server = new Server(
			{
				name: 'n8n-mcp-server',
				version: '0.1.0',
			},
			{
				capabilities: { tools: {} },
			},
		);
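		// Advertise the tools connected to this session, converting each tool's
		// zod schema to JSON Schema for the MCP client.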
		server.setRequestHandler(ListToolsRequestSchema, async (_, extra: RequestHandlerExtra) => {
			if (!extra.sessionId) {
				throw new OperationalError('Require a sessionId for the listing of tools');
			}

			return {
				tools: this.tools[extra.sessionId].map((tool) => {
					return {
						name: tool.name,
						description: tool.description,
						inputSchema: zodToJsonSchema(tool.schema),
					};
				}),
			};
		});
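		// Execute the requested tool with the client-supplied arguments and resolve
		// the promise that `handlePostMessage` is waiting on.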
		server.setRequestHandler(CallToolRequestSchema, async (request, extra: RequestHandlerExtra) => {
			if (!request.params?.name || !request.params?.arguments) {
				throw new OperationalError('Require a name and arguments for the tool call');
			}
			if (!extra.sessionId) {
				throw new OperationalError('Require a sessionId for the tool call');
			}

			const requestedTool: Tool | undefined = this.tools[extra.sessionId].find(
				(tool) => tool.name === request.params.name,
			);
			if (!requestedTool) {
				throw new OperationalError('Tool not found');
			}

			try {
				const result = await requestedTool.invoke(request.params.arguments);

				this.resolveFunctions[extra.sessionId]();

				this.logger.debug(`Got request for ${requestedTool.name}, and executed it.`);

				// TODO: Refactor this to no longer use the legacy tool result
				return { toolResult: result };
			} catch (error) {
				this.logger.error(`Error while executing Tool ${requestedTool.name}: ${error}`);
				return { isError: true, content: [{ type: 'text', text: `Error: ${error.message}` }] };
			}
		});
		server.onclose = () => {
			this.logger.debug('Closing MCP Server');
		};
		server.onerror = (error: unknown) => {
			this.logger.error(`MCP Error: ${error}`);
		};
		return server;
	}
}
/**
 * This singleton is shared across the instance, making sure we only have one server to worry about.
 * It needs to stay in memory to keep track of the long-lived connections.
 * It requires a logger at first creation to set everything up.
 */
export class McpServerSingleton {
	static #instance: McpServerSingleton;

	private _serverData: McpServer;

	private constructor(logger: Logger) {
		this._serverData = new McpServer(logger);
	}

	static instance(logger: Logger): McpServer {
		if (!McpServerSingleton.#instance) {
			McpServerSingleton.#instance = new McpServerSingleton(logger);
			logger.debug('Created singleton for MCP Servers');
		}

		return McpServerSingleton.#instance.serverData;
	}

	get serverData() {
		return this._serverData;
	}
}
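/*
 * Usage sketch (illustrative only; the Express routes, `logger`, and `tools`
 * below are assumptions, not part of this file): one endpoint opens the
 * long-lived SSE connection, another forwards client POSTs to the same session.
 *
 *   const mcpServer = McpServerSingleton.instance(logger);
 *
 *   app.get('/sse', async (req, res) => {
 *     await mcpServer.connectTransport('/messages', res as CompressionResponse);
 *   });
 *
 *   app.post('/messages', async (req, res) => {
 *     const wasToolCall = await mcpServer.handlePostMessage(
 *       req,
 *       res as CompressionResponse,
 *       tools,
 *     );
 *   });
 */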