104 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			104 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import logging
 | 
						||
from typing import Optional
 | 
						||
 | 
						||
from orjson import orjson
 | 
						||
logger = logging.getLogger(__name__)
 | 
						||
keywords = {
 | 
						||
    # "gender":{"1":"男","2":"女"},
 | 
						||
    "person_status":{"1":"草稿","2":"审批中","3":"制卡中","4":"已入库","5":"停用"},
 | 
						||
    "pass_type":{"1":"集团公司员工","2":"借调人员","3":"借用人员","4":"外部监管人员","5":"外协服务人员","6":"工勤人员","7":"来访人员"},
 | 
						||
    "person_type": {"YG":"正式员工","PQ":"劳务派遣人员","QT":"其他柔性引进人员","WHZ":"合作单位","WLS":"临时访客","WQT":"其他外部人员"},
 | 
						||
    "id_card_type":{"1":"身份证","2":"护照","3":"港澳通行证"},
 | 
						||
    "highest_education": {"1":"初中","2":"高中","3":"中专","4":"技校","5":"职高","6":"大专","7":"本科","8":"硕士","9":"博士"},
 | 
						||
    "highest_degree":{"1":"学士学位","2":"硕士学位","3":"博士学位","4":"无"},
 | 
						||
}
 | 
						||
 | 
						||
def check_and_get_sql(res: str) -> str:
 | 
						||
    json_str = extract_nested_json(res)
 | 
						||
    if json_str is None:
 | 
						||
        raise Exception(orjson.dumps({'message': 'Cannot parse sql from answer',
 | 
						||
                                      'traceback': "Cannot parse sql from answer:\n" + res}).decode())
 | 
						||
    sql: str
 | 
						||
    data: dict
 | 
						||
    try:
 | 
						||
        print("check_and_get_sql1----------------------------")
 | 
						||
        data = orjson.loads(json_str)
 | 
						||
 | 
						||
        if data['success']:
 | 
						||
            sql = data['sql']
 | 
						||
            return sql
 | 
						||
        else:
 | 
						||
            print("check_and_get_sql2----------------------------")
 | 
						||
            message = data['message']
 | 
						||
            raise Exception(message)
 | 
						||
    except Exception as e:
 | 
						||
        print("check_and_get_sql3----------------------------")
 | 
						||
        raise e
 | 
						||
    except Exception:
 | 
						||
        print("check_and_get_sql4----------------------------")
 | 
						||
        raise Exception(orjson.dumps({'message': 'Cannot parse sql from answer',
 | 
						||
                                      'traceback': "Cannot parse sql from answer:\n" + res}).decode())
 | 
						||
 | 
						||
 | 
						||
def extract_nested_json(text):
 | 
						||
    stack = []
 | 
						||
    start_index = -1
 | 
						||
    results = []
 | 
						||
    if not text:
 | 
						||
        logger.warning("extract_nested_json: text is empty")
 | 
						||
    for i, char in enumerate(text):
 | 
						||
        if char in '{[':
 | 
						||
            if not stack:  # 记录起始位置
 | 
						||
                start_index = i
 | 
						||
            stack.append(char)
 | 
						||
        elif char in '}]':
 | 
						||
            if stack and ((char == '}' and stack[-1] == '{') or (char == ']' and stack[-1] == '[')):
 | 
						||
                stack.pop()
 | 
						||
                if not stack:  # 栈空时截取完整JSON
 | 
						||
                    json_str = text[start_index:i + 1]
 | 
						||
                    try:
 | 
						||
                        orjson.loads(json_str)  # 验证有效性
 | 
						||
                        results.append(json_str)
 | 
						||
                    except:
 | 
						||
                        pass
 | 
						||
            else:
 | 
						||
                stack = []  # 括号不匹配则重置
 | 
						||
    if len(results) > 0 and results[0]:
 | 
						||
        return results[0]
 | 
						||
    return None
 | 
						||
 | 
						||
 | 
						||
def get_chart_type_from_sql_answer(res: str) -> Optional[str]:
 | 
						||
    json_str = extract_nested_json(res)
 | 
						||
    if json_str is None:
 | 
						||
        return None
 | 
						||
    chart_type: Optional[str]
 | 
						||
    data: dict
 | 
						||
    try:
 | 
						||
        data = orjson.loads(json_str)
 | 
						||
 | 
						||
        if data['success']:
 | 
						||
            chart_type = data['chart-type']
 | 
						||
        else:
 | 
						||
            return None
 | 
						||
    except Exception:
 | 
						||
        return None
 | 
						||
    return chart_type
 | 
						||
 | 
						||
 | 
						||
def deal_result(data: list) -> list:
 | 
						||
    try:
 | 
						||
        for item in data:
 | 
						||
            for key, map_value in keywords.items():
 | 
						||
                if key in item:
 | 
						||
                    new_key = item.get(key)
 | 
						||
                    if new_key in map_value:
 | 
						||
                        item[key] = map_value[new_key]
 | 
						||
        print("data----------{0}".format(data))
 | 
						||
        return data
 | 
						||
    except Exception as e:
 | 
						||
        print("1111111111111111111111111111111111111111",e)
 | 
						||
        raise Exception(f"sql执行结果处理失败:{str(e)}")
 | 
						||
 | 
						||
 |