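# Service layer for resume-parsing tasks: creates the task record, stores the
# uploaded Word files on disk, and parses the stored files into the database.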
import asyncio
import json
import logging
import os
import pathlib
import uuid
from datetime import datetime
from typing import List, Tuple

from decouple import config
from fastapi import File, UploadFile
from pymupdf import message

from db.sql_db import DBTASK, DBRESUME, SqliteSqlalchemy
from service.parse_resume2_doc import extra_resume


# Module-level logger shared by every function in this file.
logger = logging.getLogger(__name__)

# Root directory for uploaded resumes. Each task stores its files in a
# sub-directory named after the task id (see upload_and_save_file and
# fetch_files). The value is read from the BASE_PATH setting via
# python-decouple's ``config``.
# NOTE(review): the default looks like a developer-machine Windows path;
# set BASE_PATH explicitly in every real deployment.
BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')


def check_and_create_directory(files):
    """Create the database record for a new parsing task.

    Despite its name, this function does not touch the file system: it only
    inserts one ``DBTASK`` row whose ``total_num`` is the number of files
    received.

    Args:
        files: The uploaded files of the batch; only their count is used.

    Returns:
        The id of the new task (a UUID4 string), or ``None`` when ``files``
        is empty or the row could not be saved.
    """
    logger.info("check_and_create_directory in service")
    if not files:
        logger.warning("check_and_create_directory is empty")
        return None

    task_id = str(uuid.uuid4())
    # Take a single timestamp so the stored create_time and the time embedded
    # in the task name always agree.
    created_at = datetime.now()
    formatted_time = created_at.strftime("%Y-%m-%d-%H-%M-%S")
    task = DBTASK(
        id=task_id,
        create_time=created_at,
        status=0,
        success_num=0,
        total_num=len(files),
        fail_num=0,
        name=f"解析任务({formatted_time})",
    )

    session = SqliteSqlalchemy().session
    try:
        session.add(task)
        session.commit()
    except Exception as e:
        # Report through the module logger (with traceback) rather than
        # print, so the failure lands in the same place as the other logs.
        logger.exception("Failed to save DBTASK info error %s", e)
        session.rollback()
        return None
    finally:
        session.close()
    return task_id


async def upload_and_save_file(dir_id, files: List[UploadFile]) -> Tuple[bool, str]:
    """Store the uploaded Word files of a task and register them in the database.

    Each accepted file is written to ``BASE_PATH/<dir_id>/<uuid><suffix>`` and
    gets one ``DBRESUME`` row with ``status=0``. Files whose extension is not
    ``.doc`` or ``.docx`` are skipped.

    Args:
        dir_id: Id of the task; also used as the name of the storage
            sub-directory.
        files: The uploaded files.

    Returns:
        ``(True, "success")`` when the rows were committed, otherwise
        ``(False, <error message>)``.
    """
    logger.info(f"upload_and_save_file in service dir_id {dir_id}")
    task_dir = pathlib.Path(BASE_PATH).joinpath(dir_id)
    task_dir.mkdir(parents=True, exist_ok=True)

    allowed_suffixes = ('.doc', '.docx')
    resumes = []
    for file in files:
        # filename can be None for a part sent without one; treat it as
        # "no extension" instead of letting splitext raise TypeError.
        suffix = os.path.splitext(file.filename or '')[1].lower()
        if suffix not in allowed_suffixes:
            logger.warning(f"skip unsupported upload {file.filename!r} in task {dir_id}")
            continue

        resume_id = str(uuid.uuid4())
        # The suffix is stored lower-cased so later case-sensitive extension
        # checks on the stored file still match (e.g. an upload named X.DOCX).
        stored_name = resume_id + suffix
        # Read the upload before opening the target so a failed read does not
        # leave an empty file behind.
        file_content = await file.read()
        with open(task_dir.joinpath(stored_name), 'wb') as f:
            f.write(file_content)

        resumes.append(DBRESUME(id=resume_id, task_id=dir_id, status=0, file_name=stored_name))

    session = SqliteSqlalchemy().session
    try:
        session.bulk_save_objects(resumes)
        session.commit()
    except Exception as e:
        logger.exception("Failed to save DBRESUME error %s", e)
        session.rollback()
        return False, f"Failed to save DBRESUME error {e}"
    finally:
        session.close()
    return True, "success"


def _collect_resume_files(dir_path, extensions):
    """Return the paths of the regular files under ``dir_path`` whose suffix is in ``extensions``."""
    collected = []
    for root, _dirs, names in os.walk(dir_path):
        for name in names:
            _, ext = os.path.splitext(name)
            if ext not in extensions:
                logger.error(f"文件{name}格式不符合预期")
                continue
            file_path = os.path.join(root, name)
            if os.path.isfile(file_path):
                collected.append(file_path)
            else:
                logger.error(f"路径下{file_path}不是文件")
    return collected


def fetch_files(dir_id) -> Tuple[bool, str]:
    """Parse every stored resume of a task and write the results to the database.

    Walks ``BASE_PATH/<dir_id>``, runs ``extra_resume`` on each ``.doc`` /
    ``.docx`` file and updates the matching ``DBRESUME`` row (the file's base
    name without extension is the row id). The ``DBTASK`` row is then updated
    with the success/fail counters: status 1 when every file produced data,
    status 2 (plus a message listing the failures) otherwise.

    Args:
        dir_id: Id of the task whose files should be parsed.

    Returns:
        ``(True, 'success')`` when the updates were committed, otherwise
        ``(False, <error message>)``.
    """
    logger.info(f"start fetching files task {dir_id} in service")
    if not os.path.exists(BASE_PATH):
        logger.info(f"目录{BASE_PATH}不存在")
        return False, f"Failed to fetch file 目录{BASE_PATH}不存在"

    dir_path = pathlib.Path(BASE_PATH).joinpath(dir_id)
    files_list = _collect_resume_files(dir_path, ('.docx', '.doc'))

    update_success_mapping = []
    update_fail_mapping = []
    for file in files_list:
        file_name = os.path.basename(file)
        logger.info(f"file is {file} {file_name}")
        resume_id = os.path.splitext(file_name)[0]

        try:
            parsed = extra_resume(file)
            # Serialise only non-empty results: json.dumps never returns an
            # empty string ("null", "{}" and "[]" are all truthy), so checking
            # emptiness after dumping could never detect an empty extraction.
            result = json.dumps(parsed, ensure_ascii=False) if parsed else ""
        except Exception as e:
            # One unreadable document must not abort the whole batch and
            # leave the task without a final status.
            logger.exception(f"file {file_name} parse error {e}")
            update_fail_mapping.append({'id': resume_id, 'status': 0,
                                        'message': f"task {dir_id} => file {file_name} parse error {e}"})
            continue

        if not result:
            logger.warning(f"file {file_name} 提取为空")
            # NOTE(review): failed rows keep status 0, the same value rows are
            # created with on upload — confirm this is the intended code.
            update_fail_mapping.append({'id': resume_id, 'status': 0,
                                        'message': f"task {dir_id} => file {file_name} 提取为空"})
            continue

        logger.info(f"result type is {type(result)}")
        logger.info(f"file content is {result}")
        update_success_mapping.append({'id': resume_id, 'status': 1, 'data_info': result})

    session = SqliteSqlalchemy().session
    logger.info(f"update success mapping => {update_success_mapping}")
    logger.info(f"update fail mapping => {update_fail_mapping}")
    success_num = len(update_success_mapping)
    fail_num = len(update_fail_mapping)
    try:
        update_data = update_success_mapping + update_fail_mapping
        session.bulk_update_mappings(DBRESUME, update_data)

        if update_fail_mapping:
            session.bulk_update_mappings(DBTASK, [{'id': dir_id, 'status': 2, 'success_num': success_num,
                                                   'fail_num': fail_num,
                                                   'message': f'fail => {update_fail_mapping}'}])
        else:
            session.bulk_update_mappings(DBTASK, [{'id': dir_id, 'status': 1,
                                                   'success_num': success_num, 'fail_num': fail_num}])
        session.commit()
    except Exception as e:
        logger.error(f"update failed => task {dir_id} error {e}")
        session.rollback()
        return False, f"Failed to update DBRESUME error {e}"
    finally:
        session.close()

    return True, 'success'