Files
雷雨 8635b84b2d init
2025-12-15 22:05:56 +08:00

427 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from langchain_core.tools import tool
from miio import AirConditioningCompanionMcn02
import json
from pydantic import BaseModel, Field
import logging
import asyncio
import os
# 配置日志
logger = logging.getLogger(__name__)
# 默认配置 - 如果 MCP 不可用,会回退到这些配置
DEFAULT_SYSTEM_USER_ID = 1000000001 # admin 用户ID
DEFAULT_AC_NAME = "空调"
AC_IP = "192.168.110.129" # 默认IP回退用
AC_TOKEN = "1724bf8d57b355173dfa08ae23367f86" # 默认Token回退用
AC_MODEL = "lumi.acpartner.mcn02"
# 设备缓存(避免频繁查询)
_device_cache = {}
async def get_device_info_from_mcp(system_user_id: int, device_name: str = "空调") -> dict:
"""
通过 MCP 服务获取设备信息
注意:这需要 MCP 服务运行,如果不可用会返回 None
"""
try:
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
# 获取当前文件所在目录,计算 MCP 服务路径
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(os.path.dirname(current_dir))
mcp_path = os.path.join(project_root, "mcp", "device_query_mcp.py")
logger.info(f"正在通过 MCP 查询设备: {device_name}")
# 创建 MCP 客户端
server_params = StdioServerParameters(
command="python",
args=[mcp_path],
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
# 初始化
await session.initialize()
# 调用工具获取设备信息
result = await session.call_tool(
"get_device_by_name",
arguments={
"system_user_id": system_user_id,
"device_name": device_name
}
)
# 解析结果
result_data = json.loads(result.content[0].text if hasattr(result, 'content') else result)
if result_data.get("success") and result_data.get("devices"):
devices = result_data["devices"]
if devices:
# 返回第一个匹配的设备
device = devices[0]
return device
return None
except Exception as e:
logger.error(f"❌ 通过 MCP 获取设备信息失败: {e}")
return None
def get_device_config(device_name: str = DEFAULT_AC_NAME, system_user_id: int = DEFAULT_SYSTEM_USER_ID) -> dict:
"""
获取设备配置(优先使用 MCP失败则使用缓存或默认配置
返回格式:
{
"ip": "192.168.110.123",
"token": "1724bf8d57b355173dfa08ae23367f86",
"model": "lumi.acpartner.mcn02",
"name": "客厅空调"
}
"""
cache_key = f"{system_user_id}_{device_name}"
# 检查缓存
if cache_key in _device_cache:
logger.info(f"使用缓存的设备信息: {device_name}")
return _device_cache[cache_key]
# 尝试通过 MCP 获取
try:
# 在线程池中需要创建新的事件循环
try:
device_info = asyncio.run(
get_device_info_from_mcp(system_user_id, device_name)
)
except RuntimeError:
# 如果已经有事件循环在运行,创建新的
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
device_info = loop.run_until_complete(
get_device_info_from_mcp(system_user_id, device_name)
)
finally:
loop.close()
if device_info:
config = {
"ip": device_info.get("localip", ""),
"token": device_info.get("token", ""),
"model": device_info.get("model", ""),
"name": device_info.get("name", device_name),
"did": device_info.get("did", ""),
"isOnline": device_info.get("isOnline", False),
}
# 缓存设备信息5分钟有效
_device_cache[cache_key] = config
return config
except Exception as e:
logger.error(f"获取设备配置失败: {e}")
# 如果 MCP 不可用,返回默认配置(向后兼容)
return {
"ip": AC_IP,
"token": AC_TOKEN,
"model": AC_MODEL,
"name": "空调",
}
def get_device_connection(device_name: str = DEFAULT_AC_NAME):
"""获取设备连接"""
config = get_device_config(device_name)
if not config.get("ip") or not config.get("token"):
raise ValueError(f"设备 {device_name} 配置不完整,缺少 IP 或 Token")
return AirConditioningCompanionMcn02(
ip=config["ip"],
token=config["token"]
)
@tool("get_ac_status", description="获取空调当前状态")
def get_ac_status(device_name: str = DEFAULT_AC_NAME):
"""
获取设备状态并以 JSON 格式返回
参数:
device_name: 设备名称,默认为"空调",可以指定具体的设备名如"客厅空调"
"""
try:
config = get_device_config(device_name)
device = get_device_connection(device_name)
props = device.send("get_prop", ["power", "mode", "tar_temp", "fan_level", "ver_swing", "load_power"])
status = {
"device_name": config.get("name", device_name),
"power": props[0],
"mode": props[1],
"target_temperature": props[2],
"fan_level": props[3],
"vertical_swing": props[4],
"load_power": props[5],
"online": True,
"model": config.get("model", "unknown"),
"ip": config.get("ip", ""),
}
return json.dumps(status, indent=2, ensure_ascii=False)
except Exception as e:
logger.error(f"获取空调状态失败: {e}")
config = get_device_config(device_name)
error_status = {
"error": f"获取设备状态失败: {str(e)}",
"message": f"请检查:\n1. 设备是否已开启并连接到网络\n2. 设备IP地址是否配置正确当前配置{config.get('ip', 'unknown')}\n3. 设备Token是否正确",
"online": False,
"model": config.get("model", "unknown")
}
return json.dumps(error_status, indent=2, ensure_ascii=False)
class PowerArgs(BaseModel):
power: bool = Field(..., description="空调电源状态true 开启false 关闭")
device_name: str = Field(default=DEFAULT_AC_NAME, description="设备名称,默认为'空调'")
@tool("set_ac_power", args_schema=PowerArgs, description="开启或关闭空调。power=true 开启power=false 关闭")
def set_ac_power(power: bool, device_name: str = DEFAULT_AC_NAME):
"""
设置空调电源状态
参数:
power: True 开启False 关闭
device_name: 设备名称,默认为"空调"
"""
try:
config = get_device_config(device_name)
device = get_device_connection(device_name)
if power:
device.on()
return json.dumps({
"message": f"{config.get('name', device_name)} 已开启",
"power": True
}, indent=2, ensure_ascii=False)
else:
device.off()
return json.dumps({
"message": f"{config.get('name', device_name)} 已关闭",
"power": False
}, indent=2, ensure_ascii=False)
except Exception as e:
logger.error(f"设置空调电源失败: {e}")
config = get_device_config(device_name)
error_status = {
"error": f"设置电源状态失败: {str(e)}",
"message": f"请检查:\n1. 设备是否已开启并连接到网络\n2. 设备IP地址是否配置正确当前配置{config.get('ip', 'unknown')}\n3. 设备Token是否正确",
"online": False,
"model": config.get("model", "unknown")
}
return json.dumps(error_status, indent=2, ensure_ascii=False)
class TemperatureArgs(BaseModel):
temperature: int = Field(..., ge=16, le=30, description="目标温度(摄氏度),范围 16-30")
device_name: str = Field(default=DEFAULT_AC_NAME, description="设备名称,默认为'空调'")
@tool("set_ac_temperature", args_schema=TemperatureArgs, description="设置空调目标温度16-30℃")
def set_ac_temperature(temperature: int, device_name: str = DEFAULT_AC_NAME):
"""
设置空调目标温度
参数:
temperature: 目标温度16-30℃
device_name: 设备名称,默认为"空调"
"""
try:
config = get_device_config(device_name)
device = get_device_connection(device_name)
# 对于 mcn02目标温度字段为 tar_temp对应的设置命令通常为 set_tar_temp
result = device.send("set_tar_temp", [temperature])
logger.info(f"{config.get('name', device_name)} 温度已设置为{temperature}")
return json.dumps({
"message": f"{config.get('name', device_name)} 温度已设置为{temperature}",
"target_temperature": temperature,
"result": result
}, indent=2, ensure_ascii=False)
except Exception as e:
logger.error(f"设置空调温度失败: {e}")
config = get_device_config(device_name)
error_status = {
"error": f"设置温度失败: {str(e)}",
"message": f"请检查:\n1. 设备是否已开启并连接到网络\n2. 设备IP地址是否配置正确当前配置{config.get('ip', 'unknown')}\n3. 设备Token是否正确",
"online": False,
"model": config.get("model", "unknown")
}
return json.dumps(error_status, indent=2, ensure_ascii=False)
@tool("list_devices", description="查询和列出用户的空调设备信息。当用户询问有哪些空调设备时调用此工具。必须传入 system_user_id 参数。")
def list_devices(system_user_id: int):
"""
查询和列出用户的空调设备信息(只返回空调相关设备)
当用户询问以下问题时,必须调用此工具:
- "空调设备信息"
参数:
system_user_id: 系统用户ID必传当前为 1000000001admin用户
返回:
空调设备的详细信息包括设备名称、型号、IP地址、Token、在线状态等
注意:此工具会自动从数据库读取用户的米家账户凭证,无需用户提供账号密码
"""
try:
logger.info(f"准备获取用户 {system_user_id} 的设备列表")
# 1. 预检查MCP 服务文件是否存在
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(os.path.dirname(current_dir))
mcp_path = os.path.join(project_root, "mcp", "device_query_mcp.py")
if not os.path.exists(mcp_path):
logger.error(f"开发错误MCP 服务文件不存在: {mcp_path}")
return json.dumps({
"success": False,
"message": "请先检查设备查询服务是否启动。"
}, ensure_ascii=False, indent=2)
# 2. 预检查MCP 依赖是否已安装
try:
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
except ImportError as e:
logger.error(f"开发错误MCP 模块未安装: {e}")
return json.dumps({
"success": False,
"message": "请先检查设备查询服务是否启动。"
}, ensure_ascii=False, indent=2)
# 3. 调用 MCP 服务
logger.info(f"✅ 预检查通过,正在通过 MCP 获取设备列表...")
async def get_devices():
try:
server_params = StdioServerParameters(
command="python",
args=[mcp_path],
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
result = await session.call_tool(
"get_user_devices",
arguments={"system_user_id": system_user_id}
)
return result.content[0].text if hasattr(result, 'content') else str(result)
except Exception as e:
logger.error(f"MCP 调用失败: {e}")
return json.dumps({
"success": False,
"message": "请先检查设备查询服务是否启动。"
}, ensure_ascii=False, indent=2)
# 在线程池中需要创建新的事件循环
try:
devices_json = asyncio.run(get_devices())
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
devices_json = loop.run_until_complete(get_devices())
finally:
loop.close()
# 解析结果
devices_data = json.loads(devices_json)
if devices_data.get("success"):
all_devices = devices_data.get("devices", [])
# 过滤出空调相关的设备
# 空调设备通常包含 "acpartner"、"aircondition" 等关键词
ac_devices = []
for device in all_devices:
model = device.get("model", "").lower()
name = device.get("name", "").lower()
if "acpartner" in model or "aircondition" in model or "空调" in name or "ac" in name:
ac_devices.append(device)
if len(ac_devices) == 0:
return json.dumps({
"success": True,
"message": "未找到空调设备",
"total_devices": 0,
"devices": []
}, indent=2, ensure_ascii=False)
# 构建友好的输出(只包含空调设备)
device_list = []
for i, device in enumerate(ac_devices, 1):
device_info = {
"序号": i,
"设备名称": device.get("name", "未命名"),
"型号": device.get("model", "未知"),
"在线状态": "在线" if device.get("isOnline") else "离线",
"IP地址": device.get("localip", "N/A"),
"Token": device.get("token", "N/A"),
"所属家庭": device.get("home_name", "N/A"),
}
device_list.append(device_info)
return json.dumps({
"success": True,
"message": f"找到 {len(ac_devices)} 个空调设备",
"total_devices": len(ac_devices),
"devices": device_list
}, indent=2, ensure_ascii=False)
else:
# 判断是否是凭证问题
error_msg = devices_data.get("message", "")
if "未找到小米账号绑定信息" in error_msg or "未找到" in error_msg:
return json.dumps({
"success": False,
"message": "未查询到绑定米家账户的Token请先绑定米家账户。\n可以通过后端API进行绑定POST /api/v1/xiaomi/login/start"
}, indent=2, ensure_ascii=False)
else:
return json.dumps({
"success": False,
"message": error_msg
}, indent=2, ensure_ascii=False)
except Exception as e:
error_str = str(e)
logger.error(f"列出设备失败: {e}")
# 判断错误类型,给出友好提示
if "Connection refused" in error_str or "timeout" in error_str.lower():
return json.dumps({
"success": False,
"message": "请先开启设备查询MCP服务无法连接到MCP服务。"
}, indent=2, ensure_ascii=False)
elif "No module named" in error_str:
return json.dumps({
"success": False,
"message": f"请先安装MCP所需的依赖模块{error_str}"
}, indent=2, ensure_ascii=False)
else:
return json.dumps({
"success": False,
"message": f"获取设备列表失败:{error_str}"
}, indent=2, ensure_ascii=False)