Files
moss-ai/agents/conductor_agent/wechat_tools.py
雷雨 8635b84b2d init
2025-12-15 22:05:56 +08:00

219 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
微信相关工具
提供微信聊天记录获取和消息发送功能
"""
from langchain_core.tools import tool
from pydantic import BaseModel, Field
from typing import List, Optional
import logging
import sys
import os
# 添加父目录到路径以导入backend模块
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '..', 'app', 'backend-python'))
logger = logging.getLogger(__name__)
class WechatChatHistoryArgs(BaseModel):
"""获取微信聊天记录参数"""
to_user: str = Field(..., description="好友或群聊的备注或昵称")
target_date: str = Field(..., description="目标日期格式为YY/M/D如25/11/10表示2025年11月10日")
class WechatSendMessageArgs(BaseModel):
"""发送单条消息参数"""
to_user: str = Field(..., description="好友或群聊的备注或昵称")
message: str = Field(..., description="要发送的消息内容")
class WechatSendMultipleMessagesArgs(BaseModel):
"""发送多条消息给单个好友参数"""
to_user: str = Field(..., description="好友或群聊的备注或昵称")
messages: List[str] = Field(..., description="要发送的消息列表")
class WechatSendToMultipleFriendsArgs(BaseModel):
"""发送消息给多个好友参数"""
to_users: List[str] = Field(..., description="好友或群聊的备注或昵称列表")
message: str = Field(..., description="要发送的消息内容(单条消息会发给所有好友)")
@tool("get_wechat_chat_history", args_schema=WechatChatHistoryArgs, description="获取微信特定日期的聊天记录")
def get_wechat_chat_history(
to_user: str,
target_date: str
) -> str:
"""
获取微信聊天记录
Args:
to_user: 好友或群聊的备注或昵称
target_date: 目标日期格式为YY/M/D如25/11/10
Returns:
聊天记录的文本描述
"""
try:
# 导入MCP服务
from services.wechat_mcp_service import get_wechat_mcp_service
import asyncio
mcp_service = get_wechat_mcp_service()
# 调用MCP服务
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(
mcp_service.get_chat_history(to_user, target_date)
)
loop.close()
if result and result.get("success"):
data = result.get("data", "")
if isinstance(data, str):
return f"✅ 成功获取与 {to_user}{target_date} 的聊天记录:\n\n{data}"
else:
return f"✅ 成功获取聊天记录:\n\n{str(data)}"
else:
error_msg = result.get("message", "未知错误") if result else "服务无响应"
return f"❌ 获取聊天记录失败:{error_msg}\n提示:请确保微信桌面版已登录且窗口可以被操作"
except Exception as e:
logger.error(f"获取微信聊天记录失败: {e}", exc_info=True)
return f"❌ 获取微信聊天记录时发生错误:{str(e)}"
@tool("send_wechat_message", args_schema=WechatSendMessageArgs, description="向单个微信好友发送单条消息")
def send_wechat_message(
to_user: str,
message: str
) -> str:
"""
发送单条消息给单个微信好友
Args:
to_user: 好友或群聊的备注或昵称
message: 要发送的消息内容
Returns:
发送结果描述
"""
try:
# 导入MCP服务
from services.wechat_mcp_service import get_wechat_mcp_service
import asyncio
mcp_service = get_wechat_mcp_service()
# 调用MCP服务
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(
mcp_service.send_message(to_user, message)
)
loop.close()
if result and result.get("success"):
return f"✅ 消息已成功发送给 {to_user}"
else:
error_msg = result.get("message", "未知错误") if result else "服务无响应"
return f"❌ 消息发送失败:{error_msg}\n提示:请确保微信桌面版已登录且窗口可以被操作"
except Exception as e:
logger.error(f"发送微信消息失败: {e}", exc_info=True)
return f"❌ 发送微信消息时发生错误:{str(e)}"
@tool("send_multiple_wechat_messages", args_schema=WechatSendMultipleMessagesArgs, description="向单个微信好友发送多条消息")
def send_multiple_wechat_messages(
to_user: str,
messages: List[str]
) -> str:
"""
发送多条消息给单个微信好友
Args:
to_user: 好友或群聊的备注或昵称
messages: 要发送的消息列表
Returns:
发送结果描述
"""
try:
# 导入MCP服务
from services.wechat_mcp_service import get_wechat_mcp_service
import asyncio
mcp_service = get_wechat_mcp_service()
# 调用MCP服务
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(
mcp_service.send_multiple_messages(to_user, messages)
)
loop.close()
if result and result.get("success"):
return f"✅ 已成功向 {to_user} 发送 {len(messages)} 条消息"
else:
error_msg = result.get("message", "未知错误") if result else "服务无响应"
return f"❌ 批量消息发送失败:{error_msg}\n提示:请确保微信桌面版已登录且窗口可以被操作"
except Exception as e:
logger.error(f"批量发送微信消息失败: {e}", exc_info=True)
return f"❌ 批量发送微信消息时发生错误:{str(e)}"
@tool("send_wechat_to_multiple_friends", args_schema=WechatSendToMultipleFriendsArgs, description="向多个微信好友发送消息")
def send_wechat_to_multiple_friends(
to_users: List[str],
message: str
) -> str:
"""
向多个微信好友发送消息
Args:
to_users: 好友或群聊的备注或昵称列表
message: 要发送的消息内容(单条消息会发给所有好友)
Returns:
发送结果描述
"""
try:
# 导入MCP服务
from services.wechat_mcp_service import get_wechat_mcp_service
import asyncio
mcp_service = get_wechat_mcp_service()
# 调用MCP服务
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(
mcp_service.send_to_multiple_friends(to_users, message)
)
loop.close()
if result and result.get("success"):
return f"✅ 消息已成功发送给 {len(to_users)} 位好友:{', '.join(to_users)}"
else:
error_msg = result.get("message", "未知错误") if result else "服务无响应"
return f"❌ 群发消息失败:{error_msg}\n提示:请确保微信桌面版已登录且窗口可以被操作"
except Exception as e:
logger.error(f"群发微信消息失败: {e}", exc_info=True)
return f"❌ 群发微信消息时发生错误:{str(e)}"
# 导出所有工具
__all__ = [
'get_wechat_chat_history',
'send_wechat_message',
'send_multiple_wechat_messages',
'send_wechat_to_multiple_friends'
]