Compare commits

8 Commits
master ... dev

Author SHA1 Message Date
yujj128
b383a52bdc Export to Excel 2025-12-12 15:31:14 +08:00
雷雨
5e70e79365 feat: handle doc-to-docx conversion 2025-12-10 15:25:16 +08:00
雷雨
e057917151 feat: update task status 2025-12-10 11:02:20 +08:00
雷雨
50f3ab9438 feat: Dockerfile changes 2025-12-10 10:54:56 +08:00
雷雨
0f7a32270b feat: bug fixes 2025-12-10 10:51:19 +08:00
雷雨
f32104994d feat: add file formatting, download, and compression 2025-12-10 10:48:39 +08:00
yujj128
15d778fdb9 Extract resumes 2025-12-10 10:33:25 +08:00
yujj128
fba18a0cd6 Remove packages 2025-12-08 18:32:20 +08:00
12 changed files with 634 additions and 130 deletions

View File

@@ -3,7 +3,7 @@ WORKDIR /app
COPY . /app
ENV TZ=Asia/Shanghai \
LANG=C.UTF-8
RUN rm -rf logs .git .idea .venv && apt-get update && apt-get install -y vim curl sqlite3 && pip install -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple/
RUN mkdir -p /app/uploads
RUN rm -rf logs .git .idea .venv && apt-get update && apt-get install -y pandoc vim curl sqlite3 && pip install -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple/
RUN mkdir -p /app/uploads && mkdir -p /app/zip
EXPOSE 3006
CMD ["python", "main.py"]

View File

@@ -4,14 +4,15 @@ from sqlalchemy.orm import declarative_base, sessionmaker
Base = declarative_base()
from decouple import config
DB_PATH = config('DB_PATH', default='E://pyptoject//yj_resume//main.sqlite3')
# DB_PATH = config('DB_PATH', default='E://pyptoject//yj_resume//main.sqlite3')
DB_PATH = config('DB_PATH', default='D://PycharmProject//yj_resume//main.sqlite3')
class DBTASK(Base):
__tablename__ = 'db_task'
id = Column(String(100), primary_key=True)
name = Column(String(100), nullable=False)
create_time = Column(DateTime, nullable=False, )
task_type = Column(String(20), nullable=False, )
# 0 = pending, 1 = success, 2 = failed
status = Column(Integer, nullable=False, default=0)
success_num = Column(Integer, nullable=False, default=0)
@@ -33,6 +34,19 @@ class DBRESUME(Base):
# error message, etc.
message = Column(Text, nullable=True)
class DBEXCEL(Base):
__tablename__ = 'db_excel'
# each task corresponds to one folder ID
id = Column(String(100), primary_key=True)
# 0 = pending, 1 = success, 2 = failed
status = Column(Integer, nullable=False, default=0)
file_name = Column(String(100), nullable=True)
# the extracted data, stored as JSON
excel_info = Column(Text, nullable=True)
# error message, etc.
message = Column(Text, nullable=True)
class SqliteSqlalchemy(object):
def __init__(self):

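The new DBEXCEL table stores one row per format task. A read-back sketch, illustrative only and reusing the session pattern shown above, for listing finished tasks and the JSON payload written into excel_info:

# hypothetical helper: list finished DBEXCEL rows and their extracted records
import json
from db.sql_db import SqliteSqlalchemy, DBEXCEL

session = SqliteSqlalchemy().session
try:
    for row in session.query(DBEXCEL).filter(DBEXCEL.status == 1).all():
        records = json.loads(row.excel_info) if row.excel_info else []
        print(row.id, row.file_name, len(records), "records")
finally:
    session.close()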
main.py
View File

@@ -2,40 +2,45 @@ from fastapi import FastAPI
import uvicorn
from fastapi import FastAPI, File, UploadFile, HTTPException
from typing import List
from service.file_service import check_and_create_directory, upload_and_save_file, fetch_files
from service.file_service import download_format_words, check_and_create_directory, upload_and_format_file, \
upload_and_save_file, fetch_files, fetch_and_format_file
from service import excel_service
from service.db_service import get_task_list
from fastapi.responses import FileResponse
import threading
from logging_config import LOGGING_CONFIG
import logging
import pandas as pd
logger = logging.getLogger(__name__)
app = FastAPI()
import concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
executor = concurrent.futures.ThreadPoolExecutor(max_workers=20)
@app.get("/api/v1/hw")
def read_root():
return {"Hello": "World"}
# upload files and parse; parsing runs asynchronously
@app.post("/upload_files_and_parse")
@app.post("/yj_resume/upload_files_and_parse")
async def create_upload_files(files: List[UploadFile] = File(...)):
dir_id = check_and_create_directory(files)
dir_id = check_and_create_directory(files, 'parse')
if not dir_id:
return {"result": False, "code": 500, "message": "create directory failed"}
flag, message = await upload_and_save_file(dir_id, files)
logger.info(f"flag is {flag}")
if flag:
#flag, message = await fetch_files(dir_id)
# flag, message = await fetch_files(dir_id)
executor.submit(fetch_files, dir_id)
return {"result": flag, "message": message, "task_id": dir_id}
@app.get("/export_task_data_to_excel")
@app.get("/yj_resume/export_task_data_to_excel")
def export_task_data_to_excel(task_id: str):
path_xx = excel_service.export_task_data_to_excel(task_id)
if not path_xx:
@@ -47,12 +52,43 @@ def export_task_data_to_excel(task_id: str):
)
@app.get("/parse_task_list")
@app.get("/yj_resume/parse_task_list")
def parse_task_list():
data = get_task_list()
data = get_task_list('parse')
return {"data": data, "code": 200, }
@app.get("/yj_resume/format_task_list")
def format_task_list():
data = get_task_list('format')
return {"data": data, "code": 200, }
@app.post("/yj_resume/upload_files_and_format")
async def create_upload_files(files: List[UploadFile] = File(...)):
dir_id = check_and_create_directory(files, 'format')
if not dir_id:
return {"result": False, "code": 500, "message": "create directory failed"}
flag, message = await upload_and_format_file(dir_id, files)
logger.info(f"flag is {flag}")
if flag:
# flag, message = await fetch_files(dir_id)
executor.submit(fetch_and_format_file, dir_id)
return {"result": flag, "message": message, "task_id": dir_id}
@app.get("/yj_resume/download_format_words")
def export_task_data_to_excel(task_id: str):
path_xx = download_format_words(task_id)
if not path_xx:
raise HTTPException(status_code=404, detail="file not found")
return FileResponse(
path=path_xx,
media_type="application/octet-stream", # 通用二进制流
filename=f"{task_id}.zip" # 浏览器下载时使用的文件名
)
if __name__ == '__main__':
logger.info("start server")
uvicorn.run(app, host="0.0.0.0", port=3006)
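Taken together, the renamed endpoints form a small workflow: upload an Excel file, let the thread pool format it in the background, then download the zipped Word output. A client-side sketch, assuming the requests package (not in requirements.txt) and a hypothetical people.xlsx, with status polling elided:

import requests

BASE = "http://localhost:3006/yj_resume"

# upload; the multipart field name must be "files" to match the endpoint
with open("people.xlsx", "rb") as f:
    resp = requests.post(f"{BASE}/upload_files_and_format",
                         files=[("files", ("people.xlsx", f))])
task_id = resp.json()["task_id"]

# poll GET {BASE}/format_task_list until this task reports status 1, then:
zip_resp = requests.get(f"{BASE}/download_format_words", params={"task_id": task_id})
with open(f"{task_id}.zip", "wb") as out:
    out.write(zip_resp.content)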

View File

@@ -7,12 +7,7 @@ python-decouple
python-multipart
pandas
openpyxl
python-multipart
PyMuPDF>=1.23.0
paddlepaddle>=2.5.0
paddleocr>=2.7.0.3
opencv-python>=4.8.0
numpy>=1.24.0
pdf2image>=1.16.3
Pillow>=10.0.0
Pillow>=10.0.0
numpy
openpyxl

View File

@@ -1,9 +1,10 @@
from db.sql_db import DBTASK, DBRESUME, SqliteSqlalchemy
def get_task_list():
task_list = SqliteSqlalchemy().session.query(DBTASK).order_by(DBTASK.create_time.desc()).all()
result=[]
def get_task_list(task_type):
task_list = SqliteSqlalchemy().session.query(DBTASK).filter(DBTASK.task_type == task_type).order_by(
DBTASK.create_time.desc()).all()
result = []
for task in task_list:
result.append({
"id": task.id,

View File

@@ -4,8 +4,8 @@ import pandas as pd
import pathlib
from decouple import config
BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')
#BASE_PATH = config('BASE_PATH', default='D://PycharmProject//yj_resume//uploads//')
# BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')
BASE_PATH = config('BASE_PATH', default='D://PycharmProject//yj_resume//uploads//')
# export data to Excel

View File

@@ -1,8 +1,6 @@
import json
from pymupdf import message
from db.sql_db import DBTASK, DBRESUME, SqliteSqlalchemy
from sqlalchemy import update
from db.sql_db import DBTASK, DBRESUME, SqliteSqlalchemy, DBEXCEL
import uuid
from datetime import datetime
from decouple import config
@@ -12,16 +10,66 @@ from typing import List
import os
import asyncio
import logging
from logging_config import LOGGING_CONFIG
from service.format_template_resume import format_excel_to_words
from service.parse_resume2_doc import extra_resume
import pypandoc
logger = logging.getLogger(__name__)
#BASE_PATH = config('BASE_PATH', default='D://PycharmProject//yj_resume//uploads//')
# BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')
# ZIP_PATH = config('ZIP_PATH', default='E://pyptoject//yj_resume//zip//')
#
BASE_PATH = config('BASE_PATH', default='D://PycharmProject//yj_resume//uploads//')
ZIP_PATH = config('ZIP_PATH', default='D://PycharmProject//yj_resume//zip//')
import pandas as pd
import zipfile
BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')
import os
import subprocess
import tempfile
def check_and_create_directory(files):
def convert_doc_to_docx_secure(input_file,out_put_dir):
# environment setup: redirect TMP/TEMP to a throwaway directory
with tempfile.TemporaryDirectory() as tmpdir:
os.environ['TMP'] = tmpdir
os.environ['TEMP'] = tmpdir
# build the soffice command
cmd = [
'soffice',
'--headless',
'--nologo',
'--nodefault',
'--norestore',
'--convert-to', 'docx',
'--outdir', out_put_dir,
input_file
]
# run the conversion
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
check=True,
timeout=30 # set a timeout to prevent hanging
)
return True
except subprocess.CalledProcessError as e:
print(f"深度错误信息:\nSTDOUT: {e.stdout}\nSTDERR: {e.stderr}")
return False
def check_and_create_directory(files, task_type):
logger.info("check_and_create_directory in service")
# create a task record first
if not files or len(files) == 0:
@@ -31,8 +79,8 @@ def check_and_create_directory(files):
current_time = datetime.now()
# format the time as a string
formatted_time = current_time.strftime("%Y-%m-%d-%H-%M-%S")
task = DBTASK(id=id, create_time=datetime.now(), status=0, success_num=0, total_num=len(files),
fail_num=0,name=f"解析任务({formatted_time})")
task = DBTASK(id=id, task_type=task_type, create_time=datetime.now(), status=0, success_num=0, total_num=len(files),
fail_num=0, name=f"解析任务({formatted_time})")
session = SqliteSqlalchemy().session
try:
@@ -57,11 +105,13 @@ async def upload_and_save_file(dir_id, files: List[UploadFile]) -> (bool, str):
id = str(uuid.uuid4())
if fix not in ['.doc', '.docx']:
continue
with open(pathxx.joinpath(id + fix), 'wb') as f:
file_content = await file.read()
f.write(file_content)
data.append(DBRESUME(id=id, task_id=dir_id, status=0, file_name=id + fix))
if fix=='.doc':
convert_doc_to_docx_secure(str(pathxx.joinpath(id + fix)),str(pathxx))
data.append(DBRESUME(id=id, task_id=dir_id, status=0, file_name=id + '.docx'))
session = SqliteSqlalchemy().session
try:
session.bulk_save_objects(data)
@@ -74,22 +124,22 @@ async def upload_and_save_file(dir_id, files: List[UploadFile]) -> (bool, str):
session.close()
return True, "success"
def fetch_files(dir_id) -> (bool, str):
def fetch_files(dir_id) -> (bool, str):
logger.info(f"start fetching files task {dir_id} in service")
if not os.path.exists(BASE_PATH):
logger.info(f"目录{BASE_PATH}不存在")
return False, f"Failed to fetch file 目录{BASE_PATH}不存在"
file_extensions = ['.docx', '.doc']
file_extensions = ['.docx']
files_list = []
dir_path = pathlib.Path(BASE_PATH).joinpath(dir_id)
for root,dirs,files in os.walk(dir_path):
for root, dirs, files in os.walk(dir_path):
for file in files:
_,ext = os.path.splitext(file)
_, ext = os.path.splitext(file)
if file_extensions and ext not in file_extensions:
logger.error(f"文件{file}格式不符合预期")
continue
file_path = os.path.join(root,file)
file_path = os.path.join(root, file)
if os.path.isfile(file_path):
files_list.append(file_path)
else:
@@ -106,10 +156,10 @@ def fetch_files(dir_id) -> (bool, str):
logger.info(f"file content is {result}")
if not result:
logger.warning(f"file {file_name} extraction returned empty")
update_fail_mapping.append({'id':id, 'status':0,
'message': f"task {dir_id} => file {file_name} extraction returned empty"})
update_fail_mapping.append({'id': id, 'status': 0,
'message': f"task {dir_id} => file {file_name} extraction returned empty"})
continue
update_success_mapping.append({'id':id, 'status':1,'data_info': result})
update_success_mapping.append({'id': id, 'status': 1, 'data_info': result})
session = SqliteSqlalchemy().session
logger.info(f"update success mapping => {update_success_mapping}")
logger.info(f"update fail mapping => {update_fail_mapping}")
@@ -120,8 +170,8 @@ def fetch_files(dir_id) -> (bool, str):
session.bulk_update_mappings(DBRESUME, update_data)
if update_fail_mapping:
session.bulk_update_mappings(DBTASK, [{'id':dir_id, 'status':2, 'success_num':success_num,
'fail_num':fail_num,'message':f'fail => {update_fail_mapping}'}])
session.bulk_update_mappings(DBTASK, [{'id': dir_id, 'status': 2, 'success_num': success_num,
'fail_num': fail_num, 'message': f'fail => {update_fail_mapping}'}])
else:
session.bulk_update_mappings(DBTASK, [{'id': dir_id, 'status': 1,
'success_num': success_num, 'fail_num': fail_num}])
@@ -136,5 +186,69 @@ def fetch_files(dir_id) -> (bool, str):
return True, 'success'
async def upload_and_format_file(dir_id, files: List[UploadFile]) -> (bool, str):
logger.info(f"upload_and_format_file in service dir_id {dir_id}")
pathxx = pathlib.Path(BASE_PATH).joinpath(dir_id)
pathxx.mkdir(parents=True, exist_ok=True)
data = []
for file in files:
# id = str(uuid.uuid4())
name, fix = os.path.splitext(file.filename)
if fix not in ['.xls', '.xlsx']:
continue
with open(pathxx.joinpath(dir_id + fix), 'wb') as f:
file_content = await file.read()
f.write(file_content)
data.append(DBEXCEL(id=dir_id, status=0, file_name=dir_id + '.xlsx'))
session = SqliteSqlalchemy().session
try:
session.bulk_save_objects(data)
session.commit()
except Exception as e:
print(f"Failed to save DBEXCEL error {e}")
session.rollback()
return False, f"Failed to save DBEXCEL error {e}"
finally:
session.close()
return True, "success"
def zip_file_folder(dir_id):
pathxx = pathlib.Path(BASE_PATH).joinpath(dir_id)
output_filename = pathlib.Path(ZIP_PATH).joinpath((dir_id + ".zip"))
with zipfile.ZipFile(output_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
# walk every file and subfolder under the directory
for root, dirs, files in os.walk(pathxx):
for file in files:
# build the full file path
file_path = os.path.join(root, file)
# add the file to the ZIP using a path relative to the source folder
zipf.write(file_path, arcname=os.path.relpath(file_path, pathxx))
def fetch_and_format_file(dir_id) -> (bool, str):
logger.info(f"fetch_and_format_file in service dir_id {dir_id}")
pathxx = pathlib.Path(BASE_PATH).joinpath(dir_id)
pathx_1 = pathxx.joinpath(dir_id + ".xlsx")
if not pathx_1.exists():
pathx_1 = pathxx.joinpath(dir_id + ".xls")
data = pd.read_excel(pathx_1)
data_dict = data.to_dict('records')
print(data_dict)
format_excel_to_words(dir_id, data_dict)
zip_file_folder(dir_id)
session = SqliteSqlalchemy().session
try:
session.execute(update(DBTASK).where(DBTASK.id == dir_id).values(status=1))
session.commit()
except Exception as e:
session.rollback()
finally:
session.close()
def download_format_words(task_id):
pathxx = pathlib.Path(ZIP_PATH).joinpath((task_id + ".zip"))
if not pathxx.exists():
return None
return pathxx
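For .doc uploads, upload_and_save_file shells out to LibreOffice via convert_doc_to_docx_secure, so soffice must be on PATH (the Dockerfile installs pandoc but not LibreOffice, which is worth double-checking at deploy time). A usage sketch with illustrative paths:

from service.file_service import convert_doc_to_docx_secure

# hypothetical task directory and file name
ok = convert_doc_to_docx_secure("/app/uploads/task-1/resume.doc", "/app/uploads/task-1")
print("converted" if ok else "conversion failed; see the captured stderr")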

View File

@@ -1,6 +1,20 @@
import json
import re
from docxtpl import DocxTemplate
from pathlib import Path
from decouple import config
import pathlib,logging
import uuid
from sqlalchemy import update
from openpyxl import load_workbook,styles
from db.sql_db import SqliteSqlalchemy, DBEXCEL
logger = logging.getLogger(__name__)
# BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')
BASE_PATH = config('BASE_PATH', default='D://PycharmProject//yj_resume//uploads//')
context = {
'name': '张三',
'sex': '',
@@ -29,7 +43,261 @@ context = {
'''
}
file_path = Path.cwd().joinpath('template.docx')
template = DocxTemplate(file_path)
template.render(context)
template.save('E://resu//output.docx')
excel_mapping = {
'politics': {'群众': '1', '中共党员': '2', '民主党派': '3', '共青团员': '4'},
'education': {'小学及以下': '1', '初中': '2', '高中、技校': '3', '中专': '4',
'大专': '5', '大学本科': '6', '硕士研究生': '7', '博士研究生': '8'},
'nation': {'汉族': '1', '蒙古族': '2', '回族': '3', '藏族': '4', '维吾尔族': '5',
'苗族': '6', '彝族': '7', '壮族': '8', '布依族': '9', '朝鲜族': '10',
'满族': '11', '侗族': '12', '瑶族': '13', '白族': '14', '土家族': '15',
'哈尼族': '16', '哈萨克族': '17', '傣族': '18', '黎族': '19', '傈僳族': '20',
'佤族': '21', '畲族': '22', '高山族': '23', '拉祜族': '24', '水族': '25',
'东乡族': '26', '纳西族': '27', '景颇族': '28', '柯尔克孜族': '29', '土族': '30',
'达斡尔族': '31', '仫佬族': '32', '羌族': '33', '布朗族': '34', '撒拉族': '35',
'毛南族': '36', '仡佬族': '37', '锡伯族': '38', '阿昌族': '39', '普米族': '40',
'塔吉克族': '41', '怒族': '42', '乌孜别克族': '43', '俄罗斯族': '44', '鄂温克族': '45',
'德昂族': '46', '保安族': '47', '裕固族': '48', '京族': '49', '塔塔尔族': '50', '独龙族': '51',
'鄂伦春族': '52', '赫哲族': '53', '门巴族': '54', '珞巴族': '55', '基诺族': '56', '其他': '57'},
'address': {'拉萨市': '540100', '昌都地区': '542100', '山南地区': '542200', '日喀则地区': '542300',
'那曲地区': '542400', '阿里地区': '542500', '林芝地区': '542600', '藏外地区': '549999'}}
def convert_excel_data(old_dict: dict) -> dict:
new_dict = {}
pre_edu_mapping = {
'':'小学及以下',
'小学':'小学及以下',
'初中':'初中',
'高中':'高中、技校',
'高职':'高中、技校',
'中专':'中专',
'大专': '大专',
'本科': '大学本科',
'硕士': '硕士研究生',
'博士': '博士研究生',
}
pre_addr_mapping = {
'拉萨': '拉萨市',
'昌都': '昌都地区',
'山南': '山南地区',
'日喀则': '日喀则地区',
'那曲': '那曲地区',
'阿里': '阿里地区',
'林芝': '林芝地区',
}
pre_nation_mapping = excel_mapping.get('nation')
new_dict['name'] = dict_get_mul_key(['姓名', '姓 名'], old_dict)
new_dict['sex'] = dict_get_mul_key(['性别', '性 别'], old_dict)
new_dict['origin'] = dict_get_mul_key(['籍贯', '籍 贯'], old_dict)
new_dict['politics'] = '中共党员' if len(dict_get_mul_key(['入党时间', '入 党 时 间'], old_dict)) > 0 and dict_get_mul_key(['入党时间'],old_dict) not in ('/', '\\','None','nan','') else '群众'
address = dict_get_mul_key(['出 生 地', '出生地'], old_dict)
nation = dict_get_mul_key(['民族', '民 族'], old_dict)
new_dict['nation'] = nation
# normalize the education level
r1 = old_dict.get("全日制教育")
r2 = old_dict.get("在职教育")
if len(r1) > 0 and r1 not in ('/','\\','None','nan','','','null'):
education = r1
else:
education = r2
education = education.lower().replace(' ', '')
for k,v in pre_edu_mapping.items():
if k in education:
new_dict['education'] = v
addr_found = False
# normalize the residence region
for k,v in pre_addr_mapping.items():
if k in address:
print("藏内地区")
addr_found = True
new_dict['address'] = v
if not addr_found:
print("藏外地区")
new_dict['address'] = '藏外地区'
# normalize the ethnicity
for k,v in pre_nation_mapping.items():
if k in nation or nation in k:
new_dict['nation'] = k
return new_dict
def map_data(data_list):
# special values: map region names to their codes
for idx, row in enumerate(data_list):
for k,v in excel_mapping.items():
value = row.get(k)
if value:
if v.get(value,''):
data_list[idx][k] = v.get(value)
return data_list
def dict_get_mul_key(keys: list, dict_data: dict):
for k in keys:
if k in dict_data.keys() and dict_data[k]:
return dict_data[k]
return ''
def convert_data(old_dict: dict) -> dict:
new_dict = {}
new_dict['name'] = dict_get_mul_key(['姓名', '姓 名'], old_dict)
new_dict['sex'] = dict_get_mul_key(['性别', '性 别'], old_dict)
new_dict['nation'] = dict_get_mul_key(['民族', '民 族'], old_dict)
new_dict['brith'] = dict_get_mul_key(['出生年月', '出生年月(岁)'], old_dict)
new_dict['origin'] = dict_get_mul_key(['籍贯', '籍 贯'], old_dict)
new_dict['address'] = dict_get_mul_key(['出 生 地', '出生地'], old_dict)
new_dict['education'] = dict_get_mul_key(['学历', '学 历'], old_dict)
new_dict['degree'] = dict_get_mul_key(['学位', '学 位'], old_dict)
new_dict['politics'] = '党员' if len(dict_get_mul_key(['入党时间', '入 党 时 间'], old_dict)) > 0 and dict_get_mul_key(['入党时间'], old_dict) not in ('/','\\','None','nan','') else '群众'
new_dict['department'] = dict_get_mul_key(['部门', '部 门'], old_dict)
new_dict['position'] = dict_get_mul_key(['现任职务', '现 任 职 务'], old_dict)
new_dict['phone'] = dict_get_mul_key(['手机号', '手 机 号'], old_dict)
new_dict['title'] = dict_get_mul_key(['专业技术职务', '职 称'], old_dict)
new_dict['start_work_time'] = dict_get_mul_key(['开始工作时间', '开始 工作 时间'], old_dict)
new_dict['id_number'] = dict_get_mul_key(['身份证', '身 份 证','身份证号码','身份证号'], old_dict)
new_dict['honor'] = dict_get_mul_key(['奖惩情况', '奖惩 情况'], old_dict)
new_dict['work_text'] = dict_get_mul_key(['简历', '简 历'], old_dict)
return new_dict
def format_and_write_excel_file(dir_id, data_list, template_row=5):
logger.info("Start to format and write excel file ")
try:
outpath = pathlib.Path(BASE_PATH).joinpath(dir_id)
output_path = outpath.joinpath((dir_id + '_out.xlsx'))
template_path = Path.cwd().joinpath('template.xlsx')
wb = load_workbook(template_path)
ws = wb.active
pattern = re.compile(r'\{\{(\w+)\}\}')
placeholder_columns = {}
for col in range(1, ws.max_column + 1):
cell = ws.cell(row=template_row, column=col)
if cell.value and isinstance(cell.value, str) and '{{' in cell.value:
matches = pattern.findall(cell.value)
if matches:
placeholder_columns[col] = matches[0]
logger.info(f"找到占位符列: {placeholder_columns}")
# 处理每条数据
for index, data in enumerate(data_list):
target_row = template_row + index
if index > 0:
# insert a new row and copy its formatting
ws.insert_rows(target_row)
for col in range(1, ws.max_column + 1):
source_cell = ws.cell(row=target_row - 1, column=col)
target_cell = ws.cell(row=target_row, column=col)
target_cell.style = source_cell.style
if source_cell.alignment:
# alignment attributes must be copied explicitly
target_cell.alignment = styles.Alignment(
horizontal=source_cell.alignment.horizontal, # horizontal alignment
vertical=source_cell.alignment.vertical, # vertical alignment
text_rotation=source_cell.alignment.text_rotation,
wrap_text=source_cell.alignment.wrap_text,
shrink_to_fit=source_cell.alignment.shrink_to_fit,
indent=source_cell.alignment.indent
)
# copy only the style, not the value
# if source_cell.has_style:
# target_cell.font = copy(source_cell.font)
# target_cell.border = copy(source_cell.border)
# target_cell.fill = copy(source_cell.fill)
# target_cell.number_format = source_cell.number_format
# target_cell.alignment = copy(source_cell.alignment)
# copy the border (a step that is easy to miss)
if hasattr(source_cell, 'border') and source_cell.border:
from openpyxl.styles import Border, Side
# read the source cell's border style
source_border = source_cell.border
# build a new Border object from it
new_border = Border(
left=Side(
style=source_border.left.style,
color=source_border.left.color
) if source_border.left else None,
right=Side(
style=source_border.right.style,
color=source_border.right.color
) if source_border.right else None,
top=Side(
style=source_border.top.style,
color=source_border.top.color
) if source_border.top else None,
bottom=Side(
style=source_border.bottom.style,
color=source_border.bottom.color
) if source_border.bottom else None
)
target_cell.border = new_border
# fill in the data
for col, field in placeholder_columns.items():
cell = ws.cell(row=target_row, column=col)
if field in data and data[field] is not None:
value = data[field]
cell.value = value
else:
# no data; leave the cell blank
cell.value = None
# save the workbook
wb.save(output_path)
except Exception as e:
logger.error(f"format and write excel file failed {e}")
def format_and_write_file(dir_id: str, ctx: dict):
logger.info(f'format_and_write_file dir id is {dir_id}')
user_name = ctx.get('name', str(uuid.uuid4()))
file_path = Path.cwd().joinpath('template.docx')
template = DocxTemplate(file_path)
logger.info(f"ctx is {ctx}")
template.render(ctx)
pathxx = pathlib.Path(BASE_PATH).joinpath(dir_id)
pathxx = pathxx.joinpath((user_name + '.docx'))
logger.info(f"output path is {pathxx}")
template.save(pathxx)
def format_excel_to_words(dir_id: str, dict_data_list: list[dict]):
if not dict_data_list or len(dict_data_list) < 1:
return
logger.info("dict_data_list is {0}".format(dict_data_list))
excel_data_list = [convert_excel_data(data) for data in dict_data_list]
excel_data_list = map_data(excel_data_list)
logger.info(f"excel map data is {excel_data_list}")
format_and_write_excel_file(dir_id, excel_data_list)
session = SqliteSqlalchemy().session
# also write a copy to the database so the Excel files can be aggregated later
try:
save_data = json.dumps(excel_data_list, ensure_ascii=False)
session.execute(update(DBEXCEL).where(DBEXCEL.id == dir_id).values(status=1, file_name=dir_id + '_out.xlsx', excel_info=save_data))
session.commit()
except Exception as e:
session.rollback()
finally:
session.close()
for dict_data in dict_data_list:
new_data = convert_data(dict_data)
print(new_data)
format_and_write_file(dir_id, new_data)
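format_and_write_file fills template.docx placeholders through docxtpl's Jinja-style syntax. A minimal standalone sketch of that render step, with a toy context and output path (the real template and its placeholder names are not shown in this diff):

from docxtpl import DocxTemplate

# assumes a template.docx containing placeholders such as {{ name }}
tpl = DocxTemplate("template.docx")
tpl.render({"name": "张三", "sex": "男", "nation": "汉族"})
tpl.save("output.docx")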

View File

@@ -1,10 +1,16 @@
import os.path
import re
import json
from pathlib import Path
import pypandoc
from docx import Document
from typing import Dict, List, Any, Tuple
from collections import defaultdict
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EnhancedDocxExtractor:
def __init__(self):
@@ -19,19 +25,28 @@ class EnhancedDocxExtractor:
'联系电话': ['联系电话', '电话', '手机', '联系电话:', '手机号'],
'联系地址': ['联系地址', '地址', '联系地址:', '家庭地址'],
'学历学位': ['学历', '学历:', '学 历', '学历\n学位','学位','学位:','学 位'],
'毕业院校': ['毕业院校', '毕业学校', '毕业院校:','毕业院校系及专业'],
'毕业院校系及专业': ['毕业院校', '毕业学校', '毕业院校:'],
'专业': ['专业', '专业:', '系及专业', '所学专业'],
}
def convert_doc_to_docx(self, file_path: str) -> Document:
pass
def extract_with_table_structure(self, docx_path: str) -> Dict[str, Any]:
"""
Extract structured table data from a .docx file
"""
logger.info(f"into extract_with_table_structure")
doc = Document(docx_path)
results = defaultdict(dict)
# analyze each table
for table_idx, table in enumerate(doc.tables):
print(f"\n处理表格 {table_idx + 1} ({len(table.rows)}× {len(table.columns)}列)")
logger.info(f"\n处理表格 {table_idx + 1} ({len(table.rows)}× {len(table.columns)}列)")
# get the table structure
table_structure = self._analyze_table_structure(table)
@@ -83,22 +98,20 @@ class EnhancedDocxExtractor:
key_recode = []
for row_idx, row in enumerate(structure):
for col_idx, cell in enumerate(row):
print(f"visited is {visited} ")
print(f'row {row_idx} col {col_idx} all cell is {cell}')
logger.info(f"visited is {visited} ")
logger.info(f'row {row_idx} col {col_idx} all cell is {cell}')
if (row_idx, col_idx) in visited:
print(f'---{row_idx}, {col_idx} ')
print(f'cell is {cell}')
logger.info(f'---{row_idx}, {col_idx} in visited ')
continue
if cell['is_key']:
next_cell = structure[row_idx][col_idx+1]
# next_cell = structure[row_idx][col_idx+1]
# find the matching value
print(f"cell2 is {cell} row {row_idx} col {col_idx}")
logger.info(f"cell2 is {cell} row {row_idx} col {col_idx}")
value = self._find_value_for_key(table, structure, row_idx, col_idx, visited, kv_pairs)
if value:
key = self._normalize_key(cell['text'])
found = False
kv_pairs = [(k,v+","+value)if k == cell['text'] else (k, v) for k,v in kv_pairs ]
for i, (k,v) in enumerate(kv_pairs):
if k == cell['text']:
kv_pairs[i] = (k,value)
@@ -107,41 +120,75 @@ class EnhancedDocxExtractor:
kv_pairs.append((key, value))
else:
print("不是key")
logger.info("不是key")
return kv_pairs
def _find_value_for_key(self, table, structure, key_row, key_col, visited, kv_pairs) -> str:
"""
Find the value that corresponds to a key
"""
logger.info(f"======================kv==================={kv_pairs}")
# try the cell to the right first
if key_col + 1 < len(structure[key_row]):
value_cell = structure[key_row][key_col + 1]
translation_table = str.maketrans('', '', ' \t\n\r\f\v')
current_key_cell = structure[key_row][key_col]
if not value_cell['text']:
value_cell['text'] = "None"
current_key_text = current_key_cell['text'].translate(translation_table)
if value_cell['is_key']:
return None
# special-case the education fields
spec_coll = ['全日制教育','在职教育']
if current_key_cell['text'].replace('\n','') in spec_coll :
if not value_cell['text']:
value_cell['text'] = ""
else:
value_cell['text'] = ''
if not value_cell['text']:
value_cell['text'] = "None"
spec_cell_meb = ['称谓', '姓名', '年龄', '政治面貌']
if current_key_text == "家庭主要成员及重要社会关系":
logger.info("特殊处理家庭主要成员及重要社会关系")
values = []
old_value = None
for k, v in kv_pairs:
if k == current_key_text:
old_value = v
logger.info(f"old_value is {old_value}")
translation_table = str.maketrans('', '', ' \t\n\r\f\v')
v =value_cell['text'].translate(translation_table)
logger.info(f"当前值为 {str(v)}")
if v not in spec_cell_meb:
for i in range(key_col+1,len(structure[key_row])):
col_value = structure[key_row][i]['text']
logger.info(f"col_value is {col_value}")
if col_value not in values and col_value != 'None':
values.append(col_value)
visited.add((key_row, i))
vs = "_".join(values)
if old_value :
if vs:
value_cell['text'] = old_value + "|" + vs
else:
value_cell['text'] = old_value
else:
value_cell['text'] = "_".join(values)
return value_cell['text']
logger.info(f"in {spec_cell_meb }")
return None
if value_cell['text'] and (key_row, key_col + 1) not in visited:
# check whether this value duplicates the one taken for the previous key (possibly a merged cell)
if not self._is_key_duplicate_merged_cell(structure[key_row][key_col]['text'], kv_pairs):
print("previous value not duplicated")
print(f"visited add {key_row} {key_col + 1}")
logger.info("previous value not duplicated")
logger.info(f"visited add {key_row} {key_col + 1}")
visited.add((key_row, key_col + 1))
return value_cell['text']
else:
current_key = structure[key_row][key_col]['text']
print(f"key值重复------------------------------key {current_key}")
# current_key = structure[key_row][key_col]['text']
logger.info(f"key值重复------------------------------key {current_key_text}")
for key, value in kv_pairs:
if key == current_key:
if key == current_key_text:
# value_arr = value.strip(',')
if value_cell['text'] in value:
return value
return value+","+value_cell['text']
@@ -151,8 +198,8 @@ class EnhancedDocxExtractor:
if value_cell['text'] and (key_row + 1, key_col) not in visited:
# check whether this value duplicates the one taken for the previous key (possibly a merged cell)
if not self._is_key_duplicate_merged_cell(structure[key_row][key_col]['text'], kv_pairs):
print("next value not duplicated")
print(f"visited add {key_row} {key_col + 1}")
logger.info("next value not duplicated")
logger.info(f"visited add {key_row} {key_col + 1}")
visited.add((key_row + 1, key_col))
return value_cell['text']
@@ -165,8 +212,8 @@ class EnhancedDocxExtractor:
if abs(row_idx - key_row) <= 2 and abs(col_idx - key_col) <= 2:
# check whether this value duplicates the one taken for the previous key
if not self._is_key_duplicate_merged_cell(structure[key_row][key_col]['text'], kv_pairs):
print("merged value not duplicated")
print(f"visited add {key_row} {key_col + 1}")
logger.info("merged value not duplicated")
logger.info(f"visited add {key_row} {key_col + 1}")
visited.add((row_idx, col_idx))
return cell['text']
return None
@@ -183,7 +230,8 @@ class EnhancedDocxExtractor:
|
|————————————————————————————————————————————————————
"""
translation_table = str.maketrans('', '', ' \t\n\r\f\v')
text = text.translate(translation_table)
for k, v in kv_pairs:
if text == k:
return True
@@ -198,18 +246,17 @@ class EnhancedDocxExtractor:
def _is_likely_key(self, text: str) -> bool:
"""判断文本是否可能是键"""
translation_table = str.maketrans('', '', ' \t\n\r\f\v')
text = text.translate(translation_table)
if not text or len(text) > 20:
return False
# check whether the text contains common field keywords
key_indicators = ['籍贯','籍 贯','政治面貌','政治\n面貌','姓名','性别','姓 名', '性 别', '出生年月', '民族','民 族', '单位', '部门','联系地址','主要学习经历','全日制教育','在职教育',
'职务','职 务','\n', '职称','职 称', '电话', '地址', '学历', '学位','现任职务','业资格','奖惩情况(近三年主要奖惩信息)'
'专业', '岗位', '经历', '时间', '资格','现任职单位及部门','身份证号','婚姻状况','健康状况','应聘岗位','应聘部门/岗位','毕业院校系及专业']
key_indicators = ['姓名', '性别', '出生年月(岁)', '民族', '籍贯', '出生地', '入党时间', '健康状况', '全日制教育','在职教育', '毕业院校系及专业','称谓',
'年龄','政治面貌','工作单位及职务','参加工作时间', '专业技术职务', '熟悉专业有何专长', '现任职务', '拟任职务', '拟免职务','称取得时间',
'年核度结考果','简历','奖惩情况', '年度考核结果', '任免理由', '家庭主要成员及重要社会关系']
for indicator in key_indicators:
translation_table = str.maketrans('', '', ' \t\n\r\f\v')
indicator = indicator.translate(translation_table)
text = text.translate(translation_table)
if indicator in text:
return True
@@ -219,30 +266,6 @@ class EnhancedDocxExtractor:
if any(indicator in key_part for indicator in key_indicators):
return True
for indicator in key_indicators:
print("indicator is ===============================", indicator)
print("text is ===============================", text)
translation_table = str.maketrans('', '', ' \t\n\r\f\v')
indicator = indicator.translate(translation_table)
text = text.translate(translation_table)
clean_text = self.extract_parentheses_content(text)
print(text)
clean_indicator = self.extract_parentheses_content(indicator)
print(indicator)
if not clean_text:
print("special match failed")
return False
if clean_indicator:
print("start matching =========")
clean_text = clean_text[0]
clean_indicator = clean_indicator[0]
if clean_indicator in clean_text:
print(f"special-case match succeeded ======= {text}")
return True
else:
print("keep matching")
continue
return False
def _is_likely_value(self, text: str) -> bool:
@@ -267,8 +290,9 @@ class EnhancedDocxExtractor:
def _normalize_key(self, key_text: str) -> str:
"""标准化键名"""
# 移除冒号和空格
translation_table = str.maketrans('', '', ' \t\n\r\f\v')
key_text = key_text.translate(translation_table)
key_text = re.sub(r'[:\s]+$', '', key_text)
# map to the standard key name
for std_key, variants in self.field_variants.items():
for variant in variants:
@@ -279,18 +303,16 @@ class EnhancedDocxExtractor:
def _categorize_field(self, key: str) -> str:
"""将字段分类"""
categories = {
'基本信息': ['姓名', '性别', '出生年月', '民族', '政治面貌','学历学位','毕业院校系及专业','全日制教育','在职教育'
'婚姻状况', '健康状况', '籍贯', '身份证号','联系电话','婚姻状况','健康状况','身份证号','联系电话(手机)','毕业院校系及专业','联系地址','主要学习经历','奖惩情况(近三年主要奖惩信息)'],
'工作信息': ['现任职单位及部门', '现任职务', '职称', '职业资格',
'参加工作时间', '职称取得时间','应聘部门/岗位','是否接受调剂职级/岗位','奖惩情况(近三年主要奖惩信息)'],
'基本信息': ['姓名', '性别', '出生年月(岁)', '民族', '籍贯', '出生地', '入党时间', '健康状况', '全日制教育','在职教育', '毕业院校系及专业'],
'工作信息': ['参加工作时间', '专业技术职务', '熟悉专业有何专长', '现任职务', '拟任职务', '拟免职务','职称取得时间', '年核度结考果'],
}
for category, fields in categories.items():
translation_table = str.maketrans('', '', ' \t\n\r\f\v')
key = key.translate(translation_table)
if key in fields:
# print(f"filed is {fields} key is {key} ")
return category
return '其他信息'
@@ -323,50 +345,103 @@ class EnhancedDocxExtractor:
# quick usage example
def quick_extract(docx_path: str):
"""快速提取并显示结果"""
logger.info('into quick_extract')
extractor = EnhancedDocxExtractor()
try:
result = extractor.extract_with_table_structure(docx_path)
print("\n提取结果 (键值对格式):")
print("=" * 60)
logger.info("\n提取结果 (键值对格式):")
logger.info("=" * 60)
for category, fields in result.items():
if fields:
print(f"\n{category}:")
logger.info(f"\n{category}:")
for key, value in fields.items():
print(f" {key}: {value}")
logger.info(f" {key}: {value}")
return result
except Exception as e:
print(f"提取失败: {e}")
logger.info(f"提取失败: {e}")
base_map = ['姓名','性别','籍贯','政治面貌','出生年月','身份证号','现居住地','民族','学历','学位','学历学位','特长','联系电话','联系电话(手机)',
'婚姻状况','健康状况','毕业院校系及专业','主要学习经历','联系地址','入党/团时间','全日制教育','在职教育','奖惩情况(近三年主要奖惩信息)']
work_map = ['参加工作时间','现任职单位及部门','职务','现任职务','职称','奖惩','工作经历','主要工作经历','职称取得时间','职业资格','应聘部门/岗位']
other_map = ['工作经历','主要工作经历','职称取得时间','职业资格','应聘部门/岗位','是否接受调剂职级/岗位']
def is_valid_year_month(year, month):
"""验证年月有效性"""
if len(year) != 4:
return False
try:
month_int = int(month)
return 1 <= month_int <= 12
except ValueError:
return False
def get_year_month(text):
"""
最终版本:覆盖所有情况
直接匹配并提取年月
"""
# 统一正则表达式,一步到位
pattern = r'''
(\d{4}) # year: 4 digits
[-./年] # separator
(\d{1,2}) # month: 1-2 digits
(?: # non-capturing group: the day part (optional)
[-./月] # separator
\d{1,2} # day
(?:[日号]?|[-./]?) # optional trailing 日/号 or separator
)? # the whole day part is optional
(?!\d) # must not be followed by a digit (avoids matching an age)
'''
match = re.search(pattern, text, re.VERBOSE | re.IGNORECASE)
if match:
year, month = match.groups()[:2]
month = month.zfill(2)
if is_valid_year_month(year, month):
return f"{year}-{month}"
return None
base_map = ['姓名','性别','出生年月(岁)','民族','籍贯','出生地','入党时间','健康状况','全日制教育','在职教育','毕业院校系及专业']
work_map = ['参加工作时间','专业技术职务','熟悉专业有何专长','现任职务','拟任职务','拟免职务','职称取得时间','年核度结考果']
other_map = ['简历','奖惩情况','年度考核结果','任免理由','家庭主要成员及重要社会关系']
def fetch_info(data):
map_word = base_map + work_map + other_map
print("data is {0}".format(data))
print("map_word is {0}".format(map_word))
logger.info("data is {0}".format(data))
logger.info("map_word is {0}".format(map_word))
final_res = {}
for key, value in data.items():
translation_table = str.maketrans('', '', ' \t\n\r\f\v')
clean_key = key.translate(translation_table)
print(f"key is {clean_key} ")
logger.info(f"key is {clean_key} ")
if clean_key in map_word:
# clean_value = value.translate(translation_table)
# special handling for family members
if clean_key == "家庭主要成员及重要社会关系":
value_arr = value.split('|')
final_value = []
if value_arr:
for value in value_arr:
v = value.split('_')
# v = ",".join(v)
final_value.append(v)
value = final_value
# special handling for year-month values
if clean_key == "出生年月(岁)":
value = get_year_month(value)
final_res[clean_key] = value
return final_res
def extra_resume(file_path):
logger.info(f"Start to quick extract {file_path}")
result = quick_extract(file_path)
print(result)
logger.info(f"result isv ------------------------------{result}")
base_data = result['基本信息']
work_data = result['工作信息']
other_data = result['其他信息']
@@ -379,8 +454,9 @@ def extra_resume(file_path):
# if __name__ == "__main__":
# logger = logging.getLogger(__name__)
# # usage
# docx_file = "../1.报名登记表.docx" # replace with your own file
# docx_file = "../任免审批表数信中心--谭杰明.docx" # replace with your own file
# print(extra_resume(docx_file))
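A quick check of get_year_month against the formats its regex targets, with illustrative inputs (expected output is 1990-05 for the date strings and None for the age):

from service.parse_resume2_doc import get_year_month

for s in ["1990年5月", "1990.5", "1990-05-12", "1990/5/3日", "24岁"]:
    print(s, "->", get_year_month(s))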

BIN
service/template.xlsx Normal file

Binary file not shown.

BIN
template.docx Normal file

Binary file not shown.

BIN
template.xlsx Normal file

Binary file not shown.