Files
sqlbot_agent/util/utils.py

104 lines
3.8 KiB
Python
Raw Normal View History

2025-10-15 16:52:35 +08:00
import logging
2025-09-23 14:49:00 +08:00
from typing import Optional
from orjson import orjson
2025-10-15 16:52:35 +08:00
logger = logging.getLogger(__name__)
2025-09-25 16:49:25 +08:00
keywords = {
# "gender":{"1":"男","2":"女"},
2025-09-25 16:49:25 +08:00
"person_status":{"1":"草稿","2":"审批中","3":"制卡中","4":"已入库","5":"停用"},
"pass_type":{"1":"集团公司员工","2":"借调人员","3":"借用人员","4":"外部监管人员","5":"外协服务人员","6":"工勤人员","7":"来访人员"},
"person_type": {"YG":"正式员工","PQ":"劳务派遣人员","QT":"其他柔性引进人员","WHZ":"合作单位","WLS":"临时访客","WQT":"其他外部人员"},
"id_card_type":{"1":"身份证","2":"护照","3":"港澳通行证"},
"highest_education": {"1":"初中","2":"高中","3":"中专","4":"技校","5":"职高","6":"大专","7":"本科","8":"硕士","9":"博士"},
"highest_degree":{"1":"学士学位","2":"硕士学位","3":"博士学位","4":""},
}
2025-09-23 14:49:00 +08:00
def check_and_get_sql(res: str) -> str:
json_str = extract_nested_json(res)
if json_str is None:
raise Exception(orjson.dumps({'message': 'Cannot parse sql from answer',
'traceback': "Cannot parse sql from answer:\n" + res}).decode())
sql: str
data: dict
try:
2025-09-25 16:49:25 +08:00
print("check_and_get_sql1----------------------------")
2025-09-23 14:49:00 +08:00
data = orjson.loads(json_str)
if data['success']:
sql = data['sql']
return sql
else:
2025-09-25 16:49:25 +08:00
print("check_and_get_sql2----------------------------")
2025-09-23 14:49:00 +08:00
message = data['message']
raise Exception(message)
except Exception as e:
2025-09-25 16:49:25 +08:00
print("check_and_get_sql3----------------------------")
2025-09-23 14:49:00 +08:00
raise e
except Exception:
2025-09-25 16:49:25 +08:00
print("check_and_get_sql4----------------------------")
2025-09-23 14:49:00 +08:00
raise Exception(orjson.dumps({'message': 'Cannot parse sql from answer',
'traceback': "Cannot parse sql from answer:\n" + res}).decode())
def extract_nested_json(text):
stack = []
start_index = -1
results = []
2025-10-15 16:52:35 +08:00
if not text:
logger.warning("extract_nested_json: text is empty")
2025-09-23 14:49:00 +08:00
for i, char in enumerate(text):
if char in '{[':
if not stack: # 记录起始位置
start_index = i
stack.append(char)
elif char in '}]':
if stack and ((char == '}' and stack[-1] == '{') or (char == ']' and stack[-1] == '[')):
stack.pop()
if not stack: # 栈空时截取完整JSON
json_str = text[start_index:i + 1]
try:
orjson.loads(json_str) # 验证有效性
results.append(json_str)
except:
pass
else:
stack = [] # 括号不匹配则重置
if len(results) > 0 and results[0]:
return results[0]
return None
def get_chart_type_from_sql_answer(res: str) -> Optional[str]:
json_str = extract_nested_json(res)
if json_str is None:
return None
chart_type: Optional[str]
data: dict
try:
data = orjson.loads(json_str)
if data['success']:
chart_type = data['chart-type']
else:
return None
except Exception:
return None
return chart_type
2025-09-25 16:49:25 +08:00
def deal_result(data: list) -> list:
try:
for item in data:
for key, map_value in keywords.items():
if key in item:
new_key = item.get(key)
if new_key in map_value:
item[key] = map_value[new_key]
2025-09-25 16:49:25 +08:00
print("data----------{0}".format(data))
return data
except Exception as e:
print("1111111111111111111111111111111111111111",e)
2025-09-25 16:49:25 +08:00
raise Exception(f"sql执行结果处理失败{str(e)}")