Compare commits
8 Commits

| SHA1 |
|---|
| b383a52bdc |
| 5e70e79365 |
| e057917151 |
| 50f3ab9438 |
| 0f7a32270b |
| f32104994d |
| 15d778fdb9 |
| fba18a0cd6 |
Dockerfile

@@ -3,7 +3,7 @@ WORKDIR /app
 COPY . /app
 ENV TZ=Asia/Shanghai \
     LANG=C.UTF-8
-RUN rm -rf logs .git .idea .venv && apt-get update && apt-get install -y vim curl sqlite3 && pip install -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple/
-RUN mkdir -p /app/uploads
+RUN rm -rf logs .git .idea .venv && apt-get update && apt-get install -y pandoc vim curl sqlite3 && pip install -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple/
+RUN mkdir -p /app/uploads && mkdir -p /app/zip
 EXPOSE 3006
 CMD ["python", "main.py"]
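Note: the functional changes here are installing pandoc (consumed from Python through pypandoc, imported below in the service modules) and preparing the /app/zip output directory. A quick sanity check that the binary is visible to pypandoc, assuming the package is installed (not part of the diff):

    import pypandoc
    # Raises OSError if no pandoc binary can be located in the image.
    print(pypandoc.get_pandoc_version())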
db/sql_db.py (18 changes)

@@ -4,14 +4,15 @@ from sqlalchemy.orm import declarative_base, sessionmaker
 Base = declarative_base()
 from decouple import config

-DB_PATH = config('DB_PATH', default='E://pyptoject//yj_resume//main.sqlite3')
+# DB_PATH = config('DB_PATH', default='E://pyptoject//yj_resume//main.sqlite3')
+DB_PATH = config('DB_PATH', default='D://PycharmProject//yj_resume//main.sqlite3')


 class DBTASK(Base):
     __tablename__ = 'db_task'
     id = Column(String(100), primary_key=True)
     name = Column(String(100), nullable=False)
     create_time = Column(DateTime, nullable=False, )
+    task_type = Column(String(20), nullable=False, )
     # 0 = pending, 1 = success, 2 = failed
     status = Column(Integer, nullable=False, default=0)
     success_num = Column(Integer, nullable=False, default=0)

@@ -33,6 +34,19 @@ class DBRESUME(Base):
     # error message, etc.
     message = Column(Text, nullable=True)


+class DBEXCEL(Base):
+    __tablename__ = 'db_excel'
+    # each task maps to one directory ID
+    id = Column(String(100), primary_key=True)
+    # 0 = pending, 1 = success, 2 = failed
+    status = Column(Integer, nullable=False, default=0)
+    file_name = Column(String(100), nullable=True)
+    # the extracted data, stored as JSON
+    excel_info = Column(Text, nullable=True)
+    # error message, etc.
+    message = Column(Text, nullable=True)


 class SqliteSqlalchemy(object):
     def __init__(self):
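Note: db/sql_db.py now carries a task_type column on DBTASK plus the new DBEXCEL table, so parse and format jobs can share one task list. A minimal sketch of how the column partitions tasks (count_tasks is a hypothetical helper, not in the diff):

    from db.sql_db import DBTASK, SqliteSqlalchemy

    def count_tasks(task_type: str) -> int:
        # Count tasks of one type ('parse' or 'format').
        session = SqliteSqlalchemy().session
        try:
            return session.query(DBTASK).filter(DBTASK.task_type == task_type).count()
        finally:
            session.close()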
main.py (52 changes)

@@ -2,40 +2,45 @@ from fastapi import FastAPI
 import uvicorn
 from fastapi import FastAPI, File, UploadFile, HTTPException
 from typing import List
-from service.file_service import check_and_create_directory, upload_and_save_file, fetch_files
+from service.file_service import download_format_words, check_and_create_directory, upload_and_format_file, \
+    upload_and_save_file, fetch_files, fetch_and_format_file
 from service import excel_service
 from service.db_service import get_task_list
 from fastapi.responses import FileResponse
 import threading
 from logging_config import LOGGING_CONFIG
 import logging
 import pandas as pd

 logger = logging.getLogger(__name__)

 app = FastAPI()

 import concurrent.futures
-executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
+executor = concurrent.futures.ThreadPoolExecutor(max_workers=20)


 @app.get("/api/v1/hw")
 def read_root():
     return {"Hello": "World"}


 # upload files and parse them; parsing runs asynchronously
-@app.post("/upload_files_and_parse")
+@app.post("/yj_resume/upload_files_and_parse")
 async def create_upload_files(files: List[UploadFile] = File(...)):
-    dir_id = check_and_create_directory(files)
+    dir_id = check_and_create_directory(files, 'parse')
     if not dir_id:
         return {"result": False, "code": 500, "message": "create directory failed"}
     flag, message = await upload_and_save_file(dir_id, files)
     logger.info(f"flag is {flag}")
     if flag:
-        #flag, message = await fetch_files(dir_id)
+        # flag, message = await fetch_files(dir_id)
         executor.submit(fetch_files, dir_id)
     return {"result": flag, "message": message, "task_id": dir_id}


-@app.get("/export_task_data_to_excel")
+@app.get("/yj_resume/export_task_data_to_excel")
 def export_task_data_to_excel(task_id: str):
     path_xx = excel_service.export_task_data_to_excel(task_id)
     if not path_xx:

@@ -47,12 +52,43 @@ def export_task_data_to_excel(task_id: str):
     )


-@app.get("/parse_task_list")
+@app.get("/yj_resume/parse_task_list")
 def parse_task_list():
-    data = get_task_list()
+    data = get_task_list('parse')
     return {"data": data, "code": 200, }


+@app.get("/yj_resume/format_task_list")
+def format_task_list():
+    data = get_task_list('format')
+    return {"data": data, "code": 200, }
+
+
+@app.post("/yj_resume/upload_files_and_format")
+async def create_upload_files(files: List[UploadFile] = File(...)):
+    dir_id = check_and_create_directory(files, 'format')
+    if not dir_id:
+        return {"result": False, "code": 500, "message": "create directory failed"}
+    flag, message = await upload_and_format_file(dir_id, files)
+    logger.info(f"flag is {flag}")
+    if flag:
+        # flag, message = await fetch_files(dir_id)
+        executor.submit(fetch_and_format_file, dir_id)
+    return {"result": flag, "message": message, "task_id": dir_id}
+
+
+@app.get("/yj_resume/download_format_words")
+def export_task_data_to_excel(task_id: str):
+    path_xx = download_format_words(task_id)
+    if not path_xx:
+        raise HTTPException(status_code=404, detail="file not found")
+    return FileResponse(
+        path=path_xx,
+        media_type="application/octet-stream",  # generic binary stream
+        filename=f"{task_id}.zip"  # filename the browser uses when downloading
+    )


 if __name__ == '__main__':
     logger.info("start server")
     uvicorn.run(app, host="0.0.0.0", port=3006)
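Note: executor.submit is fire-and-forget in both upload endpoints; if fetch_files or fetch_and_format_file raises, the exception sits unread on the discarded Future. A hedged sketch of one way to surface such failures (submit_logged is hypothetical, not in the diff):

    def submit_logged(fn, *args):
        # Wrap executor.submit so background exceptions reach the log.
        future = executor.submit(fn, *args)
        future.add_done_callback(
            lambda f: logger.error(f"background task failed: {f.exception()}")
            if f.exception() else None
        )
        return future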
requirements.txt

@@ -7,12 +7,7 @@ python-decouple
 python-multipart
 pandas
 openpyxl
-
-python-multipart
-PyMuPDF>=1.23.0
-paddlepaddle>=2.5.0
-paddleocr>=2.7.0.3
-opencv-python>=4.8.0
-numpy>=1.24.0
-pdf2image>=1.16.3
-Pillow>=10.0.0
+Pillow>=10.0.0
+numpy
+openpyxl
service/db_service.py

@@ -1,9 +1,10 @@
 from db.sql_db import DBTASK, DBRESUME, SqliteSqlalchemy


-def get_task_list():
-    task_list = SqliteSqlalchemy().session.query(DBTASK).order_by(DBTASK.create_time.desc()).all()
-    result=[]
+def get_task_list(task_type):
+    task_list = SqliteSqlalchemy().session.query(DBTASK).filter(DBTASK.task_type == task_type).order_by(
+        DBTASK.create_time.desc()).all()
+    result = []
     for task in task_list:
         result.append({
             "id": task.id,
service/excel_service.py

@@ -4,8 +4,8 @@ import pandas as pd
 import pathlib
 from decouple import config

-BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')
-#BASE_PATH = config('BASE_PATH', default='D://PycharmProject//yj_resume//uploads//')
+# BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')
+BASE_PATH = config('BASE_PATH', default='D://PycharmProject//yj_resume//uploads//')


 # export data to excel
service/file_service.py

@@ -1,8 +1,6 @@
 import json

-from pymupdf import message
-
-from db.sql_db import DBTASK, DBRESUME, SqliteSqlalchemy
 from sqlalchemy import update
+from db.sql_db import DBTASK, DBRESUME, SqliteSqlalchemy, DBEXCEL
 import uuid
 from datetime import datetime
 from decouple import config
@@ -12,16 +10,66 @@ from typing import List
 import os
 import asyncio
 import logging

 from logging_config import LOGGING_CONFIG
+from service.format_template_resume import format_excel_to_words
 from service.parse_resume2_doc import extra_resume
 import pypandoc


 logger = logging.getLogger(__name__)
-#BASE_PATH = config('BASE_PATH', default='D://PycharmProject//yj_resume//uploads//')
-BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')
+# BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')
+# ZIP_PATH = config('ZIP_PATh', default='E://pyptoject//yj_resume//zip//')
+#
+BASE_PATH = config('BASE_PATH', default='D://PycharmProject//yj_resume//uploads//')
+ZIP_PATH = config('ZIP_PATh', default='D://PycharmProject//yj_resume//zip//')
+import pandas as pd
+import zipfile
+import os
+import subprocess
+import msvcrt
+import tempfile


-def check_and_create_directory(files):
+def convert_doc_to_docx_secure(input_file, out_put_dir):
+    # environment setup
+    with tempfile.TemporaryDirectory() as tmpdir:
+        os.environ['TMP'] = tmpdir
+        os.environ['TEMP'] = tmpdir
+
+        # build the command
+        cmd = [
+            'soffice',
+            '--headless',
+            '--nologo',
+            '--nodefault',
+            '--norestore',
+            '--convert-to', 'docx',
+            '--outdir', out_put_dir,
+            input_file
+        ]
+
+        # run the conversion
+        try:
+            result = subprocess.run(
+                cmd,
+                capture_output=True,
+                text=True,
+                check=True,
+                timeout=30  # timeout so the call cannot hang forever
+            )
+            return True
+        except subprocess.CalledProcessError as e:
+            print(f"深度错误信息:\nSTDOUT: {e.stdout}\nSTDERR: {e.stderr}")
+            return False
+
+
+def check_and_create_directory(files, task_type):
     logger.info("check_and_create_directory in service")
     # create a task first
     if not files or len(files) == 0:
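Note: soffice can exit 0 yet write nothing (for example when another LibreOffice instance holds the profile lock), so the return code checked by convert_doc_to_docx_secure alone may not prove a .docx exists; also, import msvcrt is Windows-only and will fail inside the Linux image this Dockerfile builds. A sketch of checking the output file instead (converted_ok is hypothetical):

    import os

    def converted_ok(input_file: str, out_put_dir: str) -> bool:
        # soffice names the output after the input file's basename.
        base = os.path.splitext(os.path.basename(input_file))[0]
        return os.path.isfile(os.path.join(out_put_dir, base + '.docx'))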
@@ -31,8 +79,8 @@ def check_and_create_directory(files):
     current_time = datetime.now()
     # format the time as a string
     formatted_time = current_time.strftime("%Y-%m-%d-%H-%M-%S")
-    task = DBTASK(id=id, create_time=datetime.now(), status=0, success_num=0, total_num=len(files),
-                  fail_num=0,name=f"解析任务({formatted_time})")
+    task = DBTASK(id=id, task_type=task_type, create_time=datetime.now(), status=0, success_num=0, total_num=len(files),
+                  fail_num=0, name=f"解析任务({formatted_time})")

     session = SqliteSqlalchemy().session
     try:
@@ -57,11 +105,13 @@ async def upload_and_save_file(dir_id, files: List[UploadFile]) -> (bool, str):
         id = str(uuid.uuid4())
         if fix not in ['.doc', '.docx']:
             continue

         with open(pathxx.joinpath(id + fix), 'wb') as f:
             file_content = await file.read()
             f.write(file_content)

-        data.append(DBRESUME(id=id, task_id=dir_id, status=0, file_name=id + fix))
+        if fix == '.doc':
+            convert_doc_to_docx_secure(str(pathxx.joinpath(id + fix)), str(pathxx))
+        data.append(DBRESUME(id=id, task_id=dir_id, status=0, file_name=id + '.docx'))
     session = SqliteSqlalchemy().session
     try:
         session.bulk_save_objects(data)
@@ -74,22 +124,22 @@ async def upload_and_save_file(dir_id, files: List[UploadFile]) -> (bool, str):
         session.close()
     return True, "success"

-def fetch_files(dir_id) -> (bool, str):
+
+def fetch_files(dir_id) -> (bool, str):
     logger.info(f"start fetching files task {dir_id} in service")
     if not os.path.exists(BASE_PATH):
         logger.info(f"目录{BASE_PATH}不存在")
         return False, f"Failed to fetch file 目录{BASE_PATH}不存在"
-    file_extensions = ['.docx', '.doc']
+    file_extensions = ['.docx']
     files_list = []
     dir_path = pathlib.Path(BASE_PATH).joinpath(dir_id)
-    for root,dirs,files in os.walk(dir_path):
+    for root, dirs, files in os.walk(dir_path):
         for file in files:
-            _,ext = os.path.splitext(file)
+            _, ext = os.path.splitext(file)
             if file_extensions and ext not in file_extensions:
                 logger.error(f"文件{file}格式不符合预期")
                 continue
-            file_path = os.path.join(root,file)
+            file_path = os.path.join(root, file)
             if os.path.isfile(file_path):
                 files_list.append(file_path)
             else:
@@ -106,10 +156,10 @@ def fetch_files(dir_id) -> (bool, str):
             logger.info(f"file content is {result}")
             if not result:
                 logger.warning(f"file {file_name} 提取为空")
-                update_fail_mapping.append({'id':id, 'status':0,
-                                            'message': f"task {dir_id} => file {file_name} 提取为空"})
+                update_fail_mapping.append({'id': id, 'status': 0,
+                                            'message': f"task {dir_id} => file {file_name} 提取为空"})
                 continue
-            update_success_mapping.append({'id':id, 'status':1,'data_info': result})
+            update_success_mapping.append({'id': id, 'status': 1, 'data_info': result})
     session = SqliteSqlalchemy().session
     logger.info(f"update success mapping => {update_success_mapping}")
     logger.info(f"update fail mapping => {update_fail_mapping}")
@@ -120,8 +170,8 @@ def fetch_files(dir_id) -> (bool, str):
     session.bulk_update_mappings(DBRESUME, update_data)

     if update_fail_mapping:
-        session.bulk_update_mappings(DBTASK, [{'id':dir_id, 'status':2, 'success_num':success_num,
-                                               'fail_num':fail_num,'message':f'fail => {update_fail_mapping}'}])
+        session.bulk_update_mappings(DBTASK, [{'id': dir_id, 'status': 2, 'success_num': success_num,
+                                               'fail_num': fail_num, 'message': f'fail => {update_fail_mapping}'}])
     else:
         session.bulk_update_mappings(DBTASK, [{'id': dir_id, 'status': 1,
                                                'success_num': success_num, 'fail_num': fail_num}])
@@ -136,5 +186,69 @@ def fetch_files(dir_id) -> (bool, str):
     return True, 'success'


+async def upload_and_format_file(dir_id, files: List[UploadFile]) -> (bool, str):
+    logger.info(f"upload_and_format_file in service dir_id {dir_id}")
+    pathxx = pathlib.Path(BASE_PATH).joinpath(dir_id)
+    pathxx.mkdir(parents=True, exist_ok=True)
+    data = []
+    for file in files:
+        # id = str(uuid.uuid4())
+        name, fix = os.path.splitext(file.filename)
+        if fix not in ['.xls', '.xlsx']:
+            continue
+        with open(pathxx.joinpath(dir_id + fix), 'wb') as f:
+            file_content = await file.read()
+            f.write(file_content)
+        data.append(DBEXCEL(id=dir_id, status=0, file_name=dir_id + '.xlsx'))
+    session = SqliteSqlalchemy().session
+    try:
+        session.bulk_save_objects(data)
+        session.commit()
+    except Exception as e:
+        print(f"Failed to save DBEXCEL error {e}")
+        session.rollback()
+        return False, f"Failed to save DBEXCEL error {e}"
+    finally:
+        session.close()
+    return True, "success"
+
+
+def zip_file_folder(dir_id):
+    pathxx = pathlib.Path(BASE_PATH).joinpath(dir_id)
+    output_filename = pathlib.Path(ZIP_PATH).joinpath((dir_id + ".zip"))
+    with zipfile.ZipFile(output_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
+        # walk all files and subfolders in the directory
+        for root, dirs, files in os.walk(pathxx):
+            for file in files:
+                # build the full file path
+                file_path = os.path.join(root, file)
+                # add the file to the ZIP using a path relative to the source folder
+                zipf.write(file_path, arcname=os.path.relpath(file_path, pathxx))
+
+
+def fetch_and_format_file(dir_id) -> (bool, str):
+    logger.info(f"fetch_and_format_file in service dir_id {dir_id}")
+    pathxx = pathlib.Path(BASE_PATH).joinpath(dir_id)
+    pathx_1 = pathxx.joinpath(dir_id + ".xlsx")
+    if not pathx_1.exists():
+        pathx_1 = pathxx.joinpath(dir_id + ".xls")
+    data = pd.read_excel(pathx_1)
+    data_dict = data.to_dict('records')
+    print(data_dict)
+    format_excel_to_words(dir_id, data_dict)
+    zip_file_folder(dir_id)
+    session = SqliteSqlalchemy().session
+    try:
+        session.execute(update(DBTASK).where(DBTASK.id == dir_id).values(status=1))
+        session.commit()
+    except Exception as e:
+        session.rollback()
+    finally:
+        session.close()
+
+
+def download_format_words(task_id):
+    pathxx = pathlib.Path(ZIP_PATH).joinpath((task_id + ".zip"))
+    if not pathxx.exists():
+        return None
+    return pathxx
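Note: fetch_and_format_file falls back from .xlsx to .xls, but pandas reads the two formats with different engines: openpyxl (pinned in requirements.txt) handles only .xlsx, while .xls needs xlrd, which is not listed. A sketch with the engine made explicit (xlrd is an assumed extra dependency):

    import pandas as pd

    def read_excel_any(path: str) -> pd.DataFrame:
        engine = 'openpyxl' if str(path).endswith('.xlsx') else 'xlrd'
        return pd.read_excel(path, engine=engine)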
service/format_template_resume.py

@@ -1,6 +1,20 @@
 import json
+import re

 from docxtpl import DocxTemplate
 from pathlib import Path
 from decouple import config
+import pathlib, logging
+import uuid
+from sqlalchemy import update
+
+from openpyxl import load_workbook, styles
+
+from db.sql_db import SqliteSqlalchemy, DBEXCEL
+
+logger = logging.getLogger(__name__)
+# BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')
+BASE_PATH = config('BASE_PATH', default='D://PycharmProject//yj_resume//uploads//')
 context = {
     'name': '张三',
     'sex': '男',
@@ -29,7 +43,261 @@ context = {
     '''
 }
 file_path = Path.cwd().joinpath('template.docx')
 template = DocxTemplate(file_path)
 template.render(context)
 template.save('E://resu//output.docx')
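Note: the render/save calls above sit at module level, so merely importing service.format_template_resume (which the new file_service.py does) re-renders the sample context and writes to the hard-coded E://resu//output.docx path. The usual guard would keep the demo out of the import path, for example:

    if __name__ == '__main__':
        template = DocxTemplate(Path.cwd().joinpath('template.docx'))
        template.render(context)
        template.save('E://resu//output.docx')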
+excel_mapping = {
+    'politics': {'群众': '1', '中共党员': '2', '民主党派': '3', '共青团员': '4'},
+    'education': {'小学及以下': '1', '初中': '2', '高中、技校': '3', '中专': '4',
+                  '大专': '5', '大学本科': '6', '硕士研究生': '7', '博士研究生': '8'},
+    'nation': {'汉族': '1', '蒙古族': '2', '回族': '3', '藏族': '4', '维吾尔族': '5',
+               '苗族': '6', '彝族': '7', '壮族': '8', '布依族': '9', '朝鲜族': '10',
+               '满族': '11', '侗族': '12', '瑶族': '13', '白族': '14', '土家族': '15',
+               '哈尼族': '16', '哈萨克族': '17', '傣族': '18', '黎族': '19', '傈僳族': '20',
+               '佤族': '21', '畲族': '22', '高山族': '23', '拉祜族': '24', '水族': '25',
+               '东乡族': '26', '纳西族': '27', '景颇族': '28', '柯尔克孜族': '29', '土族': '30',
+               '达斡尔族': '31', '仫佬族': '32', '羌族': '33', '布朗族': '34', '撒拉族': '35',
+               '毛南族': '36', '仡佬族': '37', '锡伯族': '38', '阿昌族': '39', '普米族': '40',
+               '塔吉克族': '41', '怒族': '42', '乌孜别克族': '43', '俄罗斯族': '44', '鄂温克族': '45',
+               '德昂族': '46', '保安族': '47', '裕固族': '48', '京族': '49', '塔塔尔族': '50', '独龙族': '51',
+               '鄂伦春族': '52', '赫哲族': '53', '门巴族': '54', '珞巴族': '55', '基诺族': '56', '其他': '57'},
+    'address': {'拉萨市': '540100', '昌都地区': '542100', '山南地区': '542200', '日喀则地区': '542300',
+                '那曲地区': '542400', '阿里地区': '542500', '林芝地区': '542600', '藏外地区': '549999'}}
+
+
+def convert_excel_data(old_dict: dict) -> dict:
+    new_dict = {}
+    pre_edu_mapping = {
+        '无': '小学及以下',
+        '小学': '小学及以下',
+        '初中': '初中',
+        '高中': '高中、技校',
+        '高职': '高中、技校',
+        '中专': '中专',
+        '大专': '大专',
+        '本科': '大学本科',
+        '硕士': '硕士研究生',
+        '博士': '博士研究生',
+    }
+    pre_addr_mapping = {
+        '拉萨': '拉萨市',
+        '昌都': '昌都地区',
+        '山南': '山南地区',
+        '日喀则': '日喀则地区',
+        '那曲': '那曲地区',
+        '阿里': '阿里地区',
+        '林芝': '林芝地区',
+    }
+    pre_nation_mapping = excel_mapping.get('nation')
+    new_dict['name'] = dict_get_mul_key(['姓名', '姓 名'], old_dict)
+    new_dict['sex'] = dict_get_mul_key(['性别', '性 别'], old_dict)
+    new_dict['origin'] = dict_get_mul_key(['籍贯', '籍 贯'], old_dict)
+    new_dict['politics'] = '中共党员' if len(dict_get_mul_key(['入党时间', '入 党 时 间'], old_dict)) > 0 and dict_get_mul_key(['入党时间'], old_dict) not in ('/', '\\', 'None', 'nan', '无') else '群众'
+    address = dict_get_mul_key(['出 生 地', '出生地'], old_dict)
+    nation = dict_get_mul_key(['民族', '民 族'], old_dict)
+    new_dict['nation'] = nation
+    # normalize the education level
+    r1 = old_dict.get("全日制教育")
+    r2 = old_dict.get("在职教育")
+    if len(r1) > 0 and r1 not in ('/', '\\', 'None', 'nan', '无', '否', 'null'):
+        education = r1
+    else:
+        education = r2
+    education = education.lower().replace(' ', '')
+    for k, v in pre_edu_mapping.items():
+        if k in education:
+            new_dict['education'] = v
+    addr_found = False
+    # normalize the place of residence
+    for k, v in pre_addr_mapping.items():
+        if k in address:
+            print("藏内地区")
+            addr_found = True
+            new_dict['address'] = v
+    if not addr_found:
+        print("藏外地区")
+        new_dict['address'] = '藏外地区'
+    # normalize the ethnicity
+    for k, v in pre_nation_mapping.items():
+        if k in nation or nation in k:
+            new_dict['nation'] = k
+    return new_dict
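Note: to make convert_excel_data concrete, here is a hypothetical input row and the shape of its output (all field values invented for illustration):

    row = {'姓名': '张三', '性别': '男', '籍贯': '西藏拉萨', '入党时间': '2010-07',
           '出生地': '拉萨', '民族': '汉族', '全日制教育': '本科', '在职教育': ''}
    print(convert_excel_data(row))
    # {'name': '张三', 'sex': '男', 'origin': '西藏拉萨', 'politics': '中共党员',
    #  'nation': '汉族', 'education': '大学本科', 'address': '拉萨市'}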
+def map_data(data_list):
+    # map special values to their region/code numbers
+    for idx, row in enumerate(data_list):
+        for k, v in excel_mapping.items():
+            value = row.get(k)
+            if value:
+                if v.get(value, ''):
+                    data_list[idx][k] = v.get(value)
+    return data_list
+
+
+def dict_get_mul_key(keys: list, dict_data: dict):
+    for k in keys:
+        if k in dict_data.keys() and dict_data[k]:
+            return dict_data[k]
+    return ''
+def convert_data(old_dict: dict) -> dict:
+    new_dict = {}
+    new_dict['name'] = dict_get_mul_key(['姓名', '姓 名'], old_dict)
+    new_dict['sex'] = dict_get_mul_key(['性别', '性 别'], old_dict)
+    new_dict['nation'] = dict_get_mul_key(['民族', '民 族'], old_dict)
+    new_dict['brith'] = dict_get_mul_key(['出生年月', '出生年月(岁)'], old_dict)
+    new_dict['origin'] = dict_get_mul_key(['籍贯', '籍 贯'], old_dict)
+    new_dict['address'] = dict_get_mul_key(['出 生 地', '出生地'], old_dict)
+    new_dict['education'] = dict_get_mul_key(['学历', '学 历'], old_dict)
+    new_dict['degree'] = dict_get_mul_key(['学位', '学 位'], old_dict)
+    new_dict['politics'] = '党员' if len(dict_get_mul_key(['入党时间', '入 党 时 间'], old_dict)) > 0 and dict_get_mul_key(['入党时间'], old_dict) not in ('/', '\\', 'None', 'nan', '无') else '群众'
+    new_dict['department'] = dict_get_mul_key(['部门', '部 门'], old_dict)
+    new_dict['position'] = dict_get_mul_key(['现任职务', '现 任 职 务'], old_dict)
+    new_dict['phone'] = dict_get_mul_key(['手机号', '手 机 号'], old_dict)
+    new_dict['title'] = dict_get_mul_key(['专业技术职务', '职 称'], old_dict)
+    new_dict['start_work_time'] = dict_get_mul_key(['开始工作时间', '开始 工作 时间'], old_dict)
+    new_dict['id_number'] = dict_get_mul_key(['身份证', '身 份 证', '身份证号码', '身份证号'], old_dict)
+    new_dict['honor'] = dict_get_mul_key(['奖惩情况', '奖惩 情况'], old_dict)
+    new_dict['work_text'] = dict_get_mul_key(['简历', '简 历'], old_dict)
+    return new_dict
+def format_and_write_excel_file(dir_id, data_list, template_row=5):
+    logger.info("Start to format and write excel file ")
+    try:
+        outpath = pathlib.Path(BASE_PATH).joinpath(dir_id)
+        output_path = outpath.joinpath((dir_id + '_out.xlsx'))
+        template_path = Path.cwd().joinpath('template.xlsx')
+        wb = load_workbook(template_path)
+        ws = wb.active
+
+        pattern = re.compile(r'\{\{(\w+)\}\}')
+
+        placeholder_columns = {}
+        for col in range(1, ws.max_column + 1):
+            cell = ws.cell(row=template_row, column=col)
+            if cell.value and isinstance(cell.value, str) and '{{' in cell.value:
+                matches = pattern.findall(cell.value)
+                if matches:
+                    placeholder_columns[col] = matches[0]
+
+        logger.info(f"找到占位符列: {placeholder_columns}")
+
+        # process each record
+        for index, data in enumerate(data_list):
+            target_row = template_row + index
+
+            if index > 0:
+                # insert a new row and copy the formatting
+                ws.insert_rows(target_row)
+                for col in range(1, ws.max_column + 1):
+                    source_cell = ws.cell(row=target_row - 1, column=col)
+                    target_cell = ws.cell(row=target_row, column=col)
+                    target_cell.style = source_cell.style
+                    if source_cell.alignment:
+                        # the alignment attributes must be copied explicitly
+                        target_cell.alignment = styles.Alignment(
+                            horizontal=source_cell.alignment.horizontal,  # horizontal alignment
+                            vertical=source_cell.alignment.vertical,  # vertical alignment
+                            text_rotation=source_cell.alignment.text_rotation,
+                            wrap_text=source_cell.alignment.wrap_text,
+                            shrink_to_fit=source_cell.alignment.shrink_to_fit,
+                            indent=source_cell.alignment.indent
+                        )
+                    # copy only the style, never the value
+                    # if source_cell.has_style:
+                    #     target_cell.font = copy(source_cell.font)
+                    #     target_cell.border = copy(source_cell.border)
+                    #     target_cell.fill = copy(source_cell.fill)
+                    #     target_cell.number_format = source_cell.number_format
+                    #     target_cell.alignment = copy(source_cell.alignment)
+                    # copy the borders (the previously missing piece)
+                    if hasattr(source_cell, 'border') and source_cell.border:
+                        # build a new border object
+                        from openpyxl.styles import Border, Side
+
+                        # take the border style from the source cell
+                        source_border = source_cell.border
+
+                        new_border = Border(
+                            left=Side(
+                                style=source_border.left.style,
+                                color=source_border.left.color
+                            ) if source_border.left else None,
+                            right=Side(
+                                style=source_border.right.style,
+                                color=source_border.right.color
+                            ) if source_border.right else None,
+                            top=Side(
+                                style=source_border.top.style,
+                                color=source_border.top.color
+                            ) if source_border.top else None,
+                            bottom=Side(
+                                style=source_border.bottom.style,
+                                color=source_border.bottom.color
+                            ) if source_border.bottom else None
+                        )
+                        target_cell.border = new_border
+
+            # fill in the data
+            for col, field in placeholder_columns.items():
+                print()
+                cell = ws.cell(row=target_row, column=col)
+
+                if field in data and data[field] is not None:
+                    value = data[field]
+                    cell.value = value
+                else:
+                    # no data: leave the cell blank
+                    cell.value = None
+
+        # save the workbook
+        wb.save(output_path)
+    except Exception as e:
+        logger.error(f"format and write excel file failed {e}")
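Note: the explicit Alignment and Border reconstruction above is needed because openpyxl's insert_rows adds completely unstyled cells. The commented-out block hints at the shorter copy() idiom, which achieves the same effect:

    from copy import copy

    def clone_style(source_cell, target_cell):
        # Copy every style component openpyxl tracks on a cell.
        if source_cell.has_style:
            target_cell.font = copy(source_cell.font)
            target_cell.border = copy(source_cell.border)
            target_cell.fill = copy(source_cell.fill)
            target_cell.number_format = source_cell.number_format
            target_cell.alignment = copy(source_cell.alignment)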
+def format_and_write_file(dir_id: str, ctx: dict):
+    logger.info(f'format_and_write_file dir id is {dir_id}')
+    user_name = ctx.get('name', str(uuid.uuid4()))
+    file_path = Path.cwd().joinpath('template.docx')
+    print(file_path)
+    template = DocxTemplate(file_path)
+    print(f"ctx {ctx}")
+    print("test1")
+    template.render(ctx)
+    print("test2")
+    pathxx = pathlib.Path(BASE_PATH).joinpath(dir_id)
+    print("test3")
+    pathxx = pathxx.joinpath((user_name + '.docx'))
+    print(pathxx)
+    template.save(pathxx)
+    print('222222222')
+def format_excel_to_words(dir_id: str, dict_data_list: list[dict]):
+    if not dict_data_list or len(dict_data_list) < 1:
+        return
+    logger.info("dict_data_list is {0}".format(dict_data_list))
+    excel_data_list = [convert_excel_data(data) for data in dict_data_list]
+    excel_data_list = map_data(excel_data_list)
+    logger.info(f"excel map data is {excel_data_list}")
+    format_and_write_excel_file(dir_id, excel_data_list)
+    session = SqliteSqlalchemy().session
+    # also write a copy to the database, for an aggregated excel export later
+    try:
+        save_data = json.dumps(excel_data_list, ensure_ascii=False)
+        session.execute(update(DBEXCEL).where(DBEXCEL.id == dir_id).values(status=1, file_name=dir_id + '_out.xlsx', excel_info=save_data))
+        session.commit()
+    except Exception as e:
+        session.rollback()
+    finally:
+        session.close()
+    for dict_data in dict_data_list:
+        new_data = convert_data(dict_data)
+        print(new_data)
+        format_and_write_file(dir_id, new_data)
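Note: format_excel_to_words stores the normalized rows as JSON in DBEXCEL.excel_info, per the comment "for an aggregated excel export later". A hedged sketch of reading that column back into a DataFrame (load_excel_info is hypothetical):

    import json
    import pandas as pd
    from db.sql_db import DBEXCEL, SqliteSqlalchemy

    def load_excel_info(dir_id: str) -> pd.DataFrame:
        session = SqliteSqlalchemy().session
        try:
            row = session.query(DBEXCEL).filter(DBEXCEL.id == dir_id).one()
            return pd.DataFrame(json.loads(row.excel_info))
        finally:
            session.close()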
service/parse_resume2_doc.py

@@ -1,10 +1,16 @@
 import os.path
 import re
+import json
 from pathlib import Path
+import pypandoc

 from docx import Document
 from typing import Dict, List, Any, Tuple
 from collections import defaultdict
+import logging
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)

 class EnhancedDocxExtractor:
     def __init__(self):
@@ -19,19 +25,28 @@
             '联系电话': ['联系电话', '电话', '手机', '联系电话:', '手机号'],
             '联系地址': ['联系地址', '地址', '联系地址:', '家庭地址'],
             '学历学位': ['学历', '学历:', '学 历', '学历\n学位', '学位', '学位:', '学 位'],
-            '毕业院校': ['毕业院校', '毕业学校', '毕业院校:'],
+            '毕业院校': ['毕业院校', '毕业学校', '毕业院校:', '毕业院校系及专业'],
+            '毕业院校系及专业': ['毕业院校', '毕业学校', '毕业院校:'],
             '专业': ['专业', '专业:', '系及专业', '所学专业'],
         }

     def convert_doc_to_docx(self, file_path: str) -> Document:
         pass

     def extract_with_table_structure(self, docx_path: str) -> Dict[str, Any]:
         """
         Extract the table-structured data from a .docx file.
         """
+        logger.info(f"into extract_with_table_structure")
         doc = Document(docx_path)
         results = defaultdict(dict)
         # analyze every table
         for table_idx, table in enumerate(doc.tables):
-            print(f"\n处理表格 {table_idx + 1} ({len(table.rows)}行 × {len(table.columns)}列)")
+            logger.info(f"\n处理表格 {table_idx + 1} ({len(table.rows)}行 × {len(table.columns)}列)")

             # get the table structure
             table_structure = self._analyze_table_structure(table)
@@ -83,22 +98,20 @@
         key_recode = []
         for row_idx, row in enumerate(structure):
             for col_idx, cell in enumerate(row):
-                print(f"visited is {visited} ")
-                print(f'row {row_idx} col {col_idx} all cell is {cell}')
+                logger.info(f"visited is {visited} ")
+                logger.info(f'row {row_idx} col {col_idx} all cell is {cell}')
                 if (row_idx, col_idx) in visited:
-                    print(f'---{row_idx}, {col_idx} ')
-                    print(f'cell is {cell}')
+                    logger.info(f'---{row_idx}, {col_idx} in visited ')
                     continue

                 if cell['is_key']:
-                    next_cell = structure[row_idx][col_idx+1]
+                    # next_cell = structure[row_idx][col_idx+1]
                     # look for the matching value
-                    print(f"cell2 is {cell} row {row_idx} col {col_idx}")
+                    logger.info(f"cell2 is {cell} row {row_idx} col {col_idx}")
                     value = self._find_value_for_key(table, structure, row_idx, col_idx, visited, kv_pairs)
                     if value:
                         key = self._normalize_key(cell['text'])
                         found = False
-                        kv_pairs = [(k, v + "," + value) if k == cell['text'] else (k, v) for k, v in kv_pairs]
+                        for i, (k, v) in enumerate(kv_pairs):
+                            if k == cell['text']:
+                                kv_pairs[i] = (k, value)
@@ -107,41 +120,75 @@
                     kv_pairs.append((key, value))

             else:
-                print("不是key")
+                logger.info("不是key")
         return kv_pairs

     def _find_value_for_key(self, table, structure, key_row, key_col, visited, kv_pairs) -> str:
         """
         Find the value that belongs to a key.
         """
+        logger.info(f"======================kv==================={kv_pairs}")
         # try the cell to the right
         if key_col + 1 < len(structure[key_row]):
             value_cell = structure[key_row][key_col + 1]
-            if not value_cell['text']:
-                value_cell['text'] = "None"
+            translation_table = str.maketrans('', '', ' \t\n\r\f\v')
+            current_key_cell = structure[key_row][key_col]
+            current_key_text = current_key_cell['text'].translate(translation_table)
             if value_cell['is_key']:
                 return None
+            # special handling for the education fields
+            spec_coll = ['全日制教育', '在职教育']
+            if current_key_cell['text'].replace('\n', '') in spec_coll:
+                if not value_cell['text']:
+                    value_cell['text'] = "否"
+                else:
+                    value_cell['text'] = '是'
+
+            if not value_cell['text']:
+                value_cell['text'] = "None"
+            spec_cell_meb = ['称谓', '姓名', '年龄', '政治面貌']
+            if current_key_text == "家庭主要成员及重要社会关系":
+                logger.info("特殊处理家庭主要成员及重要社会关系")
+                values = []
+                old_value = None
+                for k, v in kv_pairs:
+                    if k == current_key_text:
+                        old_value = v
+                logger.info(f"old_value is {old_value}")
+                translation_table = str.maketrans('', '', ' \t\n\r\f\v')
+                v = value_cell['text'].translate(translation_table)
+                logger.info(f"当前值为 {str(v)}")
+
+                if v not in spec_cell_meb:
+                    for i in range(key_col + 1, len(structure[key_row])):
+                        col_value = structure[key_row][i]['text']
+                        logger.info(f"col_value is {col_value}")
+                        if col_value not in values and col_value != 'None':
+                            values.append(col_value)
+                        visited.add((key_row, i))
+                    vs = "_".join(values)
+                    if old_value:
+                        if vs:
+                            value_cell['text'] = old_value + "|" + vs
+                        else:
+                            value_cell['text'] = old_value
+                    else:
+                        value_cell['text'] = "_".join(values)
+
+                    return value_cell['text']
+                logger.info(f"in {spec_cell_meb}")
+                return None

             if value_cell['text'] and (key_row, key_col + 1) not in visited:
                 # check whether this value repeats the one taken for the previous key (possibly a merged cell)
                 if not self._is_key_duplicate_merged_cell(structure[key_row][key_col]['text'], kv_pairs):
-                    print("前一个不重复")
-                    print(f"visited add {key_row} {key_col + 1}")
+                    logger.info("前一个不重复")
+                    logger.info(f"visited add {key_row} {key_col + 1}")
                     visited.add((key_row, key_col + 1))
                     return value_cell['text']
                 else:
-                    current_key = structure[key_row][key_col]['text']
-                    print(f"key值重复------------------------------key {current_key}")
+                    # current_key = structure[key_row][key_col]['text']
+                    logger.info(f"key值重复------------------------------key {current_key_text}")
                     for key, value in kv_pairs:
-                        if key == current_key:
+                        if key == current_key_text:
                             # value_arr = value.strip(',')
                             if value_cell['text'] in value:
                                 return value
                             return value + "," + value_cell['text']
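Note: the family-members branch above encodes each table row as cell values joined by "_" and appends successive rows with "|"; fetch_info later splits the string back apart. A worked example of the encoding (the member data is invented):

    encoded = "妻子_李四_45_群众|长子_张小三_20_共青团员"
    rows = [part.split('_') for part in encoded.split('|')]
    # [['妻子', '李四', '45', '群众'], ['长子', '张小三', '20', '共青团员']]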
@@ -151,8 +198,8 @@
         if value_cell['text'] and (key_row + 1, key_col) not in visited:
             # check whether this value repeats the one taken for the previous key (possibly a merged cell)
             if not self._is_key_duplicate_merged_cell(structure[key_row][key_col]['text'], kv_pairs):
-                print("下一个不重复")
-                print(f"visited add {key_row} {key_col + 1}")
+                logger.info("下一个不重复")
+                logger.info(f"visited add {key_row} {key_col + 1}")
                 visited.add((key_row + 1, key_col))
                 return value_cell['text']
@@ -165,8 +212,8 @@
                 if abs(row_idx - key_row) <= 2 and abs(col_idx - key_col) <= 2:
                     # check whether this value repeats the one taken for the previous key
                     if not self._is_key_duplicate_merged_cell(structure[key_row][key_col]['text'], kv_pairs):
-                        print("合并不重复")
-                        print(f"visited add {key_row} {key_col + 1}")
+                        logger.info("合并不重复")
+                        logger.info(f"visited add {key_row} {key_col + 1}")
                         visited.add((row_idx, col_idx))
                         return cell['text']
         return None
@@ -183,7 +230,8 @@
         """
+        translation_table = str.maketrans('', '', ' \t\n\r\f\v')
+        text = text.translate(translation_table)
         for k, v in kv_pairs:
             if text == k:
                 return True
@@ -198,18 +246,17 @@
     def _is_likely_key(self, text: str) -> bool:
         """Decide whether a piece of text is likely a key."""
+        translation_table = str.maketrans('', '', ' \t\n\r\f\v')
+        text = text.translate(translation_table)
         if not text or len(text) > 20:
             return False

         # check for common field words
-        key_indicators = ['籍贯','籍 贯','政治面貌','政治\n面貌','姓名','性别','姓 名', '性 别', '出生年月', '民族','民 族', '单位', '部门','联系地址','主要学习经历','全日制教育','在职教育',
-                          '职务','职 务','职\n务', '职称','职 称', '电话', '地址', '学历', '学位','现任职务','职业资格','奖惩情况(近三年主要奖惩信息)'
-                          '专业', '岗位', '经历', '时间', '资格','现任职单位及部门','身份证号','婚姻状况','健康状况','应聘岗位','应聘部门/岗位','毕业院校系及专业']
+        key_indicators = ['姓名', '性别', '出生年月(岁)', '民族', '籍贯', '出生地', '入党时间', '健康状况', '全日制教育', '在职教育', '毕业院校系及专业', '称谓',
+                          '年龄', '政治面貌', '工作单位及职务', '参加工作时间', '专业技术职务', '熟悉专业有何专长', '现任职务', '拟任职务', '拟免职务', '职称取得时间',
+                          '年核度结考果', '简历', '奖惩情况', '年度考核结果', '任免理由', '家庭主要成员及重要社会关系']

         for indicator in key_indicators:
-            translation_table = str.maketrans('', '', ' \t\n\r\f\v')
             indicator = indicator.translate(translation_table)
-            text = text.translate(translation_table)
             if indicator in text:
                 return True
@@ -219,30 +266,6 @@
             if any(indicator in key_part for indicator in key_indicators):
                 return True

-        for indicator in key_indicators:
-            print("indicator is ===============================", indicator)
-            print("text is ===============================", text)
-            translation_table = str.maketrans('', '', ' \t\n\r\f\v')
-            indicator = indicator.translate(translation_table)
-            text = text.translate(translation_table)
-            clean_text = self.extract_parentheses_content(text)
-            print(text)
-            clean_indicator = self.extract_parentheses_content(indicator)
-            print(indicator)
-            if not clean_text:
-                print("特殊匹配失败")
-                return False
-            if clean_indicator:
-                print("开始匹配=========")
-                clean_text = clean_text[0]
-                clean_indicator = clean_indicator[0]
-                if clean_indicator in clean_text:
-                    print(f"特殊情况匹配成功======={text}")
-                    return True
-                else:
-                    print("继续匹配")
-                    continue

         return False

     def _is_likely_value(self, text: str) -> bool:
@@ -267,8 +290,9 @@
     def _normalize_key(self, key_text: str) -> str:
         """Normalize a key name."""
         # strip colons and whitespace
+        translation_table = str.maketrans('', '', ' \t\n\r\f\v')
+        key_text = key_text.translate(translation_table)
         key_text = re.sub(r'[::\s]+$', '', key_text)

         # map to the standard key name
         for std_key, variants in self.field_variants.items():
             for variant in variants:
|
||||
|
||||
def _categorize_field(self, key: str) -> str:
|
||||
"""将字段分类"""
|
||||
|
||||
categories = {
|
||||
'基本信息': ['姓名', '性别', '出生年月', '民族', '政治面貌','学历学位','毕业院校系及专业','全日制教育','在职教育'
|
||||
'婚姻状况', '健康状况', '籍贯', '身份证号','联系电话','婚姻状况','健康状况','身份证号','联系电话(手机)','毕业院校系及专业','联系地址','主要学习经历','奖惩情况(近三年主要奖惩信息)'],
|
||||
'工作信息': ['现任职单位及部门', '现任职务', '职称', '职业资格',
|
||||
'参加工作时间', '职称取得时间','应聘部门/岗位','是否接受调剂职级/岗位','奖惩情况(近三年主要奖惩信息)'],
|
||||
'基本信息': ['姓名', '性别', '出生年月(岁)', '民族', '籍贯', '出生地', '入党时间', '健康状况', '全日制教育','在职教育', '毕业院校系及专业'],
|
||||
'工作信息': ['参加工作时间', '专业技术职务', '熟悉专业有何专长', '现任职务', '拟任职务', '拟免职务','职称取得时间', '年核度结考果'],
|
||||
}
|
||||
|
||||
for category, fields in categories.items():
|
||||
translation_table = str.maketrans('', '', ' \t\n\r\f\v')
|
||||
key = key.translate(translation_table)
|
||||
if key in fields:
|
||||
# print(f"filed is {fields} key is {key} ")
|
||||
return category
|
||||
|
||||
return '其他信息'
|
||||
@@ -323,50 +345,103 @@
 # quick usage example
 def quick_extract(docx_path: str):
     """Quickly extract and display the result."""
+    logger.info('into quick_extract')
     extractor = EnhancedDocxExtractor()

     try:
         result = extractor.extract_with_table_structure(docx_path)
-        print("\n提取结果 (键值对格式):")
-        print("=" * 60)
+        logger.info("\n提取结果 (键值对格式):")
+        logger.info("=" * 60)

         for category, fields in result.items():
             if fields:
-                print(f"\n{category}:")
+                logger.info(f"\n{category}:")
                 for key, value in fields.items():
-                    print(f"  {key}: {value}")
+                    logger.info(f"  {key}: {value}")
         return result

     except Exception as e:
-        print(f"提取失败: {e}")
+        logger.info(f"提取失败: {e}")
-base_map = ['姓名','性别','籍贯','政治面貌','出生年月','身份证号','现居住地','民族','学历','学位','学历学位','特长','联系电话','联系电话(手机)',
-            '婚姻状况','健康状况','毕业院校系及专业','主要学习经历','联系地址','入党/团时间','全日制教育','在职教育','奖惩情况(近三年主要奖惩信息)']
-work_map = ['参加工作时间','现任职单位及部门','职务','现任职务','职称','奖惩','工作经历','主要工作经历','职称取得时间','职业资格','应聘部门/岗位']
-other_map = ['工作经历','主要工作经历','职称取得时间','职业资格','应聘部门/岗位','是否接受调剂职级/岗位']
+def is_valid_year_month(year, month):
+    """Validate a year/month pair."""
+    if len(year) != 4:
+        return False
+
+    try:
+        month_int = int(month)
+        return 1 <= month_int <= 12
+    except ValueError:
+        return False
+
+
+def get_year_month(text):
+    """
+    Final version: covers all the cases.
+    Match and extract the year and month directly.
+    """
+    # a single regular expression does it in one pass
+    pattern = r'''
+        (\d{4})                 # year: 4 digits
+        [-./年]                 # separator
+        (\d{1,2})               # month: 1-2 digits
+        (?:                     # non-capturing group: the day part (optional)
+            [-./月]             # separator
+            \d{1,2}             # day
+            (?:[日号]?|[-./]?)  # optional "日" or "号"
+        )?                      # the whole day part is optional
+        (?!\d)                  # must not be followed by a digit (avoids matching an age)
+    '''
+
+    match = re.search(pattern, text, re.VERBOSE | re.IGNORECASE)
+
+    if match:
+        year, month = match.groups()[:2]
+        month = month.zfill(2)
+
+        if is_valid_year_month(year, month):
+            return f"{year}-{month}"
+
+    return None
+
+
+base_map = ['姓名', '性别', '出生年月(岁)', '民族', '籍贯', '出生地', '入党时间', '健康状况', '全日制教育', '在职教育', '毕业院校系及专业']
+work_map = ['参加工作时间', '专业技术职务', '熟悉专业有何专长', '现任职务', '拟任职务', '拟免职务', '职称取得时间', '年核度结考果']
+other_map = ['简历', '奖惩情况', '年度考核结果', '任免理由', '家庭主要成员及重要社会关系']
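Note: a few spot checks of get_year_month against the formats its regex is written to cover (inputs invented):

    assert get_year_month("1990年5月") == "1990-05"
    assert get_year_month("1990.5.12") == "1990-05"
    assert get_year_month("1990-05(35岁)") == "1990-05"
    assert get_year_month("无") is None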
 def fetch_info(data):
     map_word = base_map + work_map + other_map
-    print("data is {0}".format(data))
-    print("map_word is {0}".format(map_word))
+    logger.info("data is {0}".format(data))
+    logger.info("map_word is {0}".format(map_word))
     final_res = {}
     for key, value in data.items():
         translation_table = str.maketrans('', '', ' \t\n\r\f\v')
         clean_key = key.translate(translation_table)
-        print(f"key is {clean_key} ")
+        logger.info(f"key is {clean_key} ")
         if clean_key in map_word:
             # clean_value = value.translate(translation_table)
+            # special handling for family members
+            if clean_key == "家庭主要成员及重要社会关系":
+                value_arr = value.split('|')
+                final_value = []
+                if value_arr:
+                    for value in value_arr:
+                        v = value.split('_')
+                        # v = ",".join(v)
+                        final_value.append(v)
+                    value = final_value
+            # special handling for year/month values
+            if clean_key == "出生年月(岁)":
+                value = get_year_month(value)
             final_res[clean_key] = value

     return final_res


 def extra_resume(file_path):
     logger.info(f"Start to quick extract {file_path}")
     result = quick_extract(file_path)
     print(result)
     logger.info(f"result isv ------------------------------{result}")
     base_data = result['基本信息']
     work_data = result['工作信息']
     other_data = result['其他信息']
@@ -379,8 +454,9 @@ def extra_resume(file_path):


 # if __name__ == "__main__":
 #     logger = logging.getLogger(__name__)
 #     # usage
-#     docx_file = "../1.报名登记表.docx"  # replace with your own file
+#     docx_file = "../任免审批表数信中心--谭杰明.docx"  # replace with your own file
 #     print(extra_resume(docx_file))
BIN  service/template.xlsx (new file; binary file not shown)
BIN  template.docx (new file; binary file not shown)
BIN  template.xlsx (new file; binary file not shown)