Files
moss-ai/agents/conductor_agent/dida_tools.py

384 lines
14 KiB
Python
Raw Permalink Normal View History

2025-12-15 22:05:56 +08:00
"""
滴答清单相关工具
提供滴答清单任务项目标签等管理功能
"""
from langchain_core.tools import tool
from pydantic import BaseModel, Field
from typing import Dict, Any, Optional
import logging
import sys
import os
# 添加父目录到路径以导入backend模块
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '..', 'app', 'backend-python'))
logger = logging.getLogger(__name__)
class DidaTaskArgs(BaseModel):
"""滴答清单任务操作参数"""
system_user_id: int = Field(..., description="系统用户ID固定为1000000001")
action: str = Field(..., description="操作类型create_task, get_tasks, update_task, delete_task, complete_task")
task_id: Optional[str] = Field(None, description="任务ID更新/删除/完成操作必需)")
title: Optional[str] = Field(None, description="任务标题(创建操作必需)")
content: Optional[str] = Field(None, description="任务内容/描述")
project_id: Optional[str] = Field(None, description="项目ID")
class DidaProjectArgs(BaseModel):
"""滴答清单项目操作参数"""
system_user_id: int = Field(..., description="系统用户ID固定为1000000001")
action: str = Field(..., description="操作类型get_projects, create_project, update_project, delete_project")
project_id: Optional[str] = Field(None, description="项目ID更新/删除操作必需)")
name: Optional[str] = Field(None, description="项目名称(创建操作必需)")
color: Optional[str] = Field(None, description="项目颜色")
def _get_dida_credentials(system_user_id: int) -> Optional[Dict]:
"""
获取用户的滴答清单凭证
Args:
system_user_id: 系统用户ID
Returns:
凭证字典包含access_token等
"""
try:
from database import query, get_db_type, init_database
# 确保数据库已初始化
init_database(strict_mode=False)
db_type = get_db_type()
if db_type == "mysql":
sql = """
SELECT access_token, refresh_token, dida_username
FROM dida_credentials
WHERE system_user_id = %s AND is_active = 1
ORDER BY updated_at DESC
LIMIT 1
"""
else:
# StarRocks: 取最新记录
sql = """
SELECT access_token, refresh_token, dida_username
FROM dida_credentials
WHERE system_user_id = %s
ORDER BY updated_at DESC
LIMIT 1
"""
result = query(sql, (system_user_id,))
if result and len(result) > 0:
return result[0]
return None
except Exception as e:
logger.error(f"获取滴答清单凭证失败: {e}")
return None
@tool("manage_dida_task", args_schema=DidaTaskArgs, description="管理滴答清单任务(创建、查询、更新、删除、完成)")
def manage_dida_task(
system_user_id: int,
action: str,
task_id: Optional[str] = None,
title: Optional[str] = None,
content: Optional[str] = None,
project_id: Optional[str] = None
) -> str:
"""
管理滴答清单任务
支持的操作
- create_task: 创建任务需要title
- get_tasks: 获取任务列表可选project_id过滤
- update_task: 更新任务需要task_id和要更新的字段
- delete_task: 删除任务需要task_id
- complete_task: 完成任务需要task_id
Args:
system_user_id: 系统用户ID
action: 操作类型
task_id: 任务ID
title: 任务标题
content: 任务内容
project_id: 项目ID
Returns:
操作结果的JSON字符串
"""
try:
# 获取凭证
credentials = _get_dida_credentials(system_user_id)
if not credentials:
return "❌ 未找到滴答清单绑定信息,请先在账户设置中绑定滴答清单账号"
access_token = credentials["access_token"]
# 导入MCP服务
from services.dida_mcp_service import get_dida_mcp_service
import asyncio
mcp_service = get_dida_mcp_service()
# 根据操作类型调用不同的MCP方法
if action == "create_task":
if not title:
return "❌ 创建任务需要提供标题title"
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(
mcp_service.create_task(access_token, title, content or "", project_id)
)
loop.close()
# MCP直接返回任务对象字典不是包装在{success: true}中
if result and isinstance(result, dict) and result.get("id"):
task_id = result.get("id")
task_title = result.get("title", title)
return f"✅ 任务创建成功:{task_title}\n📌 任务ID: {task_id}"
else:
return f"❌ 任务创建失败:{str(result) if result else '未知错误'}"
elif action == "get_tasks":
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(
mcp_service.get_tasks(access_token, project_id)
)
loop.close()
# MCP直接返回任务列表不是包装在字典中
if result and isinstance(result, list):
if not result:
return "📋 您目前没有任务"
task_list = []
for task in result: # 显示所有任务,不截取
task_info = f"- {task.get('title', '未命名任务')}"
# 检查是否已完成
if task.get('status') == 2 or task.get('isCompleted'):
task_info = f"{task_info}"
else:
task_info = f"📌 {task_info}"
# 添加优先级和截止日期信息
priority = task.get('priority')
if priority == 5:
task_info += " [高优先级]"
elif priority == 3:
task_info += " [中优先级]"
due_date = task.get('dueDate')
if due_date:
task_info += f" 📅 {due_date[:10]}" # 只显示日期部分
task_list.append(task_info)
return f"📋 任务列表(共{len(result)}个):\n" + "\n".join(task_list)
else:
return f"❌ 获取任务失败:{str(result)}"
elif action == "update_task":
if not task_id:
return "❌ 更新任务需要提供任务IDtask_id"
update_data = {}
if title:
update_data["title"] = title
if content:
update_data["content"] = content
if project_id:
update_data["project_id"] = project_id
if not update_data:
return "❌ 更新任务需要至少提供一个要更新的字段"
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(
mcp_service.update_task(access_token, task_id, **update_data)
)
loop.close()
if result and result.get("success"):
return "✅ 任务更新成功"
else:
return f"❌ 任务更新失败:{result.get('message', '未知错误')}"
elif action == "delete_task":
if not task_id:
return "❌ 删除任务需要提供任务IDtask_id"
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(
mcp_service.delete_task(access_token, task_id)
)
loop.close()
if result and result.get("success"):
return "✅ 任务删除成功"
else:
return f"❌ 任务删除失败:{result.get('message', '未知错误')}"
elif action == "complete_task":
if not task_id:
return "❌ 完成任务需要提供任务IDtask_id"
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(
mcp_service.complete_task(access_token, task_id)
)
loop.close()
if result and result.get("success"):
return "✅ 任务已标记为完成"
else:
return f"❌ 标记任务完成失败:{result.get('message', '未知错误')}"
else:
return f"❌ 不支持的操作类型:{action}"
except Exception as e:
logger.error(f"管理滴答清单任务失败: {e}", exc_info=True)
return f"❌ 操作失败:{str(e)}"
@tool("manage_dida_project", args_schema=DidaProjectArgs, description="管理滴答清单项目(查询、创建、更新、删除)")
def manage_dida_project(
system_user_id: int,
action: str,
project_id: Optional[str] = None,
name: Optional[str] = None,
color: Optional[str] = None
) -> str:
"""
管理滴答清单项目
支持的操作
- get_projects: 获取项目列表
- create_project: 创建项目需要name
- update_project: 更新项目需要project_id和要更新的字段
- delete_project: 删除项目需要project_id
Args:
system_user_id: 系统用户ID
action: 操作类型
project_id: 项目ID
name: 项目名称
color: 项目颜色
Returns:
操作结果的JSON字符串
"""
try:
# 获取凭证
credentials = _get_dida_credentials(system_user_id)
if not credentials:
return "❌ 未找到滴答清单绑定信息,请先在账户设置中绑定滴答清单账号"
access_token = credentials["access_token"]
# 导入MCP服务
from services.dida_mcp_service import get_dida_mcp_service
import asyncio
mcp_service = get_dida_mcp_service()
# 根据操作类型调用不同的MCP方法
if action == "get_projects":
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(
mcp_service.get_projects(access_token)
)
loop.close()
# MCP直接返回项目列表不是包装在字典中
if result and isinstance(result, list):
if not result:
return "📁 您目前没有项目"
project_list = []
for project in result: # 显示所有项目,不截取
project_info = f"- {project.get('name', '未命名项目')} (ID: {project.get('id')})"
project_list.append(project_info)
return f"📁 项目列表(共{len(result)}个):\n" + "\n".join(project_list)
else:
return f"❌ 获取项目失败:{str(result)}"
elif action == "create_project":
if not name:
return "❌ 创建项目需要提供名称name"
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(
mcp_service.create_project(access_token, name, color)
)
loop.close()
if result and result.get("success"):
return f"✅ 项目创建成功:{name}"
else:
return f"❌ 项目创建失败:{result.get('message', '未知错误')}"
elif action == "update_project":
if not project_id:
return "❌ 更新项目需要提供项目IDproject_id"
update_data = {}
if name:
update_data["name"] = name
if color:
update_data["color"] = color
if not update_data:
return "❌ 更新项目需要至少提供一个要更新的字段"
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(
mcp_service.update_project(access_token, project_id, **update_data)
)
loop.close()
if result and result.get("success"):
return "✅ 项目更新成功"
else:
return f"❌ 项目更新失败:{result.get('message', '未知错误')}"
elif action == "delete_project":
if not project_id:
return "❌ 删除项目需要提供项目IDproject_id"
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(
mcp_service.delete_project(access_token, project_id)
)
loop.close()
if result and result.get("success"):
return "✅ 项目删除成功"
else:
return f"❌ 项目删除失败:{result.get('message', '未知错误')}"
else:
return f"❌ 不支持的操作类型:{action}"
except Exception as e:
logger.error(f"管理滴答清单项目失败: {e}", exc_info=True)
return f"❌ 操作失败:{str(e)}"