"""Resume-parsing task services.

Creates a parse task record, stores uploaded resume files under a
per-task directory, and extracts/parses each file into the database.
"""

import json
import logging
import os
import pathlib
import uuid
from datetime import datetime
from typing import List, Tuple

import asyncio  # noqa: F401  (kept: may be used by callers/siblings of this file)
from decouple import config
from fastapi import File, UploadFile  # noqa: F401
from pymupdf import message  # noqa: F401  (kept: never delete existing imports)

from db.sql_db import DBRESUME, DBTASK, SqliteSqlalchemy
from service.parse_resume2_doc import extra_resume

logger = logging.getLogger(__name__)

# Root directory under which one sub-directory per task is created.
BASE_PATH = config('BASE_PATH', default='D://PycharmProject//yj_resume//')


def check_and_create_directory(files):
    """Create a DBTASK row for a new batch of uploaded files.

    Args:
        files: the uploaded files for this batch (only ``len(files)`` is used).

    Returns:
        The new task id (uuid4 string), or ``None`` when ``files`` is empty
        or the database insert fails.
    """
    logger.info("check_and_create_directory in service")
    # 先创建一个task
    if not files:
        return None
    task_id = str(uuid.uuid4())  # avoid shadowing builtin `id`
    current_time = datetime.now()
    # 格式化时间为字符串
    formatted_time = current_time.strftime("%Y-%m-%d-%H-%M-%S")
    task = DBTASK(
        id=task_id,
        # Reuse the same timestamp shown in the task name so the two agree.
        create_time=current_time,
        status=0,
        success_num=0,
        total_num=len(files),
        fail_num=0,
        name=f"解析任务({formatted_time})",
    )
    session = SqliteSqlalchemy().session
    try:
        session.add(task)
        session.commit()
    except Exception as e:
        # Route through the module logger (was a bare print()).
        logger.error(f"Failed to save DBTASK info error {e}")
        session.rollback()
        return None
    finally:
        session.close()
    return task_id


async def upload_and_save_file(dir_id, files: List[UploadFile]) -> Tuple[bool, str]:
    """Persist uploaded .doc/.docx files under BASE_PATH/<dir_id> and record them.

    Files with any other extension are silently skipped. Each accepted file is
    stored under a fresh uuid4 name and a DBRESUME row (status 0) is created.

    Args:
        dir_id: the task id; also the name of the per-task sub-directory.
        files: uploaded files to store.

    Returns:
        ``(True, "success")`` on success, ``(False, error_message)`` when the
        DB bulk insert fails. (Bug fix: the failure path previously returned
        a 3-tuple, inconsistent with the declared ``(bool, str)`` contract.)
    """
    logger.info(f"upload_and_save_file in service dir_id {dir_id}")
    task_dir = pathlib.Path(BASE_PATH).joinpath(dir_id)
    task_dir.mkdir(parents=True, exist_ok=True)
    rows = []
    for file in files:
        _, ext = os.path.splitext(file.filename)
        if ext not in ['.doc', '.docx']:
            continue
        resume_id = str(uuid.uuid4())
        with open(task_dir.joinpath(resume_id + ext), 'wb') as f:
            f.write(await file.read())
        rows.append(DBRESUME(id=resume_id, task_id=dir_id, status=0,
                             file_name=resume_id + ext))
    session = SqliteSqlalchemy().session
    try:
        session.bulk_save_objects(rows)
        session.commit()
    except Exception as e:
        logger.error(f"Failed to save DBRESUME error {e}")
        session.rollback()
        return False, f"Failed to save DBRESUME error {e}"
    finally:
        session.close()
    return True, "success"


async def fetch_files(dir_id) -> Tuple[bool, str]:
    """Parse every stored resume for task ``dir_id`` and update the DB.

    Walks BASE_PATH/<dir_id>, runs ``extra_resume`` on each .doc/.docx file,
    then bulk-updates DBRESUME rows (status 1 + parsed JSON on success,
    status 0 + message on empty extraction) and the DBTASK row (status 1 on
    full success, status 2 when any file failed).

    Returns:
        ``(True, 'success')`` or ``(False, error_message)``. (Bug fix: the
        missing-BASE_PATH branch previously returned bare ``None``, breaking
        the declared tuple contract for callers that unpack the result.)
    """
    logger.info(f"start fetching files task {dir_id} in service")
    if not os.path.exists(BASE_PATH):
        logger.info(f"目录{BASE_PATH}不存在")
        return False, f"目录{BASE_PATH}不存在"
    file_extensions = ['.docx', '.doc']
    files_list = []
    dir_path = pathlib.Path(BASE_PATH).joinpath(dir_id)
    for root, dirs, files in os.walk(dir_path):
        for file in files:
            _, ext = os.path.splitext(file)
            if file_extensions and ext not in file_extensions:
                logger.error(f"文件{file}格式不符合预期")
                continue
            file_path = os.path.join(root, file)
            if os.path.isfile(file_path):
                files_list.append(file_path)
            else:
                logger.error(f"路径下{file_path}不是文件")

    update_success_mapping = []
    update_fail_mapping = []
    for file in files_list:
        logger.info(f"file is {file} {os.path.basename(file)}")
        file_name = os.path.basename(file)
        # The stored filename stem IS the DBRESUME primary key (see
        # upload_and_save_file), so it maps the file back to its row.
        resume_id = os.path.splitext(file_name)[0]
        extracted = extra_resume(file)
        # Bug fix: check emptiness BEFORE json.dumps — serializing first made
        # every result truthy (json.dumps(None) == "null"), so empty
        # extractions were silently recorded as successes.
        if not extracted:
            logger.warning(f"file {file_name} 提取为空")
            update_fail_mapping.append({
                'id': resume_id, 'status': 0,
                'message': f"task {dir_id} => file {file_name} 提取为空",
            })
            continue
        result = json.dumps(extracted, ensure_ascii=False)
        logger.info(f"result type is {type(result)}")
        logger.info(f"file content is {result}")
        update_success_mapping.append({'id': resume_id, 'status': 1,
                                       'data_info': result})

    session = SqliteSqlalchemy().session
    logger.info(f"update success mapping => {update_success_mapping}")
    logger.info(f"update fail mapping => {update_fail_mapping}")
    success_num = len(update_success_mapping)
    fail_num = len(update_fail_mapping)
    try:
        session.bulk_update_mappings(DBRESUME,
                                     update_success_mapping + update_fail_mapping)
        if update_fail_mapping:
            # status 2 = completed with failures; keep the failure detail.
            session.bulk_update_mappings(DBTASK, [{
                'id': dir_id, 'status': 2,
                'success_num': success_num, 'fail_num': fail_num,
                'message': f'fail => {update_fail_mapping}',
            }])
        else:
            session.bulk_update_mappings(DBTASK, [{
                'id': dir_id, 'status': 1,
                'success_num': success_num, 'fail_num': fail_num,
            }])
        session.commit()
    except Exception as e:
        logger.error(f"update failed => task {dir_id} error {e}")
        session.rollback()
        return False, f"Failed to update DBRESUME error {e}"
    finally:
        session.close()
    return True, 'success'