from typing import Any from langchain_core.messages import AIMessage, ToolMessage from langchain_openai import ChatOpenAI from langgraph.checkpoint.memory import MemorySaver from langgraph.prebuilt import create_react_agent import logging import sys import os # 添加父目录到路径以导入config_loader sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from config_loader import get_config_loader from tools import ( list_available_agents, execute_agent_command, get_agent_status, control_device, get_system_overview, analyze_user_behavior, get_user_insights, query_data_mining_agent, list_xiaomi_devices, search_baidu_ai ) from dida_tools import ( manage_dida_task, manage_dida_project ) from wechat_tools import ( get_wechat_chat_history, send_wechat_message, send_multiple_wechat_messages, send_wechat_to_multiple_friends ) from windows_tools import ( manage_windows_app, execute_powershell_command, execute_windows_shortcut ) memory = MemorySaver() logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class ConductorAgent: SUPPORTED_CONTENT_TYPES = ['text', 'text/plain'] # 默认系统提示词(备用) DEFAULT_SYSTEM_PROMPT = ( '''你是一个智能家居总管理助手,同时也是一个友好的AI助手。 ## ⚠️ 重要:何时使用工具 vs 直接回答 **直接回答(不调用工具)的情况**: - 一般性知识问答(如"中国的首都是哪"、"1+1等于几") - 闲聊对话(如"你好"、"今天天气怎么样") - 非设备控制相关的请求 - 简单的咨询和问候 **需要调用工具的情况**: - 用户明确要求控制设备(如"打开空调"、"关闭灯") - 查询设备状态(如"空调温度是多少") - 查询设备列表(如"我有哪些设备") - 分析使用习惯(如"我通常什么时候开空调") - 场景设置(如"我要睡觉了"、"起床了") 你的主要职责包括: 1. 管理多个智能设备代理(如空调代理、空气净化器代理等) 2. 协调不同代理之间的工作 3. 提供统一的智能家居控制接口 4. 监控系统整体状态 5. 管理小米智能设备信息查询 6. 回答用户的一般性问题 你可以执行以下操作: - 列出所有可用的代理服务:使用 list_available_agents 工具 - 检查代理状态:使用 get_agent_status 工具 - 向特定代理发送命令:使用 execute_agent_command 工具(适用于复杂的代理间通信) - 控制智能设备:使用 control_device 工具(推荐用于设备控制,会自动调用对应代理并记录日志) - 获取系统概览:使用 get_system_overview 工具 - 分析用户行为:使用 analyze_user_behavior 工具 - 获取用户洞察:使用 get_user_insights 工具 - **场景智能分析**:使用 query_data_mining_agent 工具(重要!) ## 米家设备信息管理(重要更新): **当用户询问"我有哪些设备"、"设备列表"、"米家设备"时,必须使用 list_xiaomi_devices 工具** - 获取米家设备列表:使用 list_xiaomi_devices 工具 - 参数: - **system_user_id(必传)**:当前登录用户的系统用户ID,固定为 1000000001(admin用户) - server(可选):服务器区域,默认cn - **重要**:此工具会自动从数据库读取用户的米家账户凭证,**绝对不要要求用户提供账号密码** - 如果工具返回"未查询到绑定的米家账户",告知用户需要先通过后端API绑定 - 返回:所有米家设备的详细信息,包括Token、IP、MAC等 设备控制指南: 当用户说"开启空调"、"打开空调"、"关闭空调"等命令时,使用 control_device 工具: - device_type: "air_conditioner" (空调) - action: "开启空调" 或 "关闭空调" 或其他用户说的操作 - parameters: 如果有额外参数(如温度),以字典形式传递 当用户说"开启空气净化器"、"关闭空气净化器"等命令时,使用 control_device 工具: - device_type: "air_cleaner" (空气净化器) - action: 对应的操作 **⚠️ 重要:工具返回值处理** - control_device 工具返回的是 JSON 格式,包含 success、message、content、operation_record 等字段 - **你必须解析这个 JSON,提取 content 或 message 字段中的文本,然后用自然语言回复用户** - **绝对不要直接返回 JSON 字符串给用户!** - 示例: - 工具返回:{"success": true, "message": "已打开空调", "content": "空调已开启,温度设置为24度"} - 你应该回复:"好的,我已经为您打开空调,温度设置为24度。" 当用户询问系统状态时,优先调用 get_system_overview 获取整体概览。 当用户询问可用服务时,使用 list_available_agents 工具。 当用户询问使用习惯或需要个性化建议时,使用 analyze_user_behavior 或 get_user_insights 工具。 **场景智能分析(核心功能)**: 当用户描述一个生活场景时(例如:"我要睡觉了"、"起床了"、"要出门了"、"到家了"), 或用户指令模糊时(例如:"打开空调"但未指定温度),使用以下智能处理流程: **智能处理流程(两级保底机制)**: 第一步:优先使用历史习惯数据 1. 调用 query_data_mining_agent 工具,传入用户场景或指令 2. 数据挖掘代理会从StarRocks数据库挖掘用户历史使用习惯 3. 如果有足够的历史数据,返回个性化建议(如"您通常在睡觉时将空调设为26°C") 4. 根据个性化建议执行设备控制 第二步:保底方案 - AI搜索通用最佳实践 当数据挖掘代理返回以下情况时,启用保底方案: - 返回"暂无足够历史数据" - 返回"同一时间操作记录过少" - 用户是新用户,没有历史记录 - 历史数据不足以提供有价值的建议 启用保底方案步骤: 1. 调用 search_baidu_ai 工具 2. 传入智能查询,如:"人类最适合的睡觉温度"、"睡觉时最适合的灯光设置" 3. 获取基于人体工程学和通用最佳实践的建议 4. 向用户说明:"根据通用最佳实践,建议...(随着您使用次数增多,我会学习您的个人习惯)" 5. 根据通用建议执行设备控制 始终以中文回复用户,提供清晰、友好的服务。 如果用户的需求超出了你的能力范围,请礼貌地说明并提供相关建议。 消息返回请使用Markdown格式。''' ) def __init__(self): # 从数据库加载配置(严格模式:配置加载失败则退出) try: config_loader = get_config_loader(strict_mode=True) # 加载AI模型配置 ai_config = config_loader.get_default_ai_model_config() self.model = ChatOpenAI( model=ai_config['model'], api_key=ai_config['api_key'], base_url=ai_config['api_base'], temperature=ai_config['temperature'], ) # 加载系统提示词 system_prompt = config_loader.get_agent_prompt('conductor') self.SYSTEM_PROMPT = system_prompt except Exception as e: logger.error(f"❌ 配置加载失败: {e}") logger.error("⚠️ 请确保:") logger.error(" 1. StarRocks 数据库已启动") logger.error(" 2. 已执行数据库初始化脚本: data/init_config.sql 和 data/ai_config.sql") logger.error(" 3. config.yaml 中的数据库连接配置正确") raise SystemExit(1) from e self.tools = [ list_available_agents, execute_agent_command, get_agent_status, control_device, get_system_overview, analyze_user_behavior, get_user_insights, query_data_mining_agent, list_xiaomi_devices, search_baidu_ai, # 百度AI搜索保底方案 manage_dida_task, # 滴答清单任务管理 manage_dida_project, # 滴答清单项目管理 get_wechat_chat_history, # 微信聊天记录获取 send_wechat_message, # 微信消息发送 send_multiple_wechat_messages, # 批量发送微信消息 send_wechat_to_multiple_friends, # 群发微信消息 manage_windows_app, # Windows应用管理 execute_powershell_command, # PowerShell命令执行 execute_windows_shortcut # Windows快捷键 ] self.graph = create_react_agent( self.model, tools=self.tools, checkpointer=memory, prompt=self.SYSTEM_PROMPT, ) async def invoke(self, query, context_id) -> dict[str, Any]: """非流式调用,直接返回最终结果""" inputs = {'messages': [('user', query)]} config = {'configurable': {'thread_id': context_id}} # 直接调用invoke,不使用stream result = self.graph.invoke(inputs, config) return self.get_agent_response(config) def _extract_text_from_message(self, msg: AIMessage | ToolMessage | Any) -> str: try: content = getattr(msg, 'content', None) if isinstance(content, str): return content if isinstance(content, list): parts = [] for part in content: if isinstance(part, dict) and 'text' in part: parts.append(part['text']) if parts: return '\n'.join(parts) except Exception: pass return '' def _parse_tool_output(self, tool_text: str) -> str: """ 解析工具输出,提取用户友好的消息 如果工具返回 JSON 格式(如 control_device),提取 content 或 message 字段 否则直接返回原文本 """ try: import json # 尝试解析为 JSON data = json.loads(tool_text) # 如果是我们的标准工具返回格式 if isinstance(data, dict): # 优先返回 content 字段(通常是最友好的消息) if 'content' in data and data['content']: return data['content'] # 其次返回 message 字段 if 'message' in data and data['message']: return data['message'] # 如果有推荐信息(数据挖掘Agent) if 'recommendation' in data: # 返回原 JSON,让 Agent 自己格式化 return tool_text except (json.JSONDecodeError, ValueError): # 不是 JSON 格式,直接返回 pass return tool_text def get_agent_response(self, config): current_state = self.graph.get_state(config) messages = current_state.values.get('messages') if hasattr(current_state, 'values') else None # 优先返回最后一条 AI 消息(Agent 的总结) final_text = '' if isinstance(messages, list) and messages: last_msg = messages[-1] final_text = self._extract_text_from_message(last_msg) # 如果最后一条是 AI 消息且有内容,直接返回 if isinstance(last_msg, AIMessage) and final_text: return { 'is_task_complete': True, 'require_user_input': False, 'content': final_text, } # 如果没有 AI 总结消息,回退到工具消息 if isinstance(messages, list) and messages: for msg in reversed(messages): if isinstance(msg, ToolMessage): tool_text = self._extract_text_from_message(msg) if tool_text: # 解析工具输出,提取用户友好的内容 parsed_content = self._parse_tool_output(tool_text) return { 'is_task_complete': True, 'require_user_input': False, 'content': parsed_content, } if not final_text: return { 'is_task_complete': False, 'require_user_input': True, 'content': '当前无法处理您的请求,请稍后重试。', } return { 'is_task_complete': True, 'require_user_input': False, 'content': final_text, }