Compare commits

...

26 Commits

Author SHA1 Message Date
yujj128
b383a52bdc Export to Excel 2025-12-12 15:31:14 +08:00
雷雨
5e70e79365 feat: handle doc-to-docx conversion 2025-12-10 15:25:16 +08:00
雷雨
e057917151 feat: update task status 2025-12-10 11:02:20 +08:00
雷雨
50f3ab9438 feat: Dockerfile changes 2025-12-10 10:54:56 +08:00
雷雨
0f7a32270b feat: bug fixes 2025-12-10 10:51:19 +08:00
雷雨
f32104994d feat: add file formatting, download, and compression 2025-12-10 10:48:39 +08:00
yujj128
15d778fdb9 Extract resumes 2025-12-10 10:33:25 +08:00
yujj128
fba18a0cd6 Remove packages 2025-12-08 18:32:20 +08:00
yujj128
5f3c61c18c Merge branch 'dev' of http://106.13.42.156:33077/lei_y601/yj_resume 2025-12-06 18:00:31 +08:00
yujj128
0f666f18c1 Special handling for empty values 2025-12-06 18:00:04 +08:00
雷雨
c00328ed8d feat: make file processing asynchronous 2025-12-06 17:39:21 +08:00
yujj128
16583dbb06 Resume extraction and writing 2025-12-06 17:04:05 +08:00
yujj128
ec0995d08a Merge branch 'dev' of http://106.13.42.156:33077/lei_y601/yj_resume
# Conflicts:
#	main.py
2025-12-06 16:48:51 +08:00
yujj128
a124651a7e Resume extraction and writing 2025-12-06 16:44:53 +08:00
雷雨
a7ddfcde2a feat: add Dockerfile 2025-12-06 16:30:49 +08:00
雷雨
43af924920 feat: add file-download code 2025-12-06 16:23:21 +08:00
yujj128
e9d225939a Merge branch 'dev' of http://106.13.42.156:33077/lei_y601/yj_resume
# Conflicts:
#	requirements.txt
2025-12-06 11:19:38 +08:00
yujj128
ff1c0e890c Resume extraction 2025-12-06 11:19:09 +08:00
雷雨
9fd3376557 feat: add export-to-Excel API 2025-12-06 11:14:45 +08:00
雷雨
8f35513063 feat: add export-to-Excel feature 2025-12-06 10:58:46 +08:00
雷雨
f32aa61c0f feat: update prompt messages 2025-12-06 10:44:06 +08:00
雷雨
4e8995eaed feat: upload files, create records, save files 2025-12-06 10:40:30 +08:00
yujj128
f1063146d2 Merge branch 'dev' of http://106.13.42.156:33077/lei_y601/yj_resume
# Conflicts:
#	requirements.txt
2025-12-05 20:49:00 +08:00
yujj128
992bab2887 Resume extraction 2025-12-05 20:48:27 +08:00
雷雨
eb32528f7e feat: add formatting template 2025-12-05 16:34:01 +08:00
雷雨
dcc6db2363 Restructure project 2025-12-05 11:43:37 +08:00
17 changed files with 1300 additions and 4 deletions

9
Dockerfile Normal file
View File

@@ -0,0 +1,9 @@
FROM docker.m.daocloud.io/python:3.12-slim
WORKDIR /app
COPY . /app
ENV TZ=Asia/Shanghai \
    LANG=C.UTF-8
RUN rm -rf logs .git .idea .venv && apt-get update && apt-get install -y pandoc vim curl sqlite3 && pip install -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple/
RUN mkdir -p /app/uploads /app/zip
EXPOSE 3006
CMD ["python", "main.py"]

0
db/__init__.py Normal file
View File

62
db/sql_db.py Normal file
View File

@@ -0,0 +1,62 @@
from sqlalchemy import Column, DateTime, Integer, BigInteger, String, create_engine, Boolean, Text
from sqlalchemy.orm import declarative_base, sessionmaker
from decouple import config

# declare the declarative base
Base = declarative_base()

# DB_PATH = config('DB_PATH', default='E://pyptoject//yj_resume//main.sqlite3')
DB_PATH = config('DB_PATH', default='D://PycharmProject//yj_resume//main.sqlite3')


class DBTASK(Base):
    __tablename__ = 'db_task'
    id = Column(String(100), primary_key=True)
    name = Column(String(100), nullable=False)
    create_time = Column(DateTime, nullable=False)
    task_type = Column(String(20), nullable=False)
    # 0 = pending, 1 = success, 2 = failed
    status = Column(Integer, nullable=False, default=0)
    success_num = Column(Integer, nullable=False, default=0)
    total_num = Column(Integer, nullable=False, default=0)
    fail_num = Column(Integer, nullable=False, default=0)
    message = Column(Text, nullable=True)


class DBRESUME(Base):
    __tablename__ = 'db_resume'
    id = Column(String(100), primary_key=True)
    # each task corresponds to one directory ID
    task_id = Column(String(100), nullable=False)
    # 0 = pending, 1 = success, 2 = failed
    status = Column(Integer, nullable=False, default=0)
    file_name = Column(String(100), nullable=True)
    # extracted data, stored as JSON
    data_info = Column(Text, nullable=True)
    # error message, etc.
    message = Column(Text, nullable=True)


class DBEXCEL(Base):
    __tablename__ = 'db_excel'
    # each task corresponds to one directory ID
    id = Column(String(100), primary_key=True)
    # 0 = pending, 1 = success, 2 = failed
    status = Column(Integer, nullable=False, default=0)
    file_name = Column(String(100), nullable=True)
    # extracted data, stored as JSON
    excel_info = Column(Text, nullable=True)
    # error message, etc.
    message = Column(Text, nullable=True)


class SqliteSqlalchemy(object):
    def __init__(self):
        # create the SQLite engine
        engine = create_engine(f'sqlite:///{DB_PATH}', echo=True)
        # create tables if they do not already exist
        Base.metadata.create_all(engine, checkfirst=True)
        # create a session bound to the engine
        self.session = sessionmaker(bind=engine)()

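For orientation, a minimal sketch of how these models are typically used; the snippet is illustrative and not part of this change:

# sketch: create a task and read it back, using the models above
from datetime import datetime
import uuid
from db.sql_db import DBTASK, SqliteSqlalchemy

session = SqliteSqlalchemy().session
task = DBTASK(id=str(uuid.uuid4()), name="demo", create_time=datetime.now(),
              task_type='parse', status=0, success_num=0, total_num=0, fail_num=0)
session.add(task)
session.commit()
pending = session.query(DBTASK).filter(DBTASK.status == 0).all()
session.close()

Note that every SqliteSqlalchemy() call builds a fresh engine and session; sharing one engine across the app would be the more common SQLAlchemy pattern.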
62
logging_config.py Normal file
View File

@@ -0,0 +1,62 @@
# logging_config.py
import logging
import logging.config
from pathlib import Path

# ensure the logs directory exists
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)

LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "default": {
            "format": "%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s",
        },
        "detailed": {
            "format": "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s - %(message)s",
        }
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "formatter": "default",
            "stream": "ext://sys.stdout"
        },
        "file": {
            "class": "logging.handlers.RotatingFileHandler",  # rotates automatically
            "level": "INFO",
            "formatter": "detailed",
            "filename": "logs/resume.log",
            "maxBytes": 10485760,  # 10 MB
            "backupCount": 5,  # keep 5 backups
            "encoding": "utf8"
        },
    },
    "root": {
        "level": "INFO",
        "handlers": ["console", "file"]
    },
    "loggers": {
        "uvicorn": {
            "level": "INFO",
            "handlers": ["console", "file"],
            "propagate": False
        },
        "uvicorn.error": {
            "level": "INFO",
            "handlers": ["console", "file"],
            "propagate": False
        },
        "uvicorn.access": {
            "level": "WARNING",  # warnings and above only, to avoid flooding the log
            "handlers": ["file"],  # file only
            "propagate": False
        }
    }
}

# apply the configuration at import time
logging.config.dictConfig(LOGGING_CONFIG)

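Since the config is applied at import time, callers only need to import this module once before creating loggers; a minimal, illustrative sketch:

import logging_config  # noqa: F401  (applies dictConfig as a side effect)
import logging

logger = logging.getLogger(__name__)
logger.info("INFO and above now go to the console and to logs/resume.log")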
84
main.py
View File

@@ -1,14 +1,94 @@
import uvicorn
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import FileResponse
from typing import List
import concurrent.futures
import logging
from service.file_service import download_format_words, check_and_create_directory, upload_and_format_file, \
    upload_and_save_file, fetch_files, fetch_and_format_file
from service import excel_service
from service.db_service import get_task_list
from logging_config import LOGGING_CONFIG  # importing applies the logging config

logger = logging.getLogger(__name__)
app = FastAPI()
executor = concurrent.futures.ThreadPoolExecutor(max_workers=20)


@app.get("/api/v1/hw")
def read_root():
    return {"Hello": "World"}


# upload files and parse them; parsing runs asynchronously
@app.post("/yj_resume/upload_files_and_parse")
async def create_upload_files(files: List[UploadFile] = File(...)):
    dir_id = check_and_create_directory(files, 'parse')
    if not dir_id:
        return {"result": False, "code": 500, "message": "create directory failed"}
    flag, message = await upload_and_save_file(dir_id, files)
    logger.info(f"flag is {flag}")
    if flag:
        # flag, message = await fetch_files(dir_id)
        executor.submit(fetch_files, dir_id)
    return {"result": flag, "message": message, "task_id": dir_id}


@app.get("/yj_resume/export_task_data_to_excel")
def export_task_data_to_excel(task_id: str):
    path_xx = excel_service.export_task_data_to_excel(task_id)
    if not path_xx:
        raise HTTPException(status_code=404, detail="file not found")
    return FileResponse(
        path=path_xx,
        media_type="application/octet-stream",  # generic binary stream
        filename=f"{task_id}.xlsx"  # filename used for the browser download
    )


@app.get("/yj_resume/parse_task_list")
def parse_task_list():
    data = get_task_list('parse')
    return {"data": data, "code": 200}


@app.get("/yj_resume/format_task_list")
def format_task_list():
    data = get_task_list('format')
    return {"data": data, "code": 200}


@app.post("/yj_resume/upload_files_and_format")
async def create_upload_files_for_format(files: List[UploadFile] = File(...)):
    dir_id = check_and_create_directory(files, 'format')
    if not dir_id:
        return {"result": False, "code": 500, "message": "create directory failed"}
    flag, message = await upload_and_format_file(dir_id, files)
    logger.info(f"flag is {flag}")
    if flag:
        # flag, message = await fetch_files(dir_id)
        executor.submit(fetch_and_format_file, dir_id)
    return {"result": flag, "message": message, "task_id": dir_id}


@app.get("/yj_resume/download_format_words")
def download_format_words_api(task_id: str):
    path_xx = download_format_words(task_id)
    if not path_xx:
        raise HTTPException(status_code=404, detail="file not found")
    return FileResponse(
        path=path_xx,
        media_type="application/octet-stream",  # generic binary stream
        filename=f"{task_id}.zip"  # filename used for the browser download
    )


if __name__ == '__main__':
    logger.info("start server")
    uvicorn.run(app, host="0.0.0.0", port=3006)

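A minimal client-side sketch of the parse flow (upload, poll, download). It assumes the `requests` package and a server on localhost:3006; neither is part of this diff:

import requests

base = "http://127.0.0.1:3006"
# 1) upload one or more .doc/.docx files; parsing runs in a background thread
with open("resume1.docx", "rb") as f:
    r = requests.post(f"{base}/yj_resume/upload_files_and_parse",
                      files=[("files", ("resume1.docx", f))])
task_id = r.json()["task_id"]

# 2) poll the task list until this task's status is 1 (success)
tasks = requests.get(f"{base}/yj_resume/parse_task_list").json()["data"]

# 3) download the aggregated spreadsheet
xlsx = requests.get(f"{base}/yj_resume/export_task_data_to_excel",
                    params={"task_id": task_id})
with open(f"{task_id}.xlsx", "wb") as out:
    out.write(xlsx.content)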
requirements.txt
View File

@@ -1,3 +1,13 @@
python-docx
fastapi
uvicorn
docxtpl
SQLAlchemy
python-decouple
python-multipart
pandas
openpyxl
python-multipart
Pillow>=10.0.0
numpy
openpyxl
0
service/__init__.py Normal file
View File

19
service/db_service.py Normal file
View File

@@ -0,0 +1,19 @@
from db.sql_db import DBTASK, DBRESUME, SqliteSqlalchemy


def get_task_list(task_type):
    task_list = SqliteSqlalchemy().session.query(DBTASK).filter(DBTASK.task_type == task_type).order_by(
        DBTASK.create_time.desc()).all()
    result = []
    for task in task_list:
        result.append({
            "id": task.id,
            "name": task.name,
            "success_num": task.success_num,
            "fail_num": task.fail_num,
            "status": task.status,
            "total_num": task.total_num,
            "message": task.message,
            "create_time": task.create_time.strftime("%Y-%m-%d %H:%M:%S") if task.create_time else None,
        })
    return result

35
service/excel_service.py Normal file
View File

@@ -0,0 +1,35 @@
from db.sql_db import DBTASK, DBRESUME, SqliteSqlalchemy
import json
import pandas as pd
import pathlib
from decouple import config

# BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')
BASE_PATH = config('BASE_PATH', default='D://PycharmProject//yj_resume//uploads//')


# export task data to Excel
def export_to_excel(task_id):
    # fetch all successfully parsed records
    list_data = SqliteSqlalchemy().session.query(DBRESUME).filter_by(task_id=task_id, status=1).all()
    pd_data = []
    for data in list_data:
        pd_data.append(json.loads(data.data_info))
    data_frame = pd.DataFrame(pd_data)
    # write to Excel
    pathxx = pathlib.Path(BASE_PATH).joinpath(task_id)
    pathxx = pathxx.joinpath(f"{task_id}.xlsx")
    data_frame.to_excel(pathxx, index=False)


def export_task_data_to_excel(task_id):
    pathxx = pathlib.Path(BASE_PATH).joinpath(task_id)
    pathxx = pathxx.joinpath(f"{task_id}.xlsx")
    if pathxx.exists():
        return pathxx
    session = SqliteSqlalchemy().session
    task = session.query(DBTASK).filter_by(id=task_id).first()
    if not task or task.status == 0 or task.status == 2:
        return None
    export_to_excel(task_id)
    return pathxx

254
service/file_service.py Normal file
View File

@@ -0,0 +1,254 @@
import json
import os
import uuid
import zipfile
import subprocess
import tempfile
import pathlib
import logging
from datetime import datetime
from typing import List

import pandas as pd
from sqlalchemy import update
from fastapi import File, UploadFile
from decouple import config

from db.sql_db import DBTASK, DBRESUME, SqliteSqlalchemy, DBEXCEL
from logging_config import LOGGING_CONFIG  # importing applies the logging config
from service.format_template_resume import format_excel_to_words
from service.parse_resume2_doc import extra_resume

logger = logging.getLogger(__name__)

# BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')
# ZIP_PATH = config('ZIP_PATH', default='E://pyptoject//yj_resume//zip//')
BASE_PATH = config('BASE_PATH', default='D://PycharmProject//yj_resume//uploads//')
ZIP_PATH = config('ZIP_PATH', default='D://PycharmProject//yj_resume//zip//')


def convert_doc_to_docx_secure(input_file, out_put_dir):
    # point the temp-file env vars at an isolated directory for the conversion
    with tempfile.TemporaryDirectory() as tmpdir:
        os.environ['TMP'] = tmpdir
        os.environ['TEMP'] = tmpdir
        # build the LibreOffice headless conversion command
        cmd = [
            'soffice',
            '--headless',
            '--nologo',
            '--nodefault',
            '--norestore',
            '--convert-to', 'docx',
            '--outdir', out_put_dir,
            input_file
        ]
        # run the conversion
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                check=True,
                timeout=30  # timeout so a stuck conversion cannot hang the worker
            )
            return True
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
            # CalledProcessError carries stdout/stderr; TimeoutExpired may not
            logger.error(f"conversion failed:\nSTDOUT: {getattr(e, 'stdout', '')}\nSTDERR: {getattr(e, 'stderr', '')}")
            return False


def check_and_create_directory(files, task_type):
    logger.info("check_and_create_directory in service")
    # create a task record first
    if not files or len(files) == 0:
        logger.warning("check_and_create_directory is empty")
        return None
    id = str(uuid.uuid4())
    current_time = datetime.now()
    # format the timestamp as a string
    formatted_time = current_time.strftime("%Y-%m-%d-%H-%M-%S")
    task = DBTASK(id=id, task_type=task_type, create_time=datetime.now(), status=0, success_num=0, total_num=len(files),
                  fail_num=0, name=f"解析任务({formatted_time})")
    session = SqliteSqlalchemy().session
    try:
        session.add(task)
        session.commit()
    except Exception as e:
        logger.error(f"Failed to save DBTASK info error {e}")
        session.rollback()
        return None
    finally:
        session.close()
    return id


async def upload_and_save_file(dir_id, files: List[UploadFile]) -> (bool, str):
    logger.info(f"upload_and_save_file in service dir_id {dir_id}")
    pathxx = pathlib.Path(BASE_PATH).joinpath(dir_id)
    pathxx.mkdir(parents=True, exist_ok=True)
    data = []
    for file in files:
        name, fix = os.path.splitext(file.filename)
        id = str(uuid.uuid4())
        if fix not in ['.doc', '.docx']:
            continue
        with open(pathxx.joinpath(id + fix), 'wb') as f:
            file_content = await file.read()
            f.write(file_content)
        if fix == '.doc':
            convert_doc_to_docx_secure(str(pathxx.joinpath(id + fix)), str(pathxx))
        data.append(DBRESUME(id=id, task_id=dir_id, status=0, file_name=id + '.docx'))
    session = SqliteSqlalchemy().session
    try:
        session.bulk_save_objects(data)
        session.commit()
    except Exception as e:
        logger.error(f"Failed to save DBRESUME error {e}")
        session.rollback()
        return False, f"Failed to save DBRESUME error {e}"
    finally:
        session.close()
    return True, "success"


def fetch_files(dir_id) -> (bool, str):
    logger.info(f"start fetching files task {dir_id} in service")
    if not os.path.exists(BASE_PATH):
        logger.info(f"directory {BASE_PATH} does not exist")
        return False, f"Failed to fetch file: directory {BASE_PATH} does not exist"
    file_extensions = ['.docx']
    files_list = []
    dir_path = pathlib.Path(BASE_PATH).joinpath(dir_id)
    for root, dirs, files in os.walk(dir_path):
        for file in files:
            _, ext = os.path.splitext(file)
            if file_extensions and ext not in file_extensions:
                logger.error(f"file {file} has an unexpected format")
                continue
            file_path = os.path.join(root, file)
            if os.path.isfile(file_path):
                files_list.append(file_path)
            else:
                logger.error(f"path {file_path} is not a file")
    update_success_mapping = []
    update_fail_mapping = []
    for file in files_list:
        logger.info(f"file is {file} {os.path.basename(file)}")
        file_name = os.path.basename(file)
        id = os.path.splitext(file_name)[0]
        result = extra_resume(file)
        # test for an empty extraction before serializing: json.dumps({}) is a
        # truthy string, so the emptiness check must run on the raw dict
        if not result:
            logger.warning(f"file {file_name} extracted nothing")
            update_fail_mapping.append({'id': id, 'status': 2,
                                        'message': f"task {dir_id} => file {file_name} extracted nothing"})
            continue
        result = json.dumps(result, ensure_ascii=False)
        logger.info(f"result type is {type(result)}")
        logger.info(f"file content is {result}")
        update_success_mapping.append({'id': id, 'status': 1, 'data_info': result})
    session = SqliteSqlalchemy().session
    logger.info(f"update success mapping => {update_success_mapping}")
    logger.info(f"update fail mapping => {update_fail_mapping}")
    success_num = len(update_success_mapping)
    fail_num = len(update_fail_mapping)
    try:
        update_data = update_success_mapping + update_fail_mapping
        session.bulk_update_mappings(DBRESUME, update_data)
        if update_fail_mapping:
            session.bulk_update_mappings(DBTASK, [{'id': dir_id, 'status': 2, 'success_num': success_num,
                                                   'fail_num': fail_num, 'message': f'fail => {update_fail_mapping}'}])
        else:
            session.bulk_update_mappings(DBTASK, [{'id': dir_id, 'status': 1,
                                                   'success_num': success_num, 'fail_num': fail_num}])
        session.commit()
    except Exception as e:
        logger.error(f"update failed => task {dir_id} error {e}")
        session.rollback()
        return False, f"Failed to update DBRESUME error {e}"
    finally:
        session.close()
    return True, 'success'


async def upload_and_format_file(dir_id, files: List[UploadFile]) -> (bool, str):
    logger.info(f"upload_and_format_file in service dir_id {dir_id}")
    pathxx = pathlib.Path(BASE_PATH).joinpath(dir_id)
    pathxx.mkdir(parents=True, exist_ok=True)
    data = []
    for file in files:
        # id = str(uuid.uuid4())
        name, fix = os.path.splitext(file.filename)
        if fix not in ['.xls', '.xlsx']:
            continue
        with open(pathxx.joinpath(dir_id + fix), 'wb') as f:
            file_content = await file.read()
            f.write(file_content)
        data.append(DBEXCEL(id=dir_id, status=0, file_name=dir_id + '.xlsx'))
    session = SqliteSqlalchemy().session
    try:
        session.bulk_save_objects(data)
        session.commit()
    except Exception as e:
        logger.error(f"Failed to save DBEXCEL error {e}")
        session.rollback()
        return False, f"Failed to save DBEXCEL error {e}"
    finally:
        session.close()
    return True, "success"


def zip_file_folder(dir_id):
    pathxx = pathlib.Path(BASE_PATH).joinpath(dir_id)
    output_filename = pathlib.Path(ZIP_PATH).joinpath(dir_id + ".zip")
    with zipfile.ZipFile(output_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
        # walk every file and subfolder under the task directory
        for root, dirs, files in os.walk(pathxx):
            for file in files:
                # build the full file path
                file_path = os.path.join(root, file)
                # add the file to the ZIP using its path relative to the task folder
                zipf.write(file_path, arcname=os.path.relpath(file_path, pathxx))


def fetch_and_format_file(dir_id) -> (bool, str):
    logger.info(f"fetch_and_format_file in service dir_id {dir_id}")
    pathxx = pathlib.Path(BASE_PATH).joinpath(dir_id)
    pathx_1 = pathxx.joinpath(dir_id + ".xlsx")
    if not pathx_1.exists():
        pathx_1 = pathxx.joinpath(dir_id + ".xls")
    data = pd.read_excel(pathx_1)
    data_dict = data.to_dict('records')
    logger.info(data_dict)
    format_excel_to_words(dir_id, data_dict)
    zip_file_folder(dir_id)
    session = SqliteSqlalchemy().session
    try:
        session.execute(update(DBTASK).where(DBTASK.id == dir_id).values(status=1))
        session.commit()
    except Exception as e:
        logger.error(f"update task {dir_id} failed: {e}")
        session.rollback()
    finally:
        session.close()


def download_format_words(task_id):
    pathxx = pathlib.Path(ZIP_PATH).joinpath(task_id + ".zip")
    if not pathxx.exists():
        return None
    return pathxx

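Note that convert_doc_to_docx_secure shells out to LibreOffice's soffice binary, which the Dockerfile above does not install (it installs pandoc). A small startup guard like this sketch would make the dependency explicit; the check itself is an assumption, not part of the diff:

import shutil

# .doc uploads can only be converted if the soffice binary is on PATH
if shutil.which('soffice') is None:
    logger.warning("soffice not found: .doc uploads cannot be converted to .docx on this host")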
303
service/format_template_resume.py Normal file
View File

@@ -0,0 +1,303 @@
import json
import re
import uuid
import pathlib
import logging
from pathlib import Path

from docxtpl import DocxTemplate
from decouple import config
from sqlalchemy import update
from openpyxl import load_workbook, styles
from openpyxl.styles import Border, Side

from db.sql_db import SqliteSqlalchemy, DBEXCEL

logger = logging.getLogger(__name__)

# BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')
BASE_PATH = config('BASE_PATH', default='D://PycharmProject//yj_resume//uploads//')

# sample rendering context for the Word template
context = {
    'name': '张三',
    'sex': '',
    'nation': '汉族',
    'brith': '1990-01-01',
    'address': '北京市海淀区西二旗',
    'education': '本科',
    'degree': '学士',
    # native place
    'origin': '山东',
    'politics': '党员',
    # department
    'department': '数信部',
    'position': '助理开发工程师',
    'phone': '13812345678',
    'title': '后端开发工程师',
    'start_work_time': '2018-01-01',
    # ID number
    'id_number': '500221199001010010101',
    # honors; delegated to the LLM
    'honor': '一等奖',
    # work history
    'work_text': '''
2023.12-2024.10:负责《边缘计算+5G自组网的水电物联网系统建设与研究》项目异常检测算法和项目实施利用5G自组网技术、自建边缘计算单元等实际实现在线异常检测、时间序列趋势分析、模型轻量化等功能缓解通信带宽压力在观音岩、彭水、渝能等场站实施应用。完成项目科技成果凝练、项目报奖等工作。本项目工作获得第六届全国设备管理与技术创新成果一等奖、中电联职工创新成果二等奖。
2024.04-2025.至今:广西河池源网荷储一体化项目/大唐西藏玉曲河扎拉电厂可行性研究报告&方案编写、AI支持中心方案策划
'''
}

excel_mapping = {
    'politics': {'群众': '1', '中共党员': '2', '民主党派': '3', '共青团员': '4'},
    'education': {'小学及以下': '1', '初中': '2', '高中、技校': '3', '中专': '4',
                  '大专': '5', '大学本科': '6', '硕士研究生': '7', '博士研究生': '8'},
    'nation': {'汉族': '1', '蒙古族': '2', '回族': '3', '藏族': '4', '维吾尔族': '5',
               '苗族': '6', '彝族': '7', '壮族': '8', '布依族': '9', '朝鲜族': '10',
               '满族': '11', '侗族': '12', '瑶族': '13', '白族': '14', '土家族': '15',
               '哈尼族': '16', '哈萨克族': '17', '傣族': '18', '黎族': '19', '傈僳族': '20',
               '佤族': '21', '畲族': '22', '高山族': '23', '拉祜族': '24', '水族': '25',
               '东乡族': '26', '纳西族': '27', '景颇族': '28', '柯尔克孜族': '29', '土族': '30',
               '达斡尔族': '31', '仫佬族': '32', '羌族': '33', '布朗族': '34', '撒拉族': '35',
               '毛南族': '36', '仡佬族': '37', '锡伯族': '38', '阿昌族': '39', '普米族': '40',
               '塔吉克族': '41', '怒族': '42', '乌孜别克族': '43', '俄罗斯族': '44', '鄂温克族': '45',
               '德昂族': '46', '保安族': '47', '裕固族': '48', '京族': '49', '塔塔尔族': '50', '独龙族': '51',
               '鄂伦春族': '52', '赫哲族': '53', '门巴族': '54', '珞巴族': '55', '基诺族': '56', '其他': '57'},
    'address': {'拉萨市': '540100', '昌都地区': '542100', '山南地区': '542200', '日喀则地区': '542300',
                '那曲地区': '542400', '阿里地区': '542500', '林芝地区': '542600', '藏外地区': '549999'}}


def convert_excel_data(old_dict: dict) -> dict:
    new_dict = {}
    pre_edu_mapping = {
        '': '小学及以下',
        '小学': '小学及以下',
        '初中': '初中',
        '高中': '高中、技校',
        '高职': '高中、技校',
        '中专': '中专',
        '大专': '大专',
        '本科': '大学本科',
        '硕士': '硕士研究生',
        '博士': '博士研究生',
    }
    pre_addr_mapping = {
        '拉萨': '拉萨市',
        '昌都': '昌都地区',
        '山南': '山南地区',
        '日喀则': '日喀则地区',
        '那曲': '那曲地区',
        '阿里': '阿里地区',
        '林芝': '林芝地区',
    }
    pre_nation_mapping = excel_mapping.get('nation')
    new_dict['name'] = dict_get_mul_key(['姓名', '姓 名'], old_dict)
    new_dict['sex'] = dict_get_mul_key(['性别', '性 别'], old_dict)
    new_dict['origin'] = dict_get_mul_key(['籍贯', '籍 贯'], old_dict)
    new_dict['politics'] = '中共党员' if len(dict_get_mul_key(['入党时间', '入 党 时 间'], old_dict)) > 0 \
        and dict_get_mul_key(['入党时间'], old_dict) not in ('/', '\\', 'None', 'nan', '') else '群众'
    address = dict_get_mul_key(['出 生 地', '出生地'], old_dict)
    nation = dict_get_mul_key(['民族', '民 族'], old_dict)
    new_dict['nation'] = nation
    # normalize education; cast to str first, since pandas can hand back NaN floats
    r1 = str(old_dict.get("全日制教育") or '')
    r2 = str(old_dict.get("在职教育") or '')
    if len(r1) > 0 and r1 not in ('/', '\\', 'None', 'nan', '', 'null'):
        education = r1
    else:
        education = r2
    education = education.lower().replace(' ', '')
    for k, v in pre_edu_mapping.items():
        if k in education:
            new_dict['education'] = v
    addr_found = False
    # normalize the residence
    for k, v in pre_addr_mapping.items():
        if k in address:
            addr_found = True
            new_dict['address'] = v
    if not addr_found:
        new_dict['address'] = '藏外地区'
    # normalize the ethnicity
    for k, v in pre_nation_mapping.items():
        if k in nation or nation in k:
            new_dict['nation'] = k
    return new_dict


def map_data(data_list):
    # map special values to their region/category codes
    for idx, row in enumerate(data_list):
        for k, v in excel_mapping.items():
            value = row.get(k)
            if value:
                if v.get(value, ''):
                    data_list[idx][k] = v.get(value)
    return data_list


def dict_get_mul_key(keys: list, dict_data: dict):
    for k in keys:
        if k in dict_data.keys() and dict_data[k]:
            return dict_data[k]
    return ''


def convert_data(old_dict: dict) -> dict:
    new_dict = {}
    new_dict['name'] = dict_get_mul_key(['姓名', '姓 名'], old_dict)
    new_dict['sex'] = dict_get_mul_key(['性别', '性 别'], old_dict)
    new_dict['nation'] = dict_get_mul_key(['民族', '民 族'], old_dict)
    new_dict['brith'] = dict_get_mul_key(['出生年月', '出生年月(岁)'], old_dict)
    new_dict['origin'] = dict_get_mul_key(['籍贯', '籍 贯'], old_dict)
    new_dict['address'] = dict_get_mul_key(['出 生 地', '出生地'], old_dict)
    new_dict['education'] = dict_get_mul_key(['学历', '学 历'], old_dict)
    new_dict['degree'] = dict_get_mul_key(['学位', '学 位'], old_dict)
    new_dict['politics'] = '党员' if len(dict_get_mul_key(['入党时间', '入 党 时 间'], old_dict)) > 0 \
        and dict_get_mul_key(['入党时间'], old_dict) not in ('/', '\\', 'None', 'nan', '') else '群众'
    new_dict['department'] = dict_get_mul_key(['部门', '部 门'], old_dict)
    new_dict['position'] = dict_get_mul_key(['现任职务', '现 任 职 务'], old_dict)
    new_dict['phone'] = dict_get_mul_key(['手机号', '手 机 号'], old_dict)
    new_dict['title'] = dict_get_mul_key(['专业技术职务', '职 称'], old_dict)
    new_dict['start_work_time'] = dict_get_mul_key(['开始工作时间', '开始 工作 时间'], old_dict)
    new_dict['id_number'] = dict_get_mul_key(['身份证', '身 份 证', '身份证号码', '身份证号'], old_dict)
    new_dict['honor'] = dict_get_mul_key(['奖惩情况', '奖惩 情况'], old_dict)
    new_dict['work_text'] = dict_get_mul_key(['简历', '简 历'], old_dict)
    return new_dict


def format_and_write_excel_file(dir_id, data_list, template_row=5):
    logger.info("Start to format and write excel file")
    try:
        outpath = pathlib.Path(BASE_PATH).joinpath(dir_id)
        output_path = outpath.joinpath(dir_id + '_out.xlsx')
        template_path = Path.cwd().joinpath('template.xlsx')
        wb = load_workbook(template_path)
        ws = wb.active
        pattern = re.compile(r'\{\{(\w+)\}\}')
        placeholder_columns = {}
        for col in range(1, ws.max_column + 1):
            cell = ws.cell(row=template_row, column=col)
            if cell.value and isinstance(cell.value, str) and '{{' in cell.value:
                matches = pattern.findall(cell.value)
                if matches:
                    placeholder_columns[col] = matches[0]
        logger.info(f"placeholder columns found: {placeholder_columns}")
        # process each record
        for index, data in enumerate(data_list):
            target_row = template_row + index
            if index > 0:
                # insert a new row and copy the formatting from the row above
                ws.insert_rows(target_row)
                for col in range(1, ws.max_column + 1):
                    source_cell = ws.cell(row=target_row - 1, column=col)
                    target_cell = ws.cell(row=target_row, column=col)
                    target_cell.style = source_cell.style
                    if source_cell.alignment:
                        # alignment attributes must be copied explicitly
                        target_cell.alignment = styles.Alignment(
                            horizontal=source_cell.alignment.horizontal,
                            vertical=source_cell.alignment.vertical,
                            text_rotation=source_cell.alignment.text_rotation,
                            wrap_text=source_cell.alignment.wrap_text,
                            shrink_to_fit=source_cell.alignment.shrink_to_fit,
                            indent=source_cell.alignment.indent
                        )
                    # copy style only, not the value
                    # if source_cell.has_style:
                    #     target_cell.font = copy(source_cell.font)
                    #     target_cell.border = copy(source_cell.border)
                    #     target_cell.fill = copy(source_cell.fill)
                    #     target_cell.number_format = source_cell.number_format
                    #     target_cell.alignment = copy(source_cell.alignment)
                    # copy the border (easy to miss when cloning row styles)
                    if hasattr(source_cell, 'border') and source_cell.border:
                        source_border = source_cell.border
                        # build a new border object from the source cell's sides
                        new_border = Border(
                            left=Side(style=source_border.left.style,
                                      color=source_border.left.color) if source_border.left else None,
                            right=Side(style=source_border.right.style,
                                       color=source_border.right.color) if source_border.right else None,
                            top=Side(style=source_border.top.style,
                                     color=source_border.top.color) if source_border.top else None,
                            bottom=Side(style=source_border.bottom.style,
                                        color=source_border.bottom.color) if source_border.bottom else None
                        )
                        target_cell.border = new_border
            # fill in the data
            for col, field in placeholder_columns.items():
                cell = ws.cell(row=target_row, column=col)
                if field in data and data[field] is not None:
                    cell.value = data[field]
                else:
                    # no data; leave the cell blank
                    cell.value = None
        # save the workbook
        wb.save(output_path)
    except Exception as e:
        logger.error(f"format and write excel file failed {e}")


def format_and_write_file(dir_id: str, ctx: dict):
    logger.info(f'format_and_write_file dir id is {dir_id}')
    user_name = ctx.get('name', str(uuid.uuid4()))
    file_path = Path.cwd().joinpath('template.docx')
    template = DocxTemplate(file_path)
    logger.info(f"ctx {ctx}")
    template.render(ctx)
    pathxx = pathlib.Path(BASE_PATH).joinpath(dir_id)
    pathxx = pathxx.joinpath(user_name + '.docx')
    template.save(pathxx)


def format_excel_to_words(dir_id: str, dict_data_list: list[dict]):
    if not dict_data_list or len(dict_data_list) < 1:
        return
    logger.info("dict_data_list is {0}".format(dict_data_list))
    excel_data_list = [convert_excel_data(data) for data in dict_data_list]
    excel_data_list = map_data(excel_data_list)
    logger.info(f"excel map data is {excel_data_list}")
    format_and_write_excel_file(dir_id, excel_data_list)
    session = SqliteSqlalchemy().session
    # also persist a copy to the database for later aggregated Excel exports
    try:
        save_data = json.dumps(excel_data_list, ensure_ascii=False)
        session.execute(update(DBEXCEL).where(DBEXCEL.id == dir_id).values(
            status=1, file_name=dir_id + '_out.xlsx', excel_info=save_data))
        session.commit()
    except Exception as e:
        logger.error(f"failed to persist excel info for {dir_id}: {e}")
        session.rollback()
    finally:
        session.close()
    for dict_data in dict_data_list:
        new_data = convert_data(dict_data)
        logger.info(new_data)
        format_and_write_file(dir_id, new_data)

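To see what convert_excel_data plus map_data produce, a worked example with a made-up input row:

# hypothetical row, as pandas would deliver it from the uploaded sheet
row = {'姓名': '张三', '性别': '男', '民族': '汉族', '出生地': '拉萨',
       '入党时间': '2015-07', '全日制教育': '本科', '在职教育': ''}
converted = convert_excel_data(row)
# -> politics '中共党员' (入党时间 present), education '大学本科',
#    address '拉萨市' (matched the 拉萨 prefix), nation '汉族'
mapped = map_data([converted])
# map_data then swaps labels for codes via excel_mapping:
# politics -> '2', education -> '6', nation -> '1', address -> '540100'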
462
service/parse_resume2_doc.py Normal file
View File

@@ -0,0 +1,462 @@
import os.path
import re
import json
from pathlib import Path
import pypandoc
from docx import Document
from typing import Dict, List, Any, Tuple
from collections import defaultdict
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class EnhancedDocxExtractor:
    def __init__(self):
        # common variants of each field name
        self.field_variants = {
            '姓名': ['姓名', '姓 名', '姓 名', '姓名:', '姓 名:', '姓 名'],
            '性别': ['性别', '性 别', '性 别', '性别:', '性 别:', '性 别'],
            '出生年月': ['出生年月', '出生年月:', '出生日期', '出生日期:'],
            '民族': ['民族', '民族:', '民 族'],
            '政治面貌': ['政治面貌', '政治面貌:', '政治面貌:'],
            '现任职单位及部门': ['现任职单位及部门', '单位及部门', '工作单位', '现任职单位'],
            '联系电话': ['联系电话', '电话', '手机', '联系电话:', '手机号'],
            '联系地址': ['联系地址', '地址', '联系地址:', '家庭地址'],
            '学历学位': ['学历', '学历:', '学 历', '学历\n学位', '学位', '学位:', '学 位'],
            '毕业院校系及专业': ['毕业院校', '毕业学校', '毕业院校:'],
            '专业': ['专业', '专业:', '系及专业', '所学专业'],
        }

    def convert_doc_to_docx(self, file_path: str) -> Document:
        pass

    def extract_with_table_structure(self, docx_path: str) -> Dict[str, Any]:
        """
        Extract structured table data from a .docx file.
        """
        logger.info("into extract_with_table_structure")
        doc = Document(docx_path)
        results = defaultdict(dict)
        # analyze each table
        for table_idx, table in enumerate(doc.tables):
            logger.info(f"\nProcessing table {table_idx + 1} ({len(table.rows)} rows × {len(table.columns)} columns)")
            # get the table structure
            table_structure = self._analyze_table_structure(table)
            # extract key-value pairs
            kv_pairs = self._extract_from_table_structure(table, table_structure)
            # store by category
            for key, value in kv_pairs:
                category = self._categorize_field(key)
                results[category][key] = value
        # extract information from paragraphs
        paragraph_info = self._extract_from_paragraphs(doc.paragraphs)
        for key, value in paragraph_info:
            category = self._categorize_field(key)
            results[category][key] = value
        return dict(results)

    def _analyze_table_structure(self, table) -> List[List[Dict]]:
        """
        Analyze the table structure and return metadata for every cell.
        """
        structure = []
        for row_idx, row in enumerate(table.rows):
            row_info = []
            for col_idx, cell in enumerate(row.cells):
                cell_text = cell.text.strip()
                # analyze the cell's properties
                cell_info = {
                    'text': cell_text,
                    'row': row_idx,
                    'col': col_idx,
                    'rowspan': 1,
                    'colspan': 1,
                    'is_key': self._is_likely_key(cell_text),
                    'is_value': self._is_likely_value(cell_text),
                }
                row_info.append(cell_info)
            structure.append(row_info)
        return structure

    def _extract_from_table_structure(self, table, structure) -> List[Tuple[str, str]]:
        """
        Extract key-value pairs from the table structure.
        """
        kv_pairs = []
        visited = set()
        for row_idx, row in enumerate(structure):
            for col_idx, cell in enumerate(row):
                logger.info(f"visited is {visited}")
                logger.info(f'row {row_idx} col {col_idx} all cell is {cell}')
                if (row_idx, col_idx) in visited:
                    logger.info(f'---{row_idx}, {col_idx} in visited')
                    continue
                if cell['is_key']:
                    # next_cell = structure[row_idx][col_idx+1]
                    # look up the value that belongs to this key
                    logger.info(f"cell2 is {cell} row {row_idx} col {col_idx}")
                    value = self._find_value_for_key(table, structure, row_idx, col_idx, visited, kv_pairs)
                    if value:
                        key = self._normalize_key(cell['text'])
                        found = False
                        for i, (k, v) in enumerate(kv_pairs):
                            if k == cell['text']:
                                kv_pairs[i] = (k, value)
                                found = True
                        if not found:
                            kv_pairs.append((key, value))
                else:
                    logger.info("not a key")
        return kv_pairs

    def _find_value_for_key(self, table, structure, key_row, key_col, visited, kv_pairs) -> str:
        """
        Find the value that belongs to a key cell.
        """
        logger.info(f"====================== kv =================== {kv_pairs}")
        # try the cell to the right
        if key_col + 1 < len(structure[key_row]):
            value_cell = structure[key_row][key_col + 1]
            translation_table = str.maketrans('', '', ' \t\n\r\f\v')
            current_key_cell = structure[key_row][key_col]
            if not value_cell['text']:
                value_cell['text'] = "None"
            current_key_text = current_key_cell['text'].translate(translation_table)
            if value_cell['is_key']:
                return None
            # special case: 家庭主要成员及重要社会关系 spans several columns per row
            spec_cell_meb = ['称谓', '姓名', '年龄', '政治面貌']
            if current_key_text == "家庭主要成员及重要社会关系":
                logger.info("special-casing 家庭主要成员及重要社会关系")
                values = []
                old_value = None
                for k, v in kv_pairs:
                    if k == current_key_text:
                        old_value = v
                logger.info(f"old_value is {old_value}")
                v = value_cell['text'].translate(translation_table)
                logger.info(f"current value is {str(v)}")
                if v not in spec_cell_meb:
                    for i in range(key_col + 1, len(structure[key_row])):
                        col_value = structure[key_row][i]['text']
                        logger.info(f"col_value is {col_value}")
                        if col_value not in values and col_value != 'None':
                            values.append(col_value)
                            visited.add((key_row, i))
                    vs = "_".join(values)
                    if old_value:
                        if vs:
                            value_cell['text'] = old_value + "|" + vs
                        else:
                            value_cell['text'] = old_value
                    else:
                        value_cell['text'] = "_".join(values)
                    return value_cell['text']
                logger.info(f"in {spec_cell_meb}")
                return None
            if value_cell['text'] and (key_row, key_col + 1) not in visited:
                # check whether this key already captured a value (possibly a merged cell)
                if not self._is_key_duplicate_merged_cell(structure[key_row][key_col]['text'], kv_pairs):
                    logger.info("right-hand value is not a duplicate")
                    logger.info(f"visited add {key_row} {key_col + 1}")
                    visited.add((key_row, key_col + 1))
                    return value_cell['text']
                else:
                    # current_key = structure[key_row][key_col]['text']
                    logger.info(f"duplicate key ------------------------------ key {current_key_text}")
                    for key, value in kv_pairs:
                        if key == current_key_text:
                            # value_arr = value.strip(',')
                            if value_cell['text'] in value:
                                return value
                            return value + "," + value_cell['text']
        # try the cell below
        if key_row + 1 < len(structure):
            value_cell = structure[key_row + 1][key_col]
            if value_cell['text'] and (key_row + 1, key_col) not in visited:
                # check whether this key already captured a value (possibly a merged cell)
                if not self._is_key_duplicate_merged_cell(structure[key_row][key_col]['text'], kv_pairs):
                    logger.info("below value is not a duplicate")
                    logger.info(f"visited add {key_row + 1} {key_col}")
                    visited.add((key_row + 1, key_col))
                    return value_cell['text']
        # try merged-cell layouts: scan nearby unvisited cells
        for row_idx in range(len(structure)):
            for col_idx in range(len(structure[row_idx])):
                cell = structure[row_idx][col_idx]
                if (row_idx, col_idx) not in visited and cell['text']:
                    # check whether it is near the key
                    if abs(row_idx - key_row) <= 2 and abs(col_idx - key_col) <= 2:
                        # check whether this key already captured a value
                        if not self._is_key_duplicate_merged_cell(structure[key_row][key_col]['text'], kv_pairs):
                            logger.info("merged-cell value is not a duplicate")
                            logger.info(f"visited add {row_idx} {col_idx}")
                            visited.add((row_idx, col_idx))
                            return cell['text']
        return None

    def _is_key_duplicate_merged_cell(self, text, kv_pairs) -> bool:
        """
        Check whether this cell's text duplicates a key already collected in kv_pairs.
        Example: rows 1 and 2 both carry 毕业院校 in column 0 (a merged cell).
        First pass: row 1 col 0 pairs with row 1 col 2 as key:value.
        Second pass reaches row 2 col 0, sees 毕业院校 already in kv_pairs,
        and does not pair row 2 col 0 with row 2 col 1 again.
                     | 硕士学位/研究生学历:中国科学院计算技术研究所计算机技术专业
        毕业院校      |——————————————————————————————————————————————————
                     |
                     |————————————————————————————————————————————————————
        """
        translation_table = str.maketrans('', '', ' \t\n\r\f\v')
        text = text.translate(translation_table)
        for k, v in kv_pairs:
            if text == k:
                return True
        return False

    def extract_parentheses_content(self, text):
        # extract everything inside parentheses (full-width or ASCII)
        matches = re.findall(r'[（(]([^）)]*)[）)]', text)
        return matches  # a list; there may be several parenthesized groups

    def _is_likely_key(self, text: str) -> bool:
        """Decide whether the text is likely a key."""
        translation_table = str.maketrans('', '', ' \t\n\r\f\v')
        text = text.translate(translation_table)
        if not text or len(text) > 20:
            return False
        # check for common field words
        key_indicators = ['姓名', '性别', '出生年月(岁)', '民族', '籍贯', '出生地', '入党时间', '健康状况', '全日制教育', '在职教育', '毕业院校系及专业', '称谓',
                          '年龄', '政治面貌', '工作单位及职务', '参加工作时间', '专业技术职务', '熟悉专业有何专长', '现任职务', '拟任职务', '拟免职务', '职称取得时间',
                          '年核度结考果', '简历', '奖惩情况', '年度考核结果', '任免理由', '家庭主要成员及重要社会关系']
        for indicator in key_indicators:
            if indicator in text:
                return True
        # check for a colon (full-width colons are common in Chinese documents)
        if '：' in text or ':' in text:
            key_part = text.split('：')[0].split(':')[0]
            if any(indicator in key_part for indicator in key_indicators):
                return True
        return False

    def _is_likely_value(self, text: str) -> bool:
        """Decide whether the text is likely a value."""
        if not text:
            return False
        # a value is usually not a known field name
        if self._is_likely_key(text):
            return False
        # values often match specific patterns
        if re.match(r'^\d{11}$', text):  # mobile number
            return True
        if re.match(r'^\d{4}', text):  # date
            return True
        if len(text) > 10:  # long text is probably a value
            return True
        return True

    def _normalize_key(self, key_text: str) -> str:
        """Normalize a key name."""
        # strip whitespace, then any trailing colons
        translation_table = str.maketrans('', '', ' \t\n\r\f\v')
        key_text = key_text.translate(translation_table)
        key_text = re.sub(r'[:：\s]+$', '', key_text)
        # map to the canonical key name
        for std_key, variants in self.field_variants.items():
            for variant in variants:
                if variant == key_text or key_text in variant:
                    return std_key
        return key_text

    def _categorize_field(self, key: str) -> str:
        """Assign a field to a category."""
        categories = {
            '基本信息': ['姓名', '性别', '出生年月(岁)', '民族', '籍贯', '出生地', '入党时间', '健康状况', '全日制教育', '在职教育', '毕业院校系及专业'],
            '工作信息': ['参加工作时间', '专业技术职务', '熟悉专业有何专长', '现任职务', '拟任职务', '拟免职务', '职称取得时间', '年核度结考果'],
        }
        translation_table = str.maketrans('', '', ' \t\n\r\f\v')
        key = key.translate(translation_table)
        for category, fields in categories.items():
            if key in fields:
                return category
        return '其他信息'

    def _extract_from_paragraphs(self, paragraphs) -> List[Tuple[str, str]]:
        """Extract information from plain paragraphs."""
        kv_pairs = []
        for para in paragraphs:
            text = para.text.strip()
            if not text:
                continue
            # try to split colon-separated key-value pairs
            if '：' in text or ':' in text:
                separator = '：' if '：' in text else ':'
                parts = text.split(separator, 1)
                if len(parts) == 2:
                    key = parts[0].strip()
                    value = parts[1].strip()
                    if self._is_likely_key(key) and value:
                        normalized_key = self._normalize_key(key)
                        kv_pairs.append((normalized_key, value))
        return kv_pairs


# quick usage example
def quick_extract(docx_path: str):
    """Run the extractor and log the results."""
    logger.info('into quick_extract')
    extractor = EnhancedDocxExtractor()
    try:
        result = extractor.extract_with_table_structure(docx_path)
        logger.info("\nExtraction result (key-value pairs):")
        logger.info("=" * 60)
        for category, fields in result.items():
            if fields:
                logger.info(f"\n{category}:")
                for key, value in fields.items():
                    logger.info(f"  {key}: {value}")
        return result
    except Exception as e:
        logger.error(f"extraction failed: {e}")


def is_valid_year_month(year, month):
    """Validate a year-month pair."""
    if len(year) != 4:
        return False
    try:
        month_int = int(month)
        return 1 <= month_int <= 12
    except ValueError:
        return False


def get_year_month(text):
    """
    Match and extract a year-month directly; covers all the observed formats.
    """
    # one unified regex does the whole job
    pattern = r'''
        (\d{4})                 # year: 4 digits
        [-./年]                 # separator
        (\d{1,2})               # month: 1-2 digits
        (?:                     # optional, non-capturing: the day part
            [-./月]             # separator
            \d{1,2}             # day
            (?:[日号]?|[-./]?)  # optional 日/号 suffix or trailing separator
        )?                      # the whole day part is optional
        (?!\d)                  # must not be followed by a digit (avoids matching an age)
    '''
    match = re.search(pattern, text, re.VERBOSE | re.IGNORECASE)
    if match:
        year, month = match.groups()[:2]
        month = month.zfill(2)
        if is_valid_year_month(year, month):
            return f"{year}-{month}"
    return None


base_map = ['姓名', '性别', '出生年月(岁)', '民族', '籍贯', '出生地', '入党时间', '健康状况', '全日制教育', '在职教育', '毕业院校系及专业']
work_map = ['参加工作时间', '专业技术职务', '熟悉专业有何专长', '现任职务', '拟任职务', '拟免职务', '职称取得时间', '年核度结考果']
other_map = ['简历', '奖惩情况', '年度考核结果', '任免理由', '家庭主要成员及重要社会关系']


def fetch_info(data):
    map_word = base_map + work_map + other_map
    logger.info("data is {0}".format(data))
    logger.info("map_word is {0}".format(map_word))
    final_res = {}
    for key, value in data.items():
        translation_table = str.maketrans('', '', ' \t\n\r\f\v')
        clean_key = key.translate(translation_table)
        logger.info(f"key is {clean_key}")
        if clean_key in map_word:
            # clean_value = value.translate(translation_table)
            # special handling for family members and key social relations
            if clean_key == "家庭主要成员及重要社会关系":
                value_arr = value.split('|')
                final_value = []
                if value_arr:
                    for value in value_arr:
                        v = value.split('_')
                        # v = ",".join(v)
                        final_value.append(v)
                value = final_value
            # special handling for year-month
            if clean_key == "出生年月(岁)":
                value = get_year_month(value)
            final_res[clean_key] = value
    return final_res


def extra_resume(file_path):
    logger.info(f"Start to quick extract {file_path}")
    # quick_extract may return None on failure, hence the fallback and .get defaults
    result = quick_extract(file_path) or {}
    logger.info(f"result is ------------------------------{result}")
    base_data = result.get('基本信息', {})
    work_data = result.get('工作信息', {})
    other_data = result.get('其他信息', {})
    data = {}
    data.update(base_data)
    data.update(work_data)
    data.update(other_data)
    res = fetch_info(data)
    return res


# if __name__ == "__main__":
#     logger = logging.getLogger(__name__)
#     # usage
#     docx_file = "../任免审批表数信中心--谭杰明.docx"  # replace with your file
#     print(extra_resume(docx_file))

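For a sense of what get_year_month normalizes, a few illustrative calls with expected results in comments:

print(get_year_month("1990年01月01日"))  # -> "1990-01"
print(get_year_month("1985.3"))          # -> "1985-03"
print(get_year_month("2003-11-05"))      # -> "2003-11"
print(get_year_month("no date here"))    # -> None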
BIN
service/template.docx Normal file

Binary file not shown.

BIN
service/template.xlsx Normal file

Binary file not shown.

BIN
template.docx Normal file

Binary file not shown.

BIN
template.xlsx Normal file

Binary file not shown.