feat:代码清理,整改,完善代码注释,

This commit is contained in:
lei_y601
2025-07-23 10:16:48 +08:00
parent bb621c0567
commit 53f53f91dc
3 changed files with 79 additions and 184 deletions

14
.env
View File

@@ -5,20 +5,20 @@ MODEL_BASE_URL=http://10.254.23.128:31205/v1
#模型名称
MODEL_NAME=deepseek_70B
#模型活跃度
MODEL_TEMPERATURE=0.8
MODEL_TEMPERATURE=0.1
#放行host地址
DJANGO_ALLOWED_HOSTS=192.168.237.130,127.0.0.1,10.212.26.130,192.168.31.115
#是否开启debug模式
DJANGO_DEBUG_ENABLE=True
DJANGO_LOG_LEVEL=INFO
DEFAULT_AUTH_URL = 'https://oa-pre.yced.com.cn/iuap-api-auth'
DEFAULT_GATEWAY_URL = 'https://oa-pre.yced.com.cn/iuap-api-gateway '
DEFAULT_AUTH_URL = 'https://oa-pre.cyj.cn/iuap-api-auth'
DEFAULT_GATEWAY_URL = 'https://oa-pre.cyj.cn/iuap-api-gateway '
TEMP_TENANT_ID = 'pr3rl2oj'
#APPKEY = 'e5d615eb169944abb3a1db5bd699ca65'
#APPSECRET = '84bb250c995c6161a97aebc2296abe558f6551ac'
APPKEY = '422b4eb9c97b448689c0d444072a55ad'
APPSECRET = '371f07c8abe30577c608be6ba60198a914f98297'
APPKEY = 'ad73da749fea425283d428b3a764bde8'
APPSECRET = '8cd4139890e14a3b9ae7d56f8b3ca092d81a126d'
DEFAULT_HEADER = {'Content-Type': 'application/json'}
# General Purpose Interface
@@ -37,6 +37,6 @@ EDIT_MEETING = '/yonbip/uspace/external/access/edit'
DEFAULT_PAGESIZE = 30
DEFAULT_QUERY_SIZE = 30
REDIS_HOST=127.0.0.1
REDIS_PORT=6379
REDIS_HOST=116.196.80.43
REDIS_PORT=30007
REDIS_PASSWORD=xx

View File

@@ -1,11 +1,9 @@
from copy import deepcopy
from threading import Lock
from datetime import datetime, timedelta
import requests, json
import json
from apscheduler.schedulers.background import BackgroundScheduler
from decouple import config
import threading, re
import re
from pydantic_ai.settings import ModelSettings
@@ -21,10 +19,8 @@ DEFAULT_QUERY_SIZE = config('DEFAULT_QUERY_SIZE', cast=int, default=30)
by_user_query_size = 300
error_respones = "请求{0}失败"
'''
解析时间,转化为时间戳
解析时间,转化为时间戳,目前用友所需的为毫秒,默认将时间戳*1000
'''
def parse_time(time_str: str) -> int:
try:
timestamp = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S").timestamp()
@@ -35,10 +31,8 @@ def parse_time(time_str: str) -> int:
'''
判断是否为json格式
字符串工具函数:判断是否为json格式
'''
def is_json(myjson):
try:
json_object = json.loads(myjson)
@@ -47,7 +41,9 @@ def is_json(myjson):
logger.warning("json检查未通过")
return False
'''
时间工具函数进行按天叠加的功能例如获取3天后的时间
'''
def get_time_info(offset: int) -> int:
now = datetime.now()
if offset > 0:
@@ -60,7 +56,6 @@ def get_time_info(offset: int) -> int:
查询租户下所有会议室类型
'''
def query_room_type(params: dict) -> str:
"""
query_params:
@@ -86,10 +81,8 @@ def query_room_type(params: dict) -> str:
'''
查询租户下会议室信息
查询租户下会议室信息,没有过滤
'''
def query_room_info(data: dict, params: dict) -> str:
# resp = requests.get('http://127.0.0.1:8000/myapi/room/')
"""
@@ -138,7 +131,10 @@ def query_room_info(data: dict, params: dict) -> str:
return error_respones.format("查询会议室信息")
'''
查询可用会议室信息,
包含过滤掉未启用的会议室
'''
def query_avali_room(data: dict,params: dict,start_time=None,end_time=None) -> str:
logger.info(f"query_avali_room params : {params}")
time_exist = False
@@ -253,10 +249,8 @@ def query_booking_info_by_user(params: dict):
return error_respones.format(f"查询指定用户{user_id}已预定会议")
'''
预定会议
预定会议入口
'''
def book_meeting(params: dict, data: dict) -> str:
"""
data {
@@ -323,13 +317,8 @@ def book_meeting(params: dict, data: dict) -> str:
'''
会议编辑
'''
def edit_meeting(params: dict, data: dict):
"""
:params: {tenant_id}
会议编辑,所需参数如下
params: {tenant_id}
:data {
id (nor null) string
subject string
@@ -342,7 +331,11 @@ def edit_meeting(params: dict, data: dict):
remindArray 提醒时间枚举值仅有51520306014404320
content
}
"""
'''
def edit_meeting(params: dict, data: dict):
try:
tenant_id = params.get('tenantId', None)
access_token = getinfo.get_access_token(tenant_id)
@@ -360,8 +353,6 @@ def edit_meeting(params: dict, data: dict):
'''
第二个会议取消接口
'''
def cancel_meeting(params: dict) -> str:
"""
params{
@@ -385,29 +376,15 @@ def cancel_meeting(params: dict) -> str:
logger.error("location: cancel_meeting => {0}".format(str(ex)))
return error_respones.format("取消会议")
'''
去除对话中的<think>标签,某些深度思考智能体,包含<think>标签,需要去除
'''
def check_and_process_think(content: str) -> str:
filtered_text = re.sub(r"<think\b[^>]*>.*?</think>", '', content, flags=re.DOTALL)
filtered_text = re.sub(r'^.*?</think>', '', filtered_text, flags=re.DOTALL)
return filtered_text
def map_meetingname_to_id(params: dict):
map_info = []
data = {}
try:
room_info = json.loads(query_room_info(data=data, params=params))
if isinstance(room_info, str):
raise Exception(f"get room_info error:{room_info}")
for room in room_info:
room_id = room.get('id')
room_name = room.get('name')
map_info.append({room_id: room_name})
logger.info(f"map_info => {map_info}")
return map_info
except Exception as ex:
logger.error("location: map_meetingname_to_id => {0}".format(str(ex)))
return error_respones.format("获取会议室名称和会议室Id的映射")
@@ -418,14 +395,6 @@ def map_meetingname_to_id(params: dict):
def build_prompt(params,map_meetingname_to_id:list):
"""构建增强提示词"""
qry_room_info_for_mart_str = '''
{
"func_name":"query_room",
"capacity":20,
"Region":""
}
'''
qry_avail_meetingroom_for_mart_str = '''
{
@@ -447,12 +416,6 @@ def build_prompt(params,map_meetingname_to_id:list):
"func_name":"book_room"
}
'''
qry_book_info_for_mart_str = '''
{
"func_name":"query_booking_info"
}
'''
qry_user_book_info_for_mart_str = '''
{
"func_name":"query_user_booking_info"
@@ -554,104 +517,16 @@ def build_prompt(params,map_meetingname_to_id:list):
return template
'''
消息历史维护
'''
class DialogManager:
def __init__(self):
self.dialogs = {}
self.lock = Lock()
def get_history(self, session_id: str) -> list:
return self.dialogs.get(session_id, [])
def clear_history(self):
logger.info('----开始定时执行清理历史消息------')
self.dialogs.clear()
def cut_msg(self, session_id: str):
if (len(self.dialogs.get(session_id, [])) > 20):
his = self.get_history(session_id)
self.set_msg(session_id, his[-5:])
def set_msg(self, session_id: str, data:list):
self.dialogs[session_id] = data
self.cut_msg(session_id)
def add_message2(self, session_id, content):
self.dialogs[session_id].append(content)
self.cut_msg(session_id)
def add_message(self, session_id, role, content):
with self.lock:
if session_id not in self.dialogs:
self.dialogs[session_id] = []
# 自动维护对话历史
self.dialogs[session_id].append({
"role": role,
"timestamp": datetime.now().isoformat(),
"content": content
})
# 上下文压缩(超长对话处理)
if len(self.dialogs[session_id]) > 15:
self.compress_context(session_id)
def compress_context(self, session_id):
"""对话历史压缩算法"""
history = self.dialogs[session_id]
# 保留最近3条完整记录
recent = history[-5:]
# 摘要生成中间对话内容
# summary = self.generate_summary(history[3:-3])
new_his = [history[0]]
new_his.extend(recent)
# 重组会话历史
self.dialogs[session_id] = new_his
def generate_summary(self, messages):
"""生成对话摘要(调用本地模型)"""
text = "\n".join([f"{m['role']}: {m['content']}" for m in messages])
headers = {}
messages = [
{"role": "system",
"content": "你现在是一个对话总结助手,基于用户与模型智能体的对话,生成对话总结摘要,要求简明且保留关键信息"},
{"role": "user", "content": f"请生成以下对话的简明摘要(保留关键信息):\n\n{text}"}
]
payload = {
"model": MODEL_NAME,
"messages": messages,
}
headers["Authorization"] = f"Bearer {config('MODEL_API_KEY')}"
response = requests.post(f"{BASE_URL}/chat/completions", data=json.dumps(payload, ensure_ascii=False),
timeout=1000, headers=headers, verify=False)
response.raise_for_status()
logger.info(response.text)
return response.json().get("response", "摘要生成失败")
dialog_manager = DialogManager()
scheduler = BackgroundScheduler()
# 每天凌晨1点执行任务,清理缓存中的历史消息
scheduler.add_job(dialog_manager.clear_history, 'cron', hour=1)
scheduler.start()
'''
异步添加上下文处理信息
'''
def add_message_async(manager: DialogManager, session_id: str, role: str, content: str):
thread = threading.Thread(target=manager.add_message, args=(session_id, role, content))
thread.start()
'''
函数调用,修改预订预订
函数调用,修改预订会议室信息,
目前主要修改主题,开始-结束时间等
'''
def process_edit_room_meeting(data, params) -> tuple:
new_data = {
"subject": data.get('subject','系统默认主题'),
@@ -685,10 +560,9 @@ def process_edit_room_meeting(data, params) -> tuple:
'''
函数调用,处理预订会议室
函数调用,处理预订会议室接口
触发预定会议室
'''
def process_book_room(data, params) -> tuple:
result = book_meeting(data=data, params=params)
book_promot = f'''
@@ -700,7 +574,10 @@ def process_book_room(data, params) -> tuple:
'''
return False, book_promot
'''
查询会议室信息,没有过滤功能,默认查询租户下的会议室
'''
def process_query_room(data, params) -> tuple:
logger.info("current data is :{}".format(data))
result = query_avali_room(data=data, params=params)
@@ -724,8 +601,6 @@ def process_query_room(data, params) -> tuple:
'''
函数调用,查询已经预订会议室详情
'''
def process_query_book_room(**kwargs) -> tuple:
result = query_booking_info(params=kwargs['params'])
book_promot = f'''
@@ -737,7 +612,7 @@ def process_query_book_room(**kwargs) -> tuple:
'''
查询指定用户的已预定信息
查询指定用户的已预定信息,基于用户信息过滤该用户预定的会议室
'''
def process_user_query_book_room(**kwargs) -> tuple:
result = query_booking_info_by_user(params=kwargs['params'])
@@ -750,10 +625,9 @@ def process_user_query_book_room(**kwargs) -> tuple:
'''
函数调用,取消会议室
函数调用,
取消会议室
'''
def process_cancel_room_meeting(data, params) -> tuple:
meetingId = data.get("meetingId", None)
if meetingId:
@@ -778,7 +652,10 @@ def process_cancel_room_meeting(data, params) -> tuple:
'''
return False, book_promot
'''
查询现有的可用的会议室信息,
基于开始与结束时间查询过滤被占用的会议室
'''
def process_query_avali_room(data, params) -> tuple:
logger.info("current data is :{}".format(data))
start_time = data.get("start_time", None)
@@ -803,6 +680,8 @@ def process_query_avali_room(data, params) -> tuple:
'''
函数调用map维护
通过名称查询对应的function
函数参数要求: data :{},param :{}
'''
func_tion_call_map = {
"book_room": process_book_room,
@@ -814,7 +693,9 @@ func_tion_call_map = {
"query_user_booking_info":process_user_query_book_room
}
'''
提取智能体返回消息中的json数据
'''
def extract_json_blocks(text):
json_blocks = re.findall(r'\{[^{}]*\}', text)
valid_blocks = []
@@ -838,10 +719,10 @@ from pydantic_ai.models import cached_async_http_client
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai.providers.openai import OpenAIProvider
#构建一个模型的入口,包含基本参数
http_client = cached_async_http_client(provider='openai',connect=8)
provider = OpenAIProvider(api_key=config('MODEL_API_KEY'), base_url=BASE_URL,http_client=http_client)
model = OpenAIModel(MODEL_NAME, provider=provider)
#map_meetingname_to_id_list=map_meetingname_to_id({'tenantId':config('TEMP_TENANT_ID')})
agent = Agent(model)
#seed 作用:控制面对相同问题,输出尽量保持一致,越大一致性越强
model_setting=ModelSettings(
@@ -850,11 +731,18 @@ model_setting=ModelSettings(
)
'''
动态构建智能体提示词,因为模板中有日期,所以需要实时动态构建
确保日期是最新状态
'''
@agent.system_prompt(dynamic=True)
def gen_system_prompt()->str:
return build_prompt({},[])
'''
消息对话入口,处理智能体与用户对话交互
包括历史记录的维护
'''
def process_chat(covers_id:str,user_id: str, user_input: str, params: dict):
history = []
@@ -881,7 +769,6 @@ def process_chat(covers_id:str,user_id: str, user_input: str, params: dict):
fun_name = data.get("func_name", None)
book_promot = ''
result_false = {'result': False}
add_to_content=False
if fun_name:
func = func_tion_call_map.get(fun_name, None)
if func:

View File

@@ -6,6 +6,12 @@ from pydantic_ai.messages import ModelMessagesTypeAdapter, ModelRequest, ModelRe
from pydantic_core import to_jsonable_python
logger = logging.getLogger('django')
# 智能体会话历史消息长度控制参数
history_len=config('MODEL_HISTORY_LENGTH', cast=int, default=15)
'''
redis 连接池
'''
pool = redis.ConnectionPool(
host=config('REDIS_HOST', cast=str, default='localhost'),
port=config('REDIS_PORT', cast=int, default=6379),
@@ -14,8 +20,11 @@ pool = redis.ConnectionPool(
encoding='utf-8',
password=config('REDIS_PASSWORD', cast=str, default=''),
)
# 默认的预定会议室智能体 历史消息缓存的前缀 key
default_redis_covers_id_prefix = 'yj_room:agent:'
'''
测试使用的工具函数,打印历史消息
'''
def print_his(key,r):
es = r.lrange(key, 0, -1)
data=[json.loads(i) for i in es]
@@ -25,14 +34,14 @@ def print_his(key,r):
logger.info(f"----------{len(data)}-----------")
'''
裁剪历史消息 大于20条默认取后第一条和后五条
通过lua脚本执行保证redis 多个操作的事务
裁剪历史消息 大于15条默认取后第一条和后五条一般智能体对话不建议超过15条
如果智能体对应的大模型的 上下文长度不够,建议缩短历史消息长度,根据实际情况配置吧
'''
def cut_history(covers_id: str, r):
key = default_redis_covers_id_prefix + covers_id
if r.llen(key) > 20:
logger.info("消息超过设定阈值,开始裁剪消息。。。")
if r.llen(key) >= history_len:
logger.info(f"消息超过设定阈值:{history_len},开始裁剪消息....")
script='''
local msg = redis.call('lindex', KEYS[1],0)
redis.call('ltrim', KEYS[1],-5,-1)
@@ -43,9 +52,9 @@ def cut_history(covers_id: str, r):
'''
根据会话ID 获取历史消息列表
validate_python 将str 类型 反序列化成框架所需的对话列表
'''
def get_history(covers_id: str) -> list:
key = default_redis_covers_id_prefix + covers_id
r = redis.Redis(connection_pool=pool)
@@ -57,10 +66,11 @@ def get_history(covers_id: str) -> list:
'''
设置历史消息
设置历史消息,以覆盖的方式,重新设置,针对初次对话等
通过lua脚本执行保证redis 多个操作的事务
默认设置过期时间为2天
to_jsonable_python 将对象数据转化成 python json 数据
'''
def set_history(covers_id: str, data):
key = default_redis_covers_id_prefix + covers_id
r = redis.Redis(connection_pool=pool)
@@ -72,10 +82,8 @@ def set_history(covers_id: str, data):
r.eval(script,1,key,*[json.dumps(i, ensure_ascii=False) for i in to_jsonable_python(data)])
cut_history(covers_id, r)
'''
添加历史消息
添加历史消息,往消息列表中追加一条历史消息,增量操作
'''
def append_message(covers_id: str, data):
key = default_redis_covers_id_prefix + covers_id
r = redis.Redis(connection_pool=pool)