mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-18 02:21:13 +00:00
feat(MCP Client Tool Node): Add support for HTTP Streamable Transport (#15454)
This commit is contained in:
@@ -11,7 +11,7 @@ import { logWrapper } from '@utils/logWrapper';
|
|||||||
import { getConnectionHintNoticeField } from '@utils/sharedFields';
|
import { getConnectionHintNoticeField } from '@utils/sharedFields';
|
||||||
|
|
||||||
import { getTools } from './loadOptions';
|
import { getTools } from './loadOptions';
|
||||||
import type { McpAuthenticationOption, McpToolIncludeMode } from './types';
|
import type { McpServerTransport, McpAuthenticationOption, McpToolIncludeMode } from './types';
|
||||||
import {
|
import {
|
||||||
connectMcpClient,
|
connectMcpClient,
|
||||||
createCallTool,
|
createCallTool,
|
||||||
@@ -31,7 +31,7 @@ export class McpClientTool implements INodeType {
|
|||||||
dark: 'file:../mcp.dark.svg',
|
dark: 'file:../mcp.dark.svg',
|
||||||
},
|
},
|
||||||
group: ['output'],
|
group: ['output'],
|
||||||
version: 1,
|
version: [1, 1.1],
|
||||||
description: 'Connect tools from an MCP Server',
|
description: 'Connect tools from an MCP Server',
|
||||||
defaults: {
|
defaults: {
|
||||||
name: 'MCP Client',
|
name: 'MCP Client',
|
||||||
@@ -83,6 +83,47 @@ export class McpClientTool implements INodeType {
|
|||||||
placeholder: 'e.g. https://my-mcp-server.ai/sse',
|
placeholder: 'e.g. https://my-mcp-server.ai/sse',
|
||||||
default: '',
|
default: '',
|
||||||
required: true,
|
required: true,
|
||||||
|
displayOptions: {
|
||||||
|
show: {
|
||||||
|
'@version': [1],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
displayName: 'Endpoint',
|
||||||
|
name: 'endpointUrl',
|
||||||
|
type: 'string',
|
||||||
|
description: 'Endpoint of your MCP server',
|
||||||
|
placeholder: 'e.g. https://my-mcp-server.ai/mcp',
|
||||||
|
default: '',
|
||||||
|
required: true,
|
||||||
|
displayOptions: {
|
||||||
|
show: {
|
||||||
|
'@version': [{ _cnd: { gte: 1.1 } }],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
displayName: 'Server Transport',
|
||||||
|
name: 'serverTransport',
|
||||||
|
type: 'options',
|
||||||
|
options: [
|
||||||
|
{
|
||||||
|
name: 'Server Sent Events (Deprecated)',
|
||||||
|
value: 'sse',
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: 'HTTP Streamable',
|
||||||
|
value: 'httpStreamable',
|
||||||
|
},
|
||||||
|
],
|
||||||
|
default: 'sse',
|
||||||
|
description: 'The transport used by your endpoint',
|
||||||
|
displayOptions: {
|
||||||
|
show: {
|
||||||
|
'@version': [{ _cnd: { gte: 1.1 } }],
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
displayName: 'Authentication',
|
displayName: 'Authentication',
|
||||||
@@ -103,7 +144,7 @@ export class McpClientTool implements INodeType {
|
|||||||
},
|
},
|
||||||
],
|
],
|
||||||
default: 'none',
|
default: 'none',
|
||||||
description: 'The way to authenticate with your SSE endpoint',
|
description: 'The way to authenticate with your endpoint',
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
displayName: 'Credentials',
|
displayName: 'Credentials',
|
||||||
@@ -187,11 +228,22 @@ export class McpClientTool implements INodeType {
|
|||||||
'authentication',
|
'authentication',
|
||||||
itemIndex,
|
itemIndex,
|
||||||
) as McpAuthenticationOption;
|
) as McpAuthenticationOption;
|
||||||
const sseEndpoint = this.getNodeParameter('sseEndpoint', itemIndex) as string;
|
|
||||||
const node = this.getNode();
|
const node = this.getNode();
|
||||||
|
|
||||||
|
let serverTransport: McpServerTransport;
|
||||||
|
let endpointUrl: string;
|
||||||
|
if (node.typeVersion === 1) {
|
||||||
|
serverTransport = 'sse';
|
||||||
|
endpointUrl = this.getNodeParameter('sseEndpoint', itemIndex) as string;
|
||||||
|
} else {
|
||||||
|
serverTransport = this.getNodeParameter('serverTransport', itemIndex) as McpServerTransport;
|
||||||
|
endpointUrl = this.getNodeParameter('endpointUrl', itemIndex) as string;
|
||||||
|
}
|
||||||
|
|
||||||
const { headers } = await getAuthHeaders(this, authentication);
|
const { headers } = await getAuthHeaders(this, authentication);
|
||||||
const client = await connectMcpClient({
|
const client = await connectMcpClient({
|
||||||
sseEndpoint,
|
serverTransport,
|
||||||
|
endpointUrl,
|
||||||
headers,
|
headers,
|
||||||
name: node.type,
|
name: node.type,
|
||||||
version: node.typeVersion,
|
version: node.typeVersion,
|
||||||
|
|||||||
@@ -4,16 +4,25 @@ import {
|
|||||||
NodeOperationError,
|
NodeOperationError,
|
||||||
} from 'n8n-workflow';
|
} from 'n8n-workflow';
|
||||||
|
|
||||||
import type { McpAuthenticationOption } from './types';
|
import type { McpAuthenticationOption, McpServerTransport } from './types';
|
||||||
import { connectMcpClient, getAllTools, getAuthHeaders } from './utils';
|
import { connectMcpClient, getAllTools, getAuthHeaders } from './utils';
|
||||||
|
|
||||||
export async function getTools(this: ILoadOptionsFunctions): Promise<INodePropertyOptions[]> {
|
export async function getTools(this: ILoadOptionsFunctions): Promise<INodePropertyOptions[]> {
|
||||||
const authentication = this.getNodeParameter('authentication') as McpAuthenticationOption;
|
const authentication = this.getNodeParameter('authentication') as McpAuthenticationOption;
|
||||||
const sseEndpoint = this.getNodeParameter('sseEndpoint') as string;
|
|
||||||
const node = this.getNode();
|
const node = this.getNode();
|
||||||
|
let serverTransport: McpServerTransport;
|
||||||
|
let endpointUrl: string;
|
||||||
|
if (node.typeVersion === 1) {
|
||||||
|
serverTransport = 'sse';
|
||||||
|
endpointUrl = this.getNodeParameter('sseEndpoint') as string;
|
||||||
|
} else {
|
||||||
|
serverTransport = this.getNodeParameter('serverTransport') as McpServerTransport;
|
||||||
|
endpointUrl = this.getNodeParameter('endpointUrl') as string;
|
||||||
|
}
|
||||||
const { headers } = await getAuthHeaders(this, authentication);
|
const { headers } = await getAuthHeaders(this, authentication);
|
||||||
const client = await connectMcpClient({
|
const client = await connectMcpClient({
|
||||||
sseEndpoint,
|
serverTransport,
|
||||||
|
endpointUrl,
|
||||||
headers,
|
headers,
|
||||||
name: node.type,
|
name: node.type,
|
||||||
version: node.typeVersion,
|
version: node.typeVersion,
|
||||||
|
|||||||
@@ -2,6 +2,8 @@ import type { JSONSchema7 } from 'json-schema';
|
|||||||
|
|
||||||
export type McpTool = { name: string; description?: string; inputSchema: JSONSchema7 };
|
export type McpTool = { name: string; description?: string; inputSchema: JSONSchema7 };
|
||||||
|
|
||||||
|
export type McpServerTransport = 'sse' | 'httpStreamable';
|
||||||
|
|
||||||
export type McpToolIncludeMode = 'all' | 'selected' | 'except';
|
export type McpToolIncludeMode = 'all' | 'selected' | 'except';
|
||||||
|
|
||||||
export type McpAuthenticationOption = 'none' | 'headerAuth' | 'bearerAuth';
|
export type McpAuthenticationOption = 'none' | 'headerAuth' | 'bearerAuth';
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import { DynamicStructuredTool, type DynamicStructuredToolInput } from '@langchain/core/tools';
|
import { DynamicStructuredTool, type DynamicStructuredToolInput } from '@langchain/core/tools';
|
||||||
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
|
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
|
||||||
import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js';
|
import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js';
|
||||||
|
import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js';
|
||||||
import { CompatibilityCallToolResultSchema } from '@modelcontextprotocol/sdk/types.js';
|
import { CompatibilityCallToolResultSchema } from '@modelcontextprotocol/sdk/types.js';
|
||||||
import { Toolkit } from 'langchain/agents';
|
import { Toolkit } from 'langchain/agents';
|
||||||
import {
|
import {
|
||||||
@@ -14,7 +15,12 @@ import { z } from 'zod';
|
|||||||
|
|
||||||
import { convertJsonSchemaToZod } from '@utils/schemaParsing';
|
import { convertJsonSchemaToZod } from '@utils/schemaParsing';
|
||||||
|
|
||||||
import type { McpAuthenticationOption, McpTool, McpToolIncludeMode } from './types';
|
import type {
|
||||||
|
McpAuthenticationOption,
|
||||||
|
McpTool,
|
||||||
|
McpServerTransport,
|
||||||
|
McpToolIncludeMode,
|
||||||
|
} from './types';
|
||||||
|
|
||||||
export async function getAllTools(client: Client, cursor?: string): Promise<McpTool[]> {
|
export async function getAllTools(client: Client, cursor?: string): Promise<McpTool[]> {
|
||||||
const { tools, nextCursor } = await client.listTools({ cursor });
|
const { tools, nextCursor } = await client.listTools({ cursor });
|
||||||
@@ -145,23 +151,39 @@ type ConnectMcpClientError =
|
|||||||
| { type: 'connection'; error: Error };
|
| { type: 'connection'; error: Error };
|
||||||
export async function connectMcpClient({
|
export async function connectMcpClient({
|
||||||
headers,
|
headers,
|
||||||
sseEndpoint,
|
serverTransport,
|
||||||
|
endpointUrl,
|
||||||
name,
|
name,
|
||||||
version,
|
version,
|
||||||
}: {
|
}: {
|
||||||
sseEndpoint: string;
|
serverTransport: McpServerTransport;
|
||||||
|
endpointUrl: string;
|
||||||
headers?: Record<string, string>;
|
headers?: Record<string, string>;
|
||||||
name: string;
|
name: string;
|
||||||
version: number;
|
version: number;
|
||||||
}): Promise<Result<Client, ConnectMcpClientError>> {
|
}): Promise<Result<Client, ConnectMcpClientError>> {
|
||||||
try {
|
const endpoint = normalizeAndValidateUrl(endpointUrl);
|
||||||
const endpoint = normalizeAndValidateUrl(sseEndpoint);
|
|
||||||
|
|
||||||
if (!endpoint.ok) {
|
if (!endpoint.ok) {
|
||||||
return createResultError({ type: 'invalid_url', error: endpoint.error });
|
return createResultError({ type: 'invalid_url', error: endpoint.error });
|
||||||
|
}
|
||||||
|
|
||||||
|
const client = new Client({ name, version: version.toString() }, { capabilities: { tools: {} } });
|
||||||
|
|
||||||
|
if (serverTransport === 'httpStreamable') {
|
||||||
|
try {
|
||||||
|
const transport = new StreamableHTTPClientTransport(endpoint.result, {
|
||||||
|
requestInit: { headers },
|
||||||
|
});
|
||||||
|
await client.connect(transport);
|
||||||
|
return createResultOk(client);
|
||||||
|
} catch (error) {
|
||||||
|
return createResultError({ type: 'connection', error });
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const transport = new SSEClientTransport(endpoint.result, {
|
try {
|
||||||
|
const sseTransport = new SSEClientTransport(endpoint.result, {
|
||||||
eventSourceInit: {
|
eventSourceInit: {
|
||||||
fetch: async (url, init) =>
|
fetch: async (url, init) =>
|
||||||
await fetch(url, {
|
await fetch(url, {
|
||||||
@@ -174,13 +196,7 @@ export async function connectMcpClient({
|
|||||||
},
|
},
|
||||||
requestInit: { headers },
|
requestInit: { headers },
|
||||||
});
|
});
|
||||||
|
await client.connect(sseTransport);
|
||||||
const client = new Client(
|
|
||||||
{ name, version: version.toString() },
|
|
||||||
{ capabilities: { tools: {} } },
|
|
||||||
);
|
|
||||||
|
|
||||||
await client.connect(transport);
|
|
||||||
return createResultOk(client);
|
return createResultOk(client);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
return createResultError({ type: 'connection', error });
|
return createResultError({ type: 'connection', error });
|
||||||
|
|||||||
Reference in New Issue
Block a user