Files
yj_resume/service/file_service.py
2025-12-06 17:39:21 +08:00

141 lines
5.0 KiB
Python

import json
from pymupdf import message
from db.sql_db import DBTASK, DBRESUME, SqliteSqlalchemy
import uuid
from datetime import datetime
from decouple import config
import pathlib
from fastapi import File, UploadFile
from typing import List
import os
import asyncio
import logging
from service.parse_resume2_doc import extra_resume
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Root directory for uploaded resumes; each task stores its files in a
# sub-directory named after the task id. Read from the BASE_PATH setting via
# decouple's config(); the fallback below looks like a local Windows
# development path (a previous fallback was
# 'D://PycharmProject//yj_resume//uploads//') - TODO confirm for deployment.
BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')
def check_and_create_directory(files):
    """Create the parsing-task record for a batch of uploaded files.

    A ``DBTASK`` row is inserted with a fresh UUID4 id, ``status=0``, zeroed
    success/fail counters and ``total_num`` set to ``len(files)``. Despite the
    name, no directory is created here; only the database row is written.

    Args:
        files: The uploaded files that will belong to the task. Only its
            truthiness and length are used.

    Returns:
        The new task id as a string, or ``None`` when ``files`` is empty or
        the task row could not be saved.
    """
    logger.info("check_and_create_directory in service")
    if not files:
        logger.warning("check_and_create_directory is empty")
        return None

    task_id = str(uuid.uuid4())
    # Take a single timestamp so the task name and create_time always agree.
    created_at = datetime.now()
    formatted_time = created_at.strftime("%Y-%m-%d-%H-%M-%S")
    task = DBTASK(
        id=task_id,
        create_time=created_at,
        status=0,
        success_num=0,
        total_num=len(files),
        fail_num=0,
        name=f"解析任务({formatted_time})",
    )

    session = SqliteSqlalchemy().session
    try:
        session.add(task)
        session.commit()
    except Exception as e:
        # Report through the module logger (with traceback) rather than
        # print, so the failure ends up wherever the service logs go.
        logger.exception(f"Failed to save DBTASK info error {e}")
        session.rollback()
        return None
    finally:
        session.close()
    return task_id
async def upload_and_save_file(dir_id, files: List[UploadFile]) -> (bool, str):
    """Store uploaded Word resumes on disk and register them under a task.

    Every accepted upload is written to ``BASE_PATH/<dir_id>/<uuid><ext>`` and
    a ``DBRESUME`` row with ``status=0`` is queued for it; all rows are then
    saved in one transaction. Uploads without a filename, or whose extension
    is not ``.doc``/``.docx`` (compared case-insensitively), are skipped.

    Args:
        dir_id: Task id. Used as the sub-directory name under ``BASE_PATH``
            and as ``task_id`` of the created rows.
        files: The uploaded files to store.

    Returns:
        ``(True, "success")`` when the rows were committed, otherwise
        ``(False, <error message>)``. Files already written to disk are not
        removed when the database save fails.
    """
    logger.info(f"upload_and_save_file in service dir_id {dir_id}")
    task_dir = pathlib.Path(BASE_PATH).joinpath(dir_id)
    task_dir.mkdir(parents=True, exist_ok=True)

    allowed_extensions = ('.doc', '.docx')
    resumes = []
    for upload in files:
        # The filename of an upload is optional; splitext(None) would raise.
        if not upload.filename:
            logger.warning("upload_and_save_file skipped an upload without filename")
            continue
        # Lower-case the extension so "CV.DOCX" is accepted. The stored name
        # uses the lower-case form so that case-sensitive scans of the task
        # directory still recognise the file.
        extension = os.path.splitext(upload.filename)[1].lower()
        if extension not in allowed_extensions:
            logger.warning(f"upload_and_save_file skipped unsupported file {upload.filename}")
            continue

        resume_id = str(uuid.uuid4())
        stored_name = resume_id + extension
        # Read before opening the target so a failed read cannot leave an
        # empty file behind in the task directory.
        file_content = await upload.read()
        with open(task_dir.joinpath(stored_name), 'wb') as f:
            f.write(file_content)
        resumes.append(DBRESUME(id=resume_id, task_id=dir_id, status=0, file_name=stored_name))

    session = SqliteSqlalchemy().session
    try:
        session.bulk_save_objects(resumes)
        session.commit()
    except Exception as e:
        # Report through the module logger (with traceback) rather than print.
        logger.exception(f"Failed to save DBRESUME error {e}")
        session.rollback()
        return False, f"Failed to save DBRESUME error {e}"
    finally:
        session.close()
    return True, "success"
def _collect_resume_files(dir_path, file_extensions):
    """Return paths of regular files under ``dir_path`` with an allowed extension.

    An empty ``file_extensions`` disables the extension filter.
    """
    files_list = []
    for root, _dirs, files in os.walk(dir_path):
        for file in files:
            _, ext = os.path.splitext(file)
            if file_extensions and ext not in file_extensions:
                logger.error(f"文件{file}格式不符合预期")
                continue
            file_path = os.path.join(root, file)
            if os.path.isfile(file_path):
                files_list.append(file_path)
            else:
                logger.error(f"路径下{file_path}不是文件")
    return files_list


def fetch_files(dir_id) -> (bool, str):
    """Parse every stored resume of a task and persist the results.

    All ``.doc``/``.docx`` files under ``BASE_PATH/<dir_id>`` are passed to
    ``extra_resume``. The file's base name (without extension) is used as the
    ``DBRESUME`` id. A non-empty result is JSON-encoded into ``data_info`` with
    ``status=1``; an empty result or a parsing error is recorded as a failure
    with a ``message``. The ``DBTASK`` row then receives the counters and
    ``status=1`` (no failures) or ``status=2`` (at least one failure).

    Args:
        dir_id: Task id; also the name of the task's directory under
            ``BASE_PATH``.

    Returns:
        ``(True, 'success')`` when the updates were committed, otherwise
        ``(False, <error message>)``.
    """
    logger.info(f"start fetching files task {dir_id} in service")
    if not os.path.exists(BASE_PATH):
        logger.info(f"目录{BASE_PATH}不存在")
        return False, f"Failed to fetch file 目录{BASE_PATH}不存在"

    # NOTE(review): only BASE_PATH is checked. A missing task directory makes
    # os.walk yield nothing, so the task is marked successful with 0 files.
    dir_path = pathlib.Path(BASE_PATH).joinpath(dir_id)
    files_list = _collect_resume_files(dir_path, ['.docx', '.doc'])

    update_success_mapping = []
    update_fail_mapping = []
    for file in files_list:
        file_name = os.path.basename(file)
        logger.info(f"file is {file} {file_name}")
        resume_id = os.path.splitext(file_name)[0]

        # One unreadable document must not abort the whole batch; record it
        # as a failure and carry on with the remaining files.
        try:
            parsed = extra_resume(file)
            # Test emptiness BEFORE encoding: json.dumps never returns an
            # empty string ("null", "{}", ...), so checking its output would
            # let empty extractions through as successes.
            result = json.dumps(parsed, ensure_ascii=False) if parsed else ""
        except Exception as e:
            logger.exception(f"file {file_name} parse error {e}")
            # NOTE(review): failures keep status 0, the same value rows are
            # created with in upload_and_save_file - confirm this is intended.
            update_fail_mapping.append({'id': resume_id, 'status': 0,
                                        'message': f"task {dir_id} => file {file_name} parse error {e}"})
            continue

        logger.info(f"result type is {type(parsed)}")
        logger.info(f"file content is {result}")
        if not result:
            logger.warning(f"file {file_name} 提取为空")
            update_fail_mapping.append({'id': resume_id, 'status': 0,
                                        'message': f"task {dir_id} => file {file_name} 提取为空"})
            continue
        update_success_mapping.append({'id': resume_id, 'status': 1, 'data_info': result})

    session = SqliteSqlalchemy().session
    logger.info(f"update success mapping => {update_success_mapping}")
    logger.info(f"update fail mapping => {update_fail_mapping}")

    task_update = {
        'id': dir_id,
        'status': 1,
        'success_num': len(update_success_mapping),
        'fail_num': len(update_fail_mapping),
    }
    if update_fail_mapping:
        # Status 2 marks a task with at least one failed file.
        task_update['status'] = 2
        task_update['message'] = f'fail => {update_fail_mapping}'

    try:
        session.bulk_update_mappings(DBRESUME, update_success_mapping + update_fail_mapping)
        session.bulk_update_mappings(DBTASK, [task_update])
        session.commit()
    except Exception as e:
        logger.exception(f"update failed => task {dir_id} error {e}")
        session.rollback()
        return False, f"Failed to update DBRESUME error {e}"
    finally:
        session.close()
    return True, 'success'