Files
雷雨 8635b84b2d init
2025-12-15 22:05:56 +08:00

307 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import Any
from langchain_core.messages import AIMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
import logging
import sys
import os
# 添加父目录到路径以导入config_loader
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config_loader import get_config_loader
from tools import (
list_available_agents,
execute_agent_command,
get_agent_status,
control_device,
get_system_overview,
analyze_user_behavior,
get_user_insights,
query_data_mining_agent,
list_xiaomi_devices,
search_baidu_ai
)
from dida_tools import (
manage_dida_task,
manage_dida_project
)
from wechat_tools import (
get_wechat_chat_history,
send_wechat_message,
send_multiple_wechat_messages,
send_wechat_to_multiple_friends
)
from windows_tools import (
manage_windows_app,
execute_powershell_command,
execute_windows_shortcut
)
memory = MemorySaver()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ConductorAgent:
SUPPORTED_CONTENT_TYPES = ['text', 'text/plain']
# 默认系统提示词(备用)
DEFAULT_SYSTEM_PROMPT = (
'''你是一个智能家居总管理助手同时也是一个友好的AI助手。
## ⚠️ 重要:何时使用工具 vs 直接回答
**直接回答(不调用工具)的情况**
- 一般性知识问答(如"中国的首都是哪""1+1等于几"
- 闲聊对话(如"你好""今天天气怎么样"
- 非设备控制相关的请求
- 简单的咨询和问候
**需要调用工具的情况**
- 用户明确要求控制设备(如"打开空调""关闭灯"
- 查询设备状态(如"空调温度是多少"
- 查询设备列表(如"我有哪些设备"
- 分析使用习惯(如"我通常什么时候开空调"
- 场景设置(如"我要睡觉了""起床了"
你的主要职责包括:
1. 管理多个智能设备代理(如空调代理、空气净化器代理等)
2. 协调不同代理之间的工作
3. 提供统一的智能家居控制接口
4. 监控系统整体状态
5. 管理小米智能设备信息查询
6. 回答用户的一般性问题
你可以执行以下操作:
- 列出所有可用的代理服务:使用 list_available_agents 工具
- 检查代理状态:使用 get_agent_status 工具
- 向特定代理发送命令:使用 execute_agent_command 工具(适用于复杂的代理间通信)
- 控制智能设备:使用 control_device 工具(推荐用于设备控制,会自动调用对应代理并记录日志)
- 获取系统概览:使用 get_system_overview 工具
- 分析用户行为:使用 analyze_user_behavior 工具
- 获取用户洞察:使用 get_user_insights 工具
- **场景智能分析**:使用 query_data_mining_agent 工具(重要!)
## 米家设备信息管理(重要更新):
**当用户询问"我有哪些设备""设备列表""米家设备"时,必须使用 list_xiaomi_devices 工具**
- 获取米家设备列表:使用 list_xiaomi_devices 工具
- 参数:
- **system_user_id必传**当前登录用户的系统用户ID固定为 1000000001admin用户
- server可选服务器区域默认cn
- **重要**:此工具会自动从数据库读取用户的米家账户凭证,**绝对不要要求用户提供账号密码**
- 如果工具返回"未查询到绑定的米家账户"告知用户需要先通过后端API绑定
- 返回所有米家设备的详细信息包括Token、IP、MAC等
设备控制指南:
当用户说"开启空调""打开空调""关闭空调"等命令时,使用 control_device 工具:
- device_type: "air_conditioner" (空调)
- action: "开启空调""关闭空调" 或其他用户说的操作
- parameters: 如果有额外参数(如温度),以字典形式传递
当用户说"开启空气净化器""关闭空气净化器"等命令时,使用 control_device 工具:
- device_type: "air_cleaner" (空气净化器)
- action: 对应的操作
**⚠️ 重要:工具返回值处理**
- control_device 工具返回的是 JSON 格式,包含 success、message、content、operation_record 等字段
- **你必须解析这个 JSON提取 content 或 message 字段中的文本,然后用自然语言回复用户**
- **绝对不要直接返回 JSON 字符串给用户!**
- 示例:
- 工具返回:{"success": true, "message": "已打开空调", "content": "空调已开启温度设置为24度"}
- 你应该回复:"好的我已经为您打开空调温度设置为24度。"
当用户询问系统状态时,优先调用 get_system_overview 获取整体概览。
当用户询问可用服务时,使用 list_available_agents 工具。
当用户询问使用习惯或需要个性化建议时,使用 analyze_user_behavior 或 get_user_insights 工具。
**场景智能分析(核心功能)**
当用户描述一个生活场景时(例如:"我要睡觉了""起床了""要出门了""到家了"
或用户指令模糊时(例如:"打开空调"但未指定温度),使用以下智能处理流程:
**智能处理流程(两级保底机制)**
第一步:优先使用历史习惯数据
1. 调用 query_data_mining_agent 工具,传入用户场景或指令
2. 数据挖掘代理会从StarRocks数据库挖掘用户历史使用习惯
3. 如果有足够的历史数据,返回个性化建议(如"您通常在睡觉时将空调设为26°C"
4. 根据个性化建议执行设备控制
第二步:保底方案 - AI搜索通用最佳实践
当数据挖掘代理返回以下情况时,启用保底方案:
- 返回"暂无足够历史数据"
- 返回"同一时间操作记录过少"
- 用户是新用户,没有历史记录
- 历史数据不足以提供有价值的建议
启用保底方案步骤:
1. 调用 search_baidu_ai 工具
2. 传入智能查询,如:"人类最适合的睡觉温度""睡觉时最适合的灯光设置"
3. 获取基于人体工程学和通用最佳实践的建议
4. 向用户说明:"根据通用最佳实践,建议...(随着您使用次数增多,我会学习您的个人习惯)"
5. 根据通用建议执行设备控制
始终以中文回复用户,提供清晰、友好的服务。
如果用户的需求超出了你的能力范围,请礼貌地说明并提供相关建议。
消息返回请使用Markdown格式。'''
)
def __init__(self):
# 从数据库加载配置(严格模式:配置加载失败则退出)
try:
config_loader = get_config_loader(strict_mode=True)
# 加载AI模型配置
ai_config = config_loader.get_default_ai_model_config()
self.model = ChatOpenAI(
model=ai_config['model'],
api_key=ai_config['api_key'],
base_url=ai_config['api_base'],
temperature=ai_config['temperature'],
)
# 加载系统提示词
system_prompt = config_loader.get_agent_prompt('conductor')
self.SYSTEM_PROMPT = system_prompt
except Exception as e:
logger.error(f"❌ 配置加载失败: {e}")
logger.error("⚠️ 请确保:")
logger.error(" 1. StarRocks 数据库已启动")
logger.error(" 2. 已执行数据库初始化脚本: data/init_config.sql 和 data/ai_config.sql")
logger.error(" 3. config.yaml 中的数据库连接配置正确")
raise SystemExit(1) from e
self.tools = [
list_available_agents,
execute_agent_command,
get_agent_status,
control_device,
get_system_overview,
analyze_user_behavior,
get_user_insights,
query_data_mining_agent,
list_xiaomi_devices,
search_baidu_ai, # 百度AI搜索保底方案
manage_dida_task, # 滴答清单任务管理
manage_dida_project, # 滴答清单项目管理
get_wechat_chat_history, # 微信聊天记录获取
send_wechat_message, # 微信消息发送
send_multiple_wechat_messages, # 批量发送微信消息
send_wechat_to_multiple_friends, # 群发微信消息
manage_windows_app, # Windows应用管理
execute_powershell_command, # PowerShell命令执行
execute_windows_shortcut # Windows快捷键
]
self.graph = create_react_agent(
self.model,
tools=self.tools,
checkpointer=memory,
prompt=self.SYSTEM_PROMPT,
)
async def invoke(self, query, context_id) -> dict[str, Any]:
"""非流式调用,直接返回最终结果"""
inputs = {'messages': [('user', query)]}
config = {'configurable': {'thread_id': context_id}}
# 直接调用invoke不使用stream
result = self.graph.invoke(inputs, config)
return self.get_agent_response(config)
def _extract_text_from_message(self, msg: AIMessage | ToolMessage | Any) -> str:
try:
content = getattr(msg, 'content', None)
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for part in content:
if isinstance(part, dict) and 'text' in part:
parts.append(part['text'])
if parts:
return '\n'.join(parts)
except Exception:
pass
return ''
def _parse_tool_output(self, tool_text: str) -> str:
"""
解析工具输出,提取用户友好的消息
如果工具返回 JSON 格式(如 control_device提取 content 或 message 字段
否则直接返回原文本
"""
try:
import json
# 尝试解析为 JSON
data = json.loads(tool_text)
# 如果是我们的标准工具返回格式
if isinstance(data, dict):
# 优先返回 content 字段(通常是最友好的消息)
if 'content' in data and data['content']:
return data['content']
# 其次返回 message 字段
if 'message' in data and data['message']:
return data['message']
# 如果有推荐信息数据挖掘Agent
if 'recommendation' in data:
# 返回原 JSON让 Agent 自己格式化
return tool_text
except (json.JSONDecodeError, ValueError):
# 不是 JSON 格式,直接返回
pass
return tool_text
def get_agent_response(self, config):
current_state = self.graph.get_state(config)
messages = current_state.values.get('messages') if hasattr(current_state, 'values') else None
# 优先返回最后一条 AI 消息Agent 的总结)
final_text = ''
if isinstance(messages, list) and messages:
last_msg = messages[-1]
final_text = self._extract_text_from_message(last_msg)
# 如果最后一条是 AI 消息且有内容,直接返回
if isinstance(last_msg, AIMessage) and final_text:
return {
'is_task_complete': True,
'require_user_input': False,
'content': final_text,
}
# 如果没有 AI 总结消息,回退到工具消息
if isinstance(messages, list) and messages:
for msg in reversed(messages):
if isinstance(msg, ToolMessage):
tool_text = self._extract_text_from_message(msg)
if tool_text:
# 解析工具输出,提取用户友好的内容
parsed_content = self._parse_tool_output(tool_text)
return {
'is_task_complete': True,
'require_user_input': False,
'content': parsed_content,
}
if not final_text:
return {
'is_task_complete': False,
'require_user_input': True,
'content': '当前无法处理您的请求,请稍后重试。',
}
return {
'is_task_complete': True,
'require_user_input': False,
'content': final_text,
}