玩转MCP
玩转MCP
- 0.环境
- 1.自定义stdio交互
- 1.1.server
- 1.2.client
- 1.3.效果
- 2.自定义sse交互
- 2.1.server
- 2.2.client
- 2.3.效果
- 3.使用官方文件
mcp火了好一阵了,最近一直在大院干活儿,好不容易抽出时间,赶紧来学习学习。
官方文档, mcp广场可以搜索自己需要的server服务,都是些saas服务,其实就是把各家的server下载到本地然后调用。官方分stdio和sse两种交互模式。我这里都用代码自定义,方便项目应用。
0.环境
conda create -n mcp python==3.12
conda activate mcp
pip install mcp openai
1.自定义stdio交互
1.1.server
服务端包括三部分:创建mcp客户端,然后写自己的tool都带上@mcp.tool()注解,最后run(这里确定使用stdio方式还是sse方式)
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("my_server", version="1.0.0")@mcp.tool()
def use_my_camera()-> None:passif __name__ == "__main__":mcp.run(transport='stdio')
我的测试代码里写了四个tool,打开本地相机,向本地写入文件,打开spotify并播放某首歌曲,给我一个leetcode题目,写的比较糙,慎用,需要的可以去下载官网的。
这里我在调用spotify的时候记得要去https://developer.spotify.com/dashboard创建自己的开发者账户,拿到自己的client_id和client_secret,这里redirect url最好要写http://127.0.0.1:8888/callback/
from mcp.server.fastmcp import FastMCP
import spotipy
from spotipy.oauth2 import SpotifyOAuth
import random
from bs4 import BeautifulSoup
import cv2
from typing import Any
mcp = FastMCP("my_server", version="1.0.0", port=8000)@mcp.tool()
def use_my_camera()-> None:"""打开本地相机功能Args:NoneRaises:Exception: 如果无法打开相机或无法获取图像帧,则抛出异常。"""cap = cv2.VideoCapture(0)if not cap.isOpened():raise Exception("无法打开相机")try:while True:ret, frame = cap.read()if not ret:raise Exception("无法获取图像帧")cv2.imshow('Camera', frame)if cv2.waitKey(1) & 0xFF == ord('q'):breakfinally:cap.release()cv2.destroyAllWindows()@mcp.tool()
def write_to_txt_page(file_name: str, text: str)-> None:"""向给定的文本写入到指定的本地文件,用于使用txt存储用户的数据。Args:file_name: 要写入的文件名,类型为字符串。text: 要写入文件的文本内容,类型为字符串。Raises:Exception: 如果写入文件时出错,则抛出异常。"""try:with open(file_name, 'a', encoding='utf-8') as file:file.write(text)except Exception as e:raise Exception(f"写入文件时出错: {e}")import requests@mcp.tool()
def play_spotify_song(song_name: str) -> None:"""使用spotify播放指定的歌曲。Args:song_name: 要播放的歌曲名称,类型为字符串。Raises:Exception: 如果播放歌曲时出错,则抛出异常。"""sp = spotipy.Spotify(auth_manager=SpotifyOAuth(client_id='XXXXXX',#自己的client_secret='XXXXXX',#自己的redirect_uri='http://127.0.0.1:8888/callback/',scope='user-library-read user-read-playback-state user-modify-playback-state'
))try:# 搜索歌曲results = sp.search(q=song_name, limit=1, type='track')if results['tracks']['items']:track_url = results['tracks']['items'][0]['external_urls']['spotify']track_id = results['tracks']['items'][0]['id']print(f"Found song: {track_url}")# 获取当前播放设备并控制播放devices = sp.devices()if devices['devices']:device_id = devices['devices'][0]['id']print(f"Devices available: {devices['devices']}")sp.start_playback(device_id=device_id, uris=[f'spotify:track:{track_id}'])print(f"Playing {song_name} on device {device_id}...")else:raise Exception("No devices available for playback.")else:raise Exception(f"Song '{song_name}' not found!")except Exception as e:raise Exception(f"播放时出错: {e}")@mcp.tool()
def get_leetcode_cn_problem(topic_tag: str, difficulty: str = None) -> dict[str, Any] | None:"""从 LeetCode 中国站 (leetcode.cn) 根据指定标签和难度获取题目。Args:topic_tag (str): 标签,比如 "array", "binary-search", "linked-list" 等。difficulty (str, optional): "EASY", "MEDIUM", "HARD"。Returns:json: 题目信息,以 JSON 格式返回,包括标题、难度和描述。"""try:url = "https://leetcode.cn/graphql/"headers = {"Content-Type": "application/json","Referer": "https://leetcode.cn/problemset/all/","Origin": "https://leetcode.cn","User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}filters = {"tags": [topic_tag]}if difficulty:filters["difficulty"] = difficulty.upper() # 注意是 UPPERpayload = {"operationName": "problemsetQuestionList","variables": {"categorySlug": "","filters": filters,"limit": 50,"skip": 0},"query": """query problemsetQuestionList($categorySlug: String, $filters: QuestionListFilterInput, $limit: Int, $skip: Int) {problemsetQuestionList(categorySlug: $categorySlug, filters: $filters, limit: $limit, skip: $skip) {totalquestions {titletitleSlugdifficultypaidOnly}}}"""}response = requests.post(url, json=payload, headers=headers)if response.status_code == 200:data = response.json()questions = data['data']['problemsetQuestionList']['questions']candidates = []for q in questions:if q['paidOnly']:continue # 不要付费题candidates.append(q)if candidates:selected = random.choice(candidates)problem_response = requests.get(f'https://leetcode.cn/problems/{selected["titleSlug"]}', headers=headers)if problem_response.status_code == 200:soup = BeautifulSoup(problem_response.text, 'html.parser')# 提取标题title = soup.find('title').text if soup.find('title') else "未找到标题"# 提取描述description_meta = soup.find('meta', {'name': 'description'})description = description_meta['content'] if description_meta else "未找到描述"import jsonresult = {"title": title,"difficulty": selected['difficulty'],"description": description}return json.dumps(result, ensure_ascii=False)else:error_result = {"error": f"请求失败,状态码: {problem_response.status_code}"}return json.dumps(error_result, ensure_ascii=False)else:error_result = {"error": "未找到符合条件的题目。"}return json.dumps(error_result, ensure_ascii=False)else:error_result = {"error": f"请求失败,状态码: {response.status_code}"}return json.dumps(error_result, ensure_ascii=False)except Exception as e:error_result = {"error": f"发生错误: {str(e)}"}return json.dumps(error_result, ensure_ascii=False)
if __name__ == "__main__":mcp.run(transport='sse')
1.2.client
客户端包括三部分:连接大模型(我用ollama,qwen2.5),连接mcp服务器(这里stdio和sse有区别),处理用户请求(将大模型和mcp服务建立连接,这里需要根据自己的模型效果改改逻辑)。
import asyncio
from typing import Optional
from contextlib import AsyncExitStack
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import OpenAI
import json
import sys
class MCPClient:def __init__(self):# 初始化会话和客户端对象self.session: Optional[ClientSession] = None # 使用会话管理和 API 客户端进行初始化self.exit_stack = AsyncExitStack() # 使用AsyncExitStack管理资源self.openai_api_key = "ollama" self.base_url = "http://192.168.3.36:11434/v1" #Ollama 服务器的 IP 地址self.model = "qwen2.5:latest" # Ollama 模型名称self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)async def connect_to_server(self, server_script_path: str):"""连接到MCP服务器Args:server_script_path (str): 服务器脚本路径(.py 或.js)"""if not server_script_path.endswith(('.py', '.js')):raise ValueError("服务器脚本必须是.py 或.js 文件")command = "python" if server_script_path.endswith('.py') else "node"server_params = StdioServerParameters(command=command,args=[server_script_path],env=None)stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))self.stdio, self.write = stdio_transportself.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))# 初始化会话并列出可用工具await self.session.initialize()response = await self.session.list_tools()tools = response.toolsprint("\n已连接到服务器, 可用工具:", [tool.name for tool in tools])async def process_query(self, query: str) -> str:"""处理查询,使用Claude和可用工具Args:query (str): 用户输入的查询Returns:str: 处理后的响应内容"""# 构造消息对象messages = [{"role": "user","content": query}]# 获取可用工具列表response = await self.session.list_tools()# 将工具列表转换为需要的格式available_tools = [{"type": "function","function": {"name": tool.name,"description": tool.description,"input_schema": tool.inputSchema}}for tool in response.tools]# 第一次调用Claude API# print("进入》》》》》》》process_query")response = self.client.chat.completions.create(model=self.model,messages=messages,tools=available_tools)print("response",response)# 获取API响应的内容content = response.choices[0].messagefinal_text = []assistant_message_content = []if content.tool_calls:# 获取工具调用信息tool_call = content.tool_calls[0]tool_name = tool_call.function.nametool_args = json.loads(tool_call.function.arguments)# 调用工具result = await self.session.call_tool(tool_name, tool_args)final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")assistant_message_content.append(content)print(f"\n\n[正在用工具 {tool_name}, 参数为 {tool_args}]\n\n")# 修正消息列表更新逻辑messages.append({"role": "assistant","content": "","tool_calls": [tool_call]})messages.append({"role": "tool","content": result.content[0].text if result.content[0].text else "","name": tool_name,"tool_call_id": tool_call.id})response = self.client.chat.completions.create(model=self.model,messages=messages,tools=available_tools)print("response",response)# 获取API响应的内容content = response.choices[0].messagefinal_text.append(content.content)# 如果没有调用工具,直接返回响应内容else:final_text.append(content.content)assistant_message_content.append(content)return "\n".join(final_text)async def chat_loop(self):"""运行交互式聊天循环"""print("\nMCP 客户端已启动!")print("请输入您的查询或输入 'quit' 退出。")while True:try:query = input("user input:").strip()if query.lower() == 'quit':breakresponse = await self.process_query(query)print("\n" + response)except Exception as e:print(f"\n错误: {str(e)}")async def cleanup(self):"""清理资源"""await self.exit_stack.aclose()async def main():if len(sys.argv) < 2:print("用法: python client.py <path_to_server_script>")sys.exit(1)client = MCPClient()try:await client.connect_to_server(sys.argv[1])await client.chat_loop()finally:await client.cleanup()if __name__ == "__main__":asyncio.run(main())
1.3.效果
运行
python client.py server.py
输入命令进行测试,我的命令(写一封500字告白信到我本地,给我播放一首好日子,我想练一个简单的二分法,打开本地相机)。
还可以使用网页端测试效果
pip install mcp[cli]
mcp dev server.py
2.自定义sse交互
sse交互与stdio差别很小,这里只写区别
2.1.server
这里初始化mcp的时候给一个端口号,运行的时候使用sse模式,剩下工具的定义都不变。
mcp = FastMCP("my_server", version="1.0.0", port=8000)
@mcp.tool()
if __name__ == "__main__":mcp.run(transport='sse')
2.2.client
client这里就是连接mcp服务的时候与stdio不同,然后在main里面调用的时候传递的就是server启动的url地址
async def connect_to_server(self, url: str):"""连接到基于 HTTP SSE 的 MCP 工具服务器"""sse_transport = await self.exit_stack.enter_async_context(sse_client(url))self.session = await self.exit_stack.enter_async_context(ClientSession(*sse_transport))await self.session.initialize()response = await self.session.list_tools()tools = response.toolsprint("\n已连接到服务器, 可用工具:", [tool.name for tool in tools])
async def main():client = MCPClient()try:await client.connect_to_server('http://0.0.0.0:8000/sse')await client.chat_loop()finally:await client.cleanup()
2.3.效果
启动服务端python server.py
启动客户端python client.py
网页端测试mcp dev server.py
3.使用官方文件
这里我测试了一个操作键鼠,截屏等操作的ScreenPilot,首先git代码,进入环境。
安装依赖包pip install -r requirements.txt
他的代码是stdio的交互(我没找到sse的),所以client我就用第一个stdio里面的,server就用官方的main文件,所以运行python C:\Users\robot\Desktop\mcp\stdio\client.py C:\Users\robot\Desktop\mcp\ScreenPilot\main.py
就可以调用这个服务。