0

MCP协议实战:用标准化接口让AI Agent真正调用外部工具

2026.06.02 | youres | 24次围观

为什么AI Agent需要MCP协议?

先说个真实场景:我想让AI Agent帮我查快递、读文件、自动发邮件,于是写了三个独立脚本分别调用快递API、文件系统和SMTP服务。每个脚本都能工作,但问题来了——每个脚本的调用方式都不一样:快递API要OAuth认证,文件系统是本地路径,邮件要SMTP连接。如果我想换AI模型或者加新工具,几乎要重写一遍。

MCP(Model Context Protocol)就是为了解决这个问题而生的。它定义了一套标准化的工具调用协议——不管你用的是Claude、GPT还是本地模型,不管工具是API、本地程序还是云服务,都用同一套接口规范。

这个协议最近在AI圈火得很快,因为它第一次让"AI Agent调用各种工具"这件事变得真正可复用、可扩展。本文从实战角度,带你从零搭一个基于MCP的工具调用系统。

MCP协议核心概念

MCP的设计哲学就三句话:工具是资源、调用是请求、结果是结构化数据。具体拆解:

  • Host(主机):AI应用本身,比如你的Agent或者助手
  • Client(客户端):Host里的MCP客户端,负责和Server通信
  • Server(服务端):每个外部工具(API、本地服务)对应一个MCP Server
  • Transport(传输层):目前主流是stdio(标准输入输出)和HTTP+SSE两种

整个通信流程是:Host发送请求 → Client路由到对应Server → Server执行工具 → 返回JSON结果 → Client解析 → Host拿到结构化数据

实战:搭建一个本地MCP Server

场景设定

我们搭一个文件管理MCP Server,让AI Agent可以:读取指定路径文件、搜索文件内容、获取文件元信息。之所以选文件管理,是因为这个场景足够通用,代码量适中,改一改就能接其他API。

项目结构

mcp_file_server/
├── src/
│   ├── index.ts          # Server入口
│   ├── tools/
│   │   ├── read_file.ts  # 读取文件
│   │   ├── search_file.ts # 搜索文件
│   │   └── file_info.ts  # 文件信息
│   └── types.ts          # 类型定义
├── package.json
└── tsconfig.json

核心代码实现

// src/types.ts - 类型定义
export interface ToolDefinition {
  name: string;
  description: string;
  inputSchema: {
    type: "object";
    properties: Record<string, any>;
    required?: string[];
  };
}

export interface ToolCallRequest {
  name: string;
  arguments: Record<string, any>;
}

export interface ToolCallResult {
  content: Array<{
    type: "text" | "image";
    text?: string;
    data?: string;
    mimeType?: string;
  }>;
  isError?: boolean;
}
// src/tools/read_file.ts
import * as fs from "fs/promises";
import * as path from "path";
import { ToolDefinition, ToolCallResult } from "../types";

export const read_file_tool: ToolDefinition = {
  name: "read_file",
  description: "读取指定路径的文本文件内容,支持UTF-8编码。返回文件全文或指定行范围。",
  inputSchema: {
    type: "object",
    properties: {
      path: {
        type: "string",
        description: "文件绝对路径"
      },
      startLine: {
        type: "number",
        description: "起始行号(可选,默认从第1行开始)"
      },
      endLine: {
        type: "number",
        description: "结束行号(可选,默认到文件末尾)"
      }
    },
    required: ["path"]
  }
};

export async function read_file(args: {
  path: string;
  startLine?: number;
  endLine?: number;
}): Promise<ToolCallResult> {
  try {
    const content = await fs.readFile(args.path, "utf-8");
    const lines = content.split("\n");
    const start = (args.startLine || 1) - 1;
    const end = args.endLine || lines.length;
    const selected = lines.slice(start, end).join("\n");

    return {
      content: [{
        type: "text",
        text: "文件: " + args.path + "\n行数: " + lines.length + "\n---\n" + selected
      }]
    };
  } catch (error: any) {
    return {
      content: [{ type: "text", text: "读取失败: " + error.message }],
      isError: true
    };
  }
}
// src/tools/search_file.ts
import * as fs from "fs/promises";
import * as path from "path";
import { ToolDefinition, ToolCallResult } from "../types";

export const search_file_tool: ToolDefinition = {
  name: "search_file",
  description: "递归搜索指定目录下包含关键词的文件,返回匹配的行内容",
  inputSchema: {
    type: "object",
    properties: {
      directory: {
        type: "string",
        description: "搜索目录路径"
      },
      keyword: {
        type: "string",
        description: "搜索关键词"
      },
      extension: {
        type: "string",
        description: "文件扩展名过滤(如 .txt, .md)"
      }
    },
    required: ["directory", "keyword"]
  }
};

async function searchInFile(
  filepath: string,
  keyword: string
): Promise<string | null> {
  try {
    const content = await fs.readFile(filepath, "utf-8");
    const lines = content.split("\n");
    const matches = lines
      .map((line, idx) => ({ line, num: idx + 1 }))
      .filter(item => item.line.includes(keyword));

    if (matches.length > 0) {
      return matches
        .slice(0, 10)
        .map(m => "  " + m.num + ": " + m.line.trim())
        .join("\n");
    }
    return null;
  } catch {
    return null;
  }
}

export async function search_file(args: {
  directory: string;
  keyword: string;
  extension?: string;
}): Promise<ToolCallResult> {
  const results: string[] = [];

  async function walk(dir: string) {
    const entries = await fs.readdir(dir, { withFileTypes: true });
    for (const entry of entries) {
      if (entry.name.startsWith(".")) continue;
      const fullPath = path.join(dir, entry.name);
      if (entry.isDirectory()) {
        await walk(fullPath);
      } else if (entry.isFile()) {
        if (args.extension && !entry.name.endsWith(args.extension)) continue;
        const match = await searchInFile(fullPath, args.keyword);
        if (match) {
          results.push(fullPath + ":\n" + match + "\n");
        }
      }
    }
  }

  await walk(args.directory);

  if (results.length === 0) {
    return {
      content: [{ type: "text", text: "未找到匹配结果" }],
      isError: false
    };
  }

  return {
    content: [{
      type: "text",
      text: "找到 " + results.length + " 个匹配文件:\n---\n" + results.join("\n---\n")
    }]
  };
}
// src/index.ts - Server主入口
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema
} from "@modelcontextprotocol/sdk/types.js";
import { read_file_tool, read_file } from "./tools/read_file";
import { search_file_tool, search_file } from "./tools/search_file";
import { file_info_tool, file_info } from "./tools/file_info";

const server = new Server(
  { name: "file-manager-mcp", version: "1.0.0" },
  { capabilities: { tools: {} } }
);

// 注册所有工具
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [read_file_tool, search_file_tool, file_info_tool]
  };
});

// 处理工具调用
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  switch (name) {
    case "read_file":
      return await read_file(args);
    case "search_file":
      return await search_file(args);
    case "file_info":
      return await file_info(args);
    default:
      return {
        content: [{ type: "text", text: "未知工具: " + name }],
        isError: true
      };
  }
});

// 启动stdio传输
const transport = new StdioServerTransport();
server.connect(transport);

console.error("文件管理MCP Server已启动");

让AI Agent调用MCP Server

使用Python SDK连接

# mcp_client.py
from mcp import ClientSession, StdioServerParameters
import asyncio

class MCPFileClient:
    def __init__(self):
        self.session = None

    async def connect(self):
        params = StdioServerParameters(
            command="node",
            args=["/path/to/mcp_file_server/dist/index.js"]
        )
        async with ClientSession(params) as session:
            await session.initialize()
            self.session = session
            return session

    async def read_file(self, path: str, startLine: int = None, endLine: int = None):
        result = await self.session.call_tool("read_file", {
            "path": path,
            **( {"startLine": startLine} if startLine else {}),
            **( {"endLine": endLine} if endLine else {})
        })
        return result.content[0].text

    async def search(self, directory: str, keyword: str, extension: str = None):
        result = await self.session.call_tool("search_file", {
            "directory": directory,
            "keyword": keyword,
            **( {"extension": extension} if extension else {})
        })
        return result.content[0].text

# 使用示例
async def main():
    client = MCPFileClient()
    await client.connect()

    # 让AI决定调用什么工具
    ai_decision = "帮我读取 config.json 的前20行"

    if "读取" in ai_decision and "config" in ai_decision:
        content = await client.read_file("/project/config.json", 1, 20)
        print(content)

asyncio.run(main())

集成到OpenClaw Agent

如果你用OpenClaw,可以直接在Agent配置里挂载MCP Server:

{
  "mcpServers": {
    "fileManager": {
      "command": "node",
      "args": ["/path/to/mcp_file_server/dist/index.js"],
      "enabled": true
    },
    "webSearch": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]
    }
  }
}

配置好后,Agent就能直接用自然语言调用这些工具,比如"帮我搜索项目里所有包含API密钥的文件"。

MCP生态现状和选型建议

目前(2026年初)MCP生态还处于快速发展期,主流工具大概分这几类:

类型代表项目成熟度适用场景
官方SDK@modelcontextprotocol/sdk生产级TypeScript/JavaScript开发MCP Server
文件系统server-filesystem生产级让AI读写本地文件
浏览器控制server-playwrightBetaAI自动化操作网页
GitHub集成server-github生产级代码管理和PR操作
数据库server-postgres, server-sqliteBetaAI查询和分析数据
Slack/Discordserver-slack, server-discordBetaAI操作群聊和消息

踩坑总结和最佳实践

我在这套方案上踩过几个坑,记下来帮你避雷:

  • stdio模式慎用于长时间运行的服务:stdio模式下,如果Server卡住(比如死循环),Client会一直等待。建议用HTTP+SSE模式,支持超时控制。
  • 工具返回的文本长度要控制:大文件全文返回会让AI上下文爆炸。一定要加行数限制或文件大小判断。
  • 安全隔离别忘了:MCP Server以Agent权限运行,如果接文件系统的Server,不要让AI能读敏感目录(如.ssh、.npmrc)。
  • 错误处理要友好:工具执行失败时,返回的isError字段和友好的错误信息比抛异常更重要。

进阶:从工具调用到工作流编排

单个工具调用是MCP的起点,真正的价值在于组合多个工具形成工作流。比如:

# 工作流示例:自动化代码审查
async function code_review_flow(repo_path: string):
    # 1. 搜索所有修改的文件
    changed = await search(repo_path, "TODO|FIXME|BUG")
    # 2. 读取关键文件
    files = parse_changed_files(changed)
    reviews = [await read_file(f) for f in files]
    # 3. 调用LLM分析
    analysis = await llm.analyze(reviews)
    # 4. 生成报告
    await write_report(analysis)

这种工具编排能力才是MCP的真正魅力——它让AI Agent从"能做什么"进化到"能规划做什么"。

相关文章推荐AI Agent定时任务自动执行实战 | OpenClaw Agent实战部署 | 大模型RAG知识库搭建教程

版权声明

本文仅代表个人观点。
本文系AI辅助作者原创,未经许可,转载请保留原文链接。

发表评论