Files
yj_room_agent/yj_room_agent/LLM/ai_service.py
2025-06-24 14:20:38 +08:00

656 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from threading import Lock
from datetime import datetime, timedelta
import requests, json
from apscheduler.schedulers.background import BackgroundScheduler
from .openai_client import call_openai_api
from decouple import config
import threading, re
from ..tools import getinfo, params_filter
import logging
MODEL_NAME = config('MODEL_NAME', default="")
BASE_URL = config('MODEL_BASE_URL', default="")
logger = logging.getLogger('django')
DEFAULT_QUERY_SIZE = config('DEFAULT_QUERY_SIZE', cast=int, default=30)
'''
解析时间,转化为时间戳
'''
def parse_time(time_str: str) -> int:
try:
timestamp = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S").timestamp()
return int(timestamp * 1000)
except Exception as ee:
print("解析时间错误:" + str(ee.with_traceback()))
return int(datetime.now().timestamp() * 1000)
'''
判断是否为json格式
'''
def is_json(myjson):
try:
json_object = json.loads(myjson)
return True
except ValueError:
print("json检查未通过")
return False
def get_time_info(offset: int) -> int:
now = datetime.now()
if offset > 0:
now = now + timedelta(days=offset)
midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
return int(midnight.timestamp() * 1000)
'''
查询租户下所有会议室类型
'''
def query_room_type(params: dict) -> str:
"""
query_params:
ytenantId not null
pageNum
pageSize
"""
print("当前params : {0}".format(params))
tenant_id = params.get('tenantId', None)
if not tenant_id:
raise Exception("tenantId 不能为空")
access_token = getinfo.get_access_token(tenant_id)
params['access_token'] = access_token
room_type = getinfo.query_meetingroom_type(params)
content = json.loads(json.dumps(room_type))
return str(content)
'''
查询租户下会议室信息
'''
def query_room_info(data: dict, params: dict) -> str:
# resp = requests.get('http://127.0.0.1:8000/myapi/room/')
"""
query_params:
ytenantId not null
typeIds
pageNum
pageSize
"""
logger.info(f"当前params : {params}")
tenant_id = params.get('tenantId', None)
if not tenant_id:
logger.error("query_room_info ,tenantId is None")
raise Exception("tenantId 不能为空")
access_token = getinfo.get_access_token(tenant_id)
params['access_token'] = access_token
meeting_room = getinfo.query_meetingroom(params)
content = json.loads(json.dumps(meeting_room))
result = params_filter.filter_params(content['data'], "query_meeting_room")
new_list = []
if data.get('Region') and data.get('Region') != 'None':
region = data.get('Region')
for b in result:
if (region in b.get('name', 'default') or region in b.get('typeName', 'default')) and b.get('capacity',0) >= data.get('capacity', 0):
new_list.append(b)
if len(new_list) >= DEFAULT_QUERY_SIZE:
print("regin--------------- {0}".format(new_list))
return json.dumps(new_list, ensure_ascii=False)
else:
for b in result:
if b.get('capacity',0) >= data.get('capacity', 0):
new_list.append(b)
if len(new_list) >= DEFAULT_QUERY_SIZE:
return json.dumps(new_list, ensure_ascii=False)
print("no regin-------------- {0}".format(new_list))
return json.dumps(new_list, ensure_ascii=False)
'''
预定会议室
'''
# def book_room(data: dict, params: dict) -> str:
# # resp = requests.post('http://127.0.0.1:8000/myapi/room/', data)
# """
# :data {
# startDateTime (not null)
# endDateTime (not null)
# qzId (not null)
# ownnerId (not null)
# subject (not null)
# meetingroomId (not null)
# content (not null)
# personAmount (not null)
# }
# :params: ytenantId (not null)
# :return:
# """
# new_data = {
# "startDateTime": parse_time(data["start_time"]),
# "endDateTime": parse_time(data["end_time"]),
# "qzId": "",
# "content": "",
# "subject": "",
# "ownnerId": "15764457249",
# "meetingroomId": data["room_id"],
# "personAmount": data["capacity"],
# }
# tenant_id = params.get('tenantId', None)
# if not tenant_id:
# raise Exception("tenantId 不能为空")
# access_token = getinfo.get_access_token(tenant_id)
# params['access_token'] = access_token
# booking_info = getinfo.book_meeting_room(params, data)
# content = json.loads(json.dumps(booking_info))
# return str(content)
'''
查询租户下预定信息
'''
def query_booking_info(params: dict) -> str:
tenant_id = params.get('tenantId', None)
if not tenant_id:
raise Exception("tenantId 不能为空")
access_token = getinfo.get_access_token(tenant_id)
params['access_token'] = access_token
params['startTimeStamp'] = get_time_info(0)
params['endTimeStamp'] = get_time_info(10)
meeting_room = getinfo.query_meetingbooking_info(params)
result = params_filter.filter_state(meeting_room)
print(result)
return json.dumps(result)
'''
预定会议
'''
def book_meeting(params: dict, data: dict) -> str:
"""
data {
"subject": "api333",
"meetingRoomId": "1711130269642129413",
"meetingRoomName": "506",
"meetingDateTimeStamp": 1750099200000,
"startDateTimeStamp": 1750099200000,
"endDateTimeStamp": 1750102800000,
"meetingPersonYhtUserIds": [
"ec4326d0-d490-4d5a-9105-260b51fd16d0"
],
"meetingPersonAmount": 12,
"remindArray": [
15
],
"content": "会议内容",
"isSendMail": false,
"isSendMsg": false
}
params: tenantId (not null)
"""
new_data = {
"subject": data["subject"],
"meetingRoomId": data["room_id"],
"meetingRoomName": "会议室名称",
"meetingDateTimeStamp": int(datetime.now().timestamp() * 1000),
"startDateTimeStamp": parse_time(data["start_time"]),
"endDateTimeStamp": parse_time(data["end_time"]),
"meetingPersonYhtUserIds": [
],
"meetingPersonAmount": data["capacity"],
"remindArray": [
15
],
"content": "会议内容",
"isSendMail": True,
"isSendMsg": True
}
tenant_id = params.get('tenantId', None)
if not tenant_id:
raise Exception("tenantId 不能为空")
access_token = getinfo.get_access_token(tenant_id)
params['access_token'] = access_token
book_meeting = getinfo.book_meeting(params, new_data)
content = json.loads(json.dumps(book_meeting))
return str(content)
'''
会议编辑
'''
def edit_meeting(params: dict, data: dict):
"""
:params: {tenant_id}
:data {
id (nor null) string
subject string
meetingRoomName string
meetingDateTimeStamp (==startDateTime ) number
startDateTimeStamp number
endDateTimeStamp number
meetingPersonYhtUserIds strArray
meetingPersonAmount number
remindArray 提醒时间枚举值仅有51520306014404320
content
}
"""
tenant_id = params.get('tenantId', None)
access_token = getinfo.get_access_token(tenant_id)
params['access_token'] = access_token
result = getinfo.edit_meeting_info(params, data)
content = json.loads(json.dumps(result))
print("content is :", content)
return str(content)
'''
第二个会议取消接口
'''
def cancel_meeting(params: dict) -> str:
"""
params{
ytenantId
meetingId
}
:return:
"""
logger.info(f"当前params : {params}")
tenant_id = params.get('tenantId', None)
if not tenant_id:
raise Exception("tenantId 不能为空")
access_token = getinfo.get_access_token(tenant_id)
params['access_token'] = access_token
book_meeting = getinfo.cancel_meeting(params)
content = json.loads(json.dumps(book_meeting))
print("content is :", content)
return str(content)
def check_and_process_think(content: str) -> str:
filtered_text = re.sub(r"<think\b[^>]*>.*?</think>", '', content, flags=re.DOTALL)
filtered_text = re.sub(r'^.*?</think>', '', filtered_text, flags=re.DOTALL)
return filtered_text
'''
初始提示词构建
'''
def build_prompt(params):
"""构建增强提示词"""
qry_room_info_for_mart_str = '''
{
"func_name":"query_room",
"capacity":20, //参会人数没有则返回0,
"Region":"成都", //会议室所在区域没有则返回None
}
'''
book_for_mart_str = '''
{
"room_id":"11", //会议室ID
"capacity":20,
"subject":"会议主题"
"start_time":"2025-06-04 09:30:10",
"end_time":"2025-06-04 12:30:10",
"user_confirm":1 //用户是否确认,
"func_name":"book_room",
}
'''
qry_book_info_for_mart_str = '''
{
"func_name":"query_booking_info"
}
'''
cancel_meeting_info_for_mart_str = '''
{
"func_name":"cancel_room_meeting",
"meetingId":"233333"
}
'''
return_booking_info_str = '''
{ "会议室名称":""
"会议ID":"",
"会议主题":""
"会议开始时间":"",
"会议结束时间":"",
}
'''
edit_meeting_info_for_mart_str = '''
{
"func_name":"edit_room_meeting",
"meetingId":"233333",
"start_time":"2025-06-04 09:30:10",
"end_time":"2025-06-04 12:30:10",
}
'''
time_now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 增强提示词模板
template = f"""你是一个专业的OA会议预订助手请根据以下信息提供服务
现在时间是 :{time_now}
请按以下步骤处理:
1. 当用户需要帮忙预订会议室时,请根据上下文提取用户预订信息,会议室时信息,参数人数,会议主题等,如果没有会议主题则默认主题为'默认主题',提取结果请按照以下的json示例返回,不需要返回其他的多余数据,返回示例如下:
{qry_room_info_for_mart_str}
2. 只有当用户确定要预订某间会议室,而不是在询问合适会议室时,请根据上下文提取用户预订信息,预订时间等信息并返回而不是直接提示预订成功,结果请只返回json格式得预订信息且不需要包含多余的描述内容以及<think>标签等,输出结果示例如下:
{book_for_mart_str}
3. 用户需要查询已有的会议室预订情况或者预订情况时,实时查询,禁止使用历史对话数据,结果请按照以下的json示例返回,不需要返回其他的多余数据,返回示例如下:
{qry_book_info_for_mart_str}
4. 如果用户需要取消预订会议时,分两种情况:
(1)用户未提供要取消的会议号ID,返回已预定会议情况供用户选择,实时查询,禁止使用历史对话中的旧数据!,查询调用如下:
{qry_book_info_for_mart_str} 返回格式规范:{return_booking_info_str}
(2)用户给出了需要取消的会议号ID,如果用户从已预定会议列表中选择了需要取消的会议,则提取会议ID并提示用户是否确认取消取消示例如下:
{cancel_meeting_info_for_mart_str}
5. 如果用户需要修改已有的会议时,请根据上下文提取用户修改预订信息,如果没有获取到会议号ID,开始时间,结束时间,请先用自然对话提示用户提供相应的信息而不是提取json数据。提取结果请按照以下的json示例返回,不需要返回其他的多余数据,返回示例如下:
{edit_meeting_info_for_mart_str}
6. 如果当用户再次请求预订会议室时,请不要按照json格式直接提取用户的预订信息,而是请重新查看现有会议室的最新情况,基于用户需求给用户推荐合理的会议室,推荐选项时不需要提取用户预订信息按照json格式返回,只需要重新给用户推荐选项即可,按照正常自然语言对话返回
7. 如果用户需要解析调用API返回的结果请解析用户提供的相应结果信息并给予自然语言反馈,不需要返回json数据
8. 用户其他需求,请按照自然语言对话返回
"""
return template
'''
消息历史维护
'''
class DialogManager:
def __init__(self):
self.dialogs = {}
self.lock = Lock()
def get_history(self, session_id: str) -> list:
return self.dialogs.get(session_id, [])
def clear_history(self):
logger.info('----开始定时执行清理历史消息------')
self.dialogs.clear()
def add_message(self, session_id, role, content):
with self.lock:
if session_id not in self.dialogs:
self.dialogs[session_id] = []
# 自动维护对话历史
self.dialogs[session_id].append({
"role": role,
"timestamp": datetime.now().isoformat(),
"content": content
})
# 上下文压缩(超长对话处理)
if len(self.dialogs[session_id]) > 50:
self.compress_context(session_id)
def compress_context(self, session_id):
"""对话历史压缩算法"""
history = self.dialogs[session_id]
# 保留最近3条完整记录
recent = history[-5:]
# 摘要生成中间对话内容
summary = self.generate_summary(history[3:-3])
# 重组会话历史
self.dialogs[session_id] = [
{"role": "system", "content": f"对话摘要:{summary}"},
*recent
]
def generate_summary(self, messages):
"""生成对话摘要(调用本地模型)"""
text = "\n".join([f"{m['role']}: {m['content']}" for m in messages])
headers = {}
messages = [
{"role": "system",
"content": "你现在是一个对话总结助手,基于用户与模型智能体的对话,生成对话总结摘要,要求简明且保留关键信息"},
{"role": "user", "content": f"请生成以下对话的简明摘要(保留关键信息):\n\n{text}"}
]
payload = {
"model": MODEL_NAME,
"messages": messages,
}
headers["Authorization"] = f"Bearer {config('MODEL_API_KEY')}"
response = requests.post(f"{BASE_URL}/chat/completions", data=json.dumps(payload, ensure_ascii=False),
timeout=1000, headers=headers, verify=False)
response.raise_for_status()
logger.info(response.text)
return response.json().get("response", "摘要生成失败")
dialog_manager = DialogManager()
scheduler = BackgroundScheduler()
# 每天凌晨1点执行任务,清理缓存中的历史消息
scheduler.add_job(dialog_manager.clear_history, 'cron', hour=1)
scheduler.start()
'''
异步添加上下文处理信息
'''
def add_message_async(manager: DialogManager, session_id: str, role: str, content: str):
thread = threading.Thread(target=manager.add_message, args=(session_id, role, content))
thread.start()
'''
函数调用,修改预订预订
'''
def process_edit_room_meeting(data, params) -> str:
new_data = {
"subject": "会议主题",
"id": data["meetingId"],
"meetingDateTimeStamp": int(datetime.now().timestamp() * 1000),
"startDateTimeStamp": parse_time(data["start_time"]),
"endDateTimeStamp": parse_time(data["end_time"]),
"meetingPersonYhtUserIds": [
],
"remindArray": [
15
],
"content": "会议内容",
"isSendMail": True,
"isSendMsg": True
}
tenant_id = params.get('tenantId', None)
if not tenant_id:
raise Exception("tenantId 不能为空")
access_token = getinfo.get_access_token(tenant_id)
params['access_token'] = access_token
result = getinfo.edit_meeting_info(params, new_data)
book_promot = f'''
系统调用API修改已有的预订会议的结果如下:
{result}
请帮用户解析修改预订会议的结果,并根据结果给予用户相应自然语言反馈
'''
return book_promot
'''
函数调用,处理预订会议室
'''
def process_book_room(data, params) -> str:
result = book_meeting(data=data, params=params)
book_promot = f'''
系统调用API预订该会议室的结果如下:
{result}
请帮用户解析预订会议室的结果如果成功则需要返回会议ID,并根据结果给予用户相应自然语言反馈
'''
return book_promot
def process_query_room(data, params) -> str:
logger.info("current data is :{}".format(data))
result = query_room_info(data=data, params=params)
logger.info("--process_query_room data is :{}".format(result))
book_promot = f'''
系统调用API查询现有空闲会议室信息结果如下:
{result}
解析用户预订需求(时间、人数、设备要求等),name或typeName字段里面可能包含了区域信息,isApprove 代表是否需要审批,注意根据地理信息过滤,给用户推荐合适会议室,请按照自然语言描述返回。
'''
return book_promot
'''
函数调用,查询已经预订会议室详情
'''
def process_query_book_room(**kwargs) -> str:
result = query_booking_info(params=kwargs['params'])
book_promot = f'''
系统调用API查询当前租户下已经预订的会议室的结果如下:
{result}
请帮用户解析已有的预订会议室的结果,请解析所有的会议预订数据,不要省略且结果中需要返回会议ID,并根据结果给予用户相应自然语言反馈
'''
return book_promot
'''
函数调用,取消会议室
'''
def process_cancel_room_meeting(data, params) -> str:
meetingId = data.get("meetingId", None)
if meetingId:
params["meetingId"] = meetingId
result = cancel_meeting(params=params)
book_promot = f'''
系统调用API取消预订会议室的结果如下:
{result}
请帮用户解析取消预订会议室的结果,并根据结果给予用户相应自然语言反馈
'''
else:
dic_data = {
"result": "false",
"message": "未能获取到会议预订ID,请重新尝试取消预订会议室"
}
book_promot = f'''
系统调用API取消预订会议室的结果如下:
{dic_data}
请帮用户解析取消预订会议室的结果,并根据结果给予用户相应自然语言反馈
'''
return book_promot
'''
函数调用map维护
'''
func_tion_call_map = {
"book_room": process_book_room,
"query_room": process_query_room,
"query_booking_info": process_query_book_room,
"cancel_room_meeting": process_cancel_room_meeting,
"edit_room_meeting": process_edit_room_meeting,
}
'''
消息对话入口函数
'''
def process_chat(user_id: str, user_input: str, params: dict):
history = []
query_history = dialog_manager.get_history(user_id)
history.extend(query_history)
prompt = ''
if history is None or len(history) == 0:
prompt = build_prompt(params)
add_message_async(dialog_manager, user_id, 'system', prompt)
add_message_async(dialog_manager, user_id, 'user', user_input)
resp = call_openai_api(model=MODEL_NAME, system_prompt=prompt, user_query=user_input,
api_key=config('MODEL_API_KEY'),
history=history, temperature=config("MODEL_TEMPERATURE", cast=float, default=0.5))
content = resp["choices"][0]["message"]["content"]
reasoning_content = resp["choices"][0]["message"].get('reasoning_content')
logger.info(f"process chat content is : {content}")
new_content = check_and_process_think(content=content)
if 'json' in new_content or is_json(new_content):
# new_content = new_content.replace("json", '')
new_content = new_content.split('```json')[-1].split('```')[0]
new_content = new_content.replace("`", '')
data = json.loads(new_content)
fun_name = data.get("func_name", None)
book_promot = ''
result_false = {'result': False}
if fun_name:
func = func_tion_call_map.get(fun_name, None)
if func:
# 触发函数调用------
logger.info(f"触发系统函数调用==>:{fun_name}")
book_promot = func(data=data, params=params)
logger.info("test point book_promot => {0} ".format(book_promot))
# dialog_manager.add_message(user_id, 'user', book_promot)
else:
book_promot = f'''
未查询到相应的执行函数,结果为:
{result_false}
请帮用户解析结果,并根据结果给予用户相应自然语言反馈
'''
else:
book_promot = f'''
提取用户的函数调用参数失败,结果为:
{result_false}
请帮用户解析结果,并根据结果给予用户相应自然语言反馈
'''
new_history = []
logger.info("test point completed the function call ")
query_history = dialog_manager.get_history(user_id)
new_history.extend(query_history)
resp = call_openai_api(model=MODEL_NAME, user_query=book_promot,
api_key=config('MODEL_API_KEY'),
history=new_history,
system_prompt='',
temperature=config("MODEL_TEMPERATURE", cast=float, default=0.5),
)
content = resp["choices"][0]["message"]["content"]
logger.info("final content => {0}".format(content))
new_content = check_and_process_think(content)
add_message_async(dialog_manager, user_id, 'assistant', content)
return {'response': new_content}
else:
logger.info("no func ")
add_message_async(dialog_manager, user_id, 'assistant', content)
if reasoning_content:
add_message_async(dialog_manager, user_id, 'assistant', reasoning_content)
return {'response': new_content}