Compare commits

7 commits: a7ddfcde2a...16583dbb06
| Author | SHA1 | Date |
|---|---|---|
| | 16583dbb06 | |
| | ec0995d08a | |
| | a124651a7e | |
| | e9d225939a | |
| | ff1c0e890c | |
| | f1063146d2 | |
| | 992bab2887 | |
```diff
@@ -3,7 +3,8 @@ from sqlalchemy.orm import declarative_base, sessionmaker
 # Declare the declarative base object
 Base = declarative_base()
 from decouple import config
-DB_PATH = config('DB_PATH', default='E://pyptoject//yj_resume//main.sqlite3')
+DB_PATH = config('DB_PATH', default='D://PycharmProject//yj_resume//main.sqlite3')
+


 class DBTASK(Base):
```
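A side note on this hunk: both the old and new defaults are machine-specific paths. Since python-decouple is already in use, a local `.env` can supply the per-machine value instead of editing the committed default; a minimal sketch (the `.env` values below are illustrative, not from this diff):

```python
# Sketch only: config() resolves DB_PATH from os.environ or a local .env
# file first, and only then falls back to the default= argument.
#
# .env (illustrative):
#   DB_PATH=D://PycharmProject//yj_resume//main.sqlite3
from decouple import config

DB_PATH = config('DB_PATH', default='main.sqlite3')  # .env/env value wins over this default
print(DB_PATH)
```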
logging_config.py — new file (+62 lines)

```python
# logging_config.py
import logging
import logging.config
from pathlib import Path

# Make sure the logs directory exists
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)

LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "default": {
            "format": "%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s",
        },
        "detailed": {
            "format": "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s - %(message)s",
        }
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "formatter": "default",
            "stream": "ext://sys.stdout"
        },
        "file": {
            "class": "logging.handlers.RotatingFileHandler",  # rotates automatically
            "level": "INFO",
            "formatter": "detailed",
            "filename": "logs/resume.log",
            "maxBytes": 10485760,  # 10 MB
            "backupCount": 5,  # keep 5 backups
            "encoding": "utf8"
        },
    },
    "root": {
        "level": "INFO",
        "handlers": ["console", "file"]
    },
    "loggers": {
        "uvicorn": {
            "level": "INFO",
            "handlers": ["console", "file"],
            "propagate": False
        },
        "uvicorn.error": {
            "level": "INFO",
            "handlers": ["console", "file"],
            "propagate": False
        },
        "uvicorn.access": {
            "level": "WARNING",  # warnings and above only, to avoid log spam
            "handlers": ["file"],  # write to the file only
            "propagate": False
        }
    }
}

# Apply the configuration
logging.config.dictConfig(LOGGING_CONFIG)
```
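Since `main.py` imports `LOGGING_CONFIG` (see the next diff), one plausible wiring is to hand the same dict to uvicorn so its loggers use the handlers defined above; a minimal sketch, assuming the module and app names used in this compare:

```python
# Sketch only: importing logging_config applies dictConfig as a side effect;
# passing LOGGING_CONFIG to uvicorn.run keeps uvicorn's loggers on the same
# console/file handlers instead of its built-in defaults.
import uvicorn

from logging_config import LOGGING_CONFIG  # applies the config on import
from main import app

if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000, log_config=LOGGING_CONFIG)
```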
main.py (16 lines changed)

```diff
@@ -2,10 +2,16 @@ from fastapi import FastAPI
 import uvicorn
 from fastapi import FastAPI, File, UploadFile, HTTPException
 from typing import List
-from service.file_service import check_and_create_directory, upload_and_save_file
+from service.file_service import check_and_create_directory, upload_and_save_file,fetch_files
 from service import excel_service
 from service.db_service import get_task_list
 from fastapi.responses import FileResponse
+import threading
+from logging_config import LOGGING_CONFIG
+import logging
+
+logger = logging.getLogger(__name__)
+


 app = FastAPI()
@@ -20,11 +26,11 @@ async def create_upload_files(files: List[UploadFile] = File(...)):
     dir_id = check_and_create_directory(files)
     if not dir_id:
         return {"result": False, "code": 500, "message": "create directory failed"}
-    flag, message = await upload_and_save_file(dir_id, files)
-    if flag:
-        # TODO: trigger an async task to parse the files
-        pass
-    return {"result": flag, "message": message}
+    flag, message= await upload_and_save_file(dir_id, files)
+    logger.info(f"flag is {flag}")
+
+    flag,message = await fetch_files(dir_id)
+    return {"result": flag, "message": message,"task_id": dir_id}


 @app.get("/export_task_data_to_excel")
```
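The handler now returns a `task_id` and calls `fetch_files` inline instead of deferring parsing. A quick way to exercise the change is FastAPI's `TestClient`; the route decorator sits outside this hunk, so the `/uploadfiles/` path below is an assumption:

```python
# Sketch only: POST one .docx to the upload endpoint and inspect the response.
from fastapi.testclient import TestClient

from main import app

client = TestClient(app)
files = [
    ("files", ("resume.docx", b"<docx bytes>",
               "application/vnd.openxmlformats-officedocument.wordprocessingml.document")),
]
resp = client.post("/uploadfiles/", files=files)  # path is an assumption
print(resp.json())  # expected keys after this change: result, message, task_id
```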
```diff
@@ -7,3 +7,12 @@ python-decouple
 python-multipart
 pandas
 openpyxl
+
+python-multipart
+PyMuPDF>=1.23.0
+paddlepaddle>=2.5.0
+paddleocr>=2.7.0.3
+opencv-python>=4.8.0
+numpy>=1.24.0
+pdf2image>=1.16.3
+Pillow>=10.0.0
```
```diff
@@ -4,7 +4,8 @@ import pandas as pd
 import pathlib
 from decouple import config

-BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')
+# BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')
+BASE_PATH = config('BASE_PATH', default='D://PycharmProject//yj_resume//uploads//')


 # Export the data to Excel
```
```diff
@@ -1,3 +1,7 @@
+import json
+
+from pymupdf import message
+
 from db.sql_db import DBTASK, DBRESUME, SqliteSqlalchemy
 import uuid
 from datetime import datetime
@@ -6,13 +10,22 @@ import pathlib
 from fastapi import File, UploadFile
 from typing import List
 import os
+import asyncio
+import logging
+
+from service.parse_resume2_doc import extra_resume
+
+logger = logging.getLogger(__name__)
+BASE_PATH = config('BASE_PATH', default='D://PycharmProject//yj_resume//uploads//')
+


-BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')


 def check_and_create_directory(files):
+    logger.info("check_and_create_directory in service")
     # create a task record first
     if not files or len(files) == 0:
+        logger.warning("check_and_create_directory is empty")
         return None
@@ -35,19 +48,20 @@ def check_and_create_directory(files):


 async def upload_and_save_file(dir_id, files: List[UploadFile]) -> (bool, str):
+    logger.info(f"upload_and_save_file in service dir_id {dir_id}")
     pathxx = pathlib.Path(BASE_PATH).joinpath(dir_id)
     pathxx.mkdir(parents=True, exist_ok=True)
     data = []
-    i = 0
     for file in files:
         name, fix = os.path.splitext(file.filename)
+        id = str(uuid.uuid4())
         if fix not in ['.doc', '.docx']:
             continue
-        i = i + 1
-        with open(pathxx.joinpath(str(i) + fix), 'wb') as f:
+        with open(pathxx.joinpath(id + fix), 'wb') as f:
             file_content = await file.read()
             f.write(file_content)
-        data.append(DBRESUME(id=str(uuid.uuid4()), task_id=dir_id, status=0, file_name=str(i) + fix))
+        data.append(DBRESUME(id=id, task_id=dir_id, status=0, file_name=id + fix))
+
     session = SqliteSqlalchemy().session
     try:
         session.bulk_save_objects(data)
@@ -60,3 +74,67 @@ async def upload_and_save_file(dir_id, files: List[UploadFile]) -> (bool, str):
         session.close()
     return True, "success"

+
+async def fetch_files(dir_id) -> (bool, str):
+
+    logger.info(f"start fetching files task {dir_id} in service")
+    if not os.path.exists(BASE_PATH):
+        logger.info(f"目录{BASE_PATH}不存在")
+        return False, f"Failed to fetch file 目录{BASE_PATH}不存在"
+    file_extensions = ['.docx', '.doc']
+    files_list = []
+    dir_path = pathlib.Path(BASE_PATH).joinpath(dir_id)
+    for root,dirs,files in os.walk(dir_path):
+        for file in files:
+            _,ext = os.path.splitext(file)
+            if file_extensions and ext not in file_extensions:
+                logger.error(f"文件{file}格式不符合预期")
+                continue
+            file_path = os.path.join(root,file)
+            if os.path.isfile(file_path):
+                files_list.append(file_path)
+            else:
+                logger.error(f"路径下{file_path}不是文件")
+    update_success_mapping = []
+    update_fail_mapping = []
+    for file in files_list:
+        logger.info(f"file is {file} {os.path.basename(file)}")
+        file_name = os.path.basename(file)
+        id = os.path.splitext(file_name)[0]
+        result = extra_resume(file)
+        result = json.dumps(result, ensure_ascii=False)
+        logger.info(f"result type is {type(result)}")
+        logger.info(f"file content is {result}")
+        if not result:
+            logger.warning(f"file {file_name} 提取为空")
+            update_fail_mapping.append({'id':id, 'status':0,
+                                        'message': f"task {dir_id} => file {file_name} 提取为空"})
+            continue
+        update_success_mapping.append({'id':id, 'status':1,'data_info': result})
+    session = SqliteSqlalchemy().session
+    logger.info(f"update success mapping => {update_success_mapping}")
+    logger.info(f"update fail mapping => {update_fail_mapping}")
+    success_num = len(update_success_mapping)
+    fail_num = len(update_fail_mapping)
+    try:
+        update_data = update_success_mapping + update_fail_mapping
+        session.bulk_update_mappings(DBRESUME, update_data)
+
+        if update_fail_mapping:
+            session.bulk_update_mappings(DBTASK, [{'id':dir_id, 'status':2, 'success_num':success_num,
+                                                   'fail_num':fail_num,'message':f'fail => {update_fail_mapping}'}])
+        else:
+            session.bulk_update_mappings(DBTASK, [{'id': dir_id, 'status': 1,
+                                                   'success_num': success_num, 'fail_num': fail_num}])
+        session.commit()
+    except Exception as e:
+        logger.error(f"update failed => task {dir_id} error {e}")
+        session.rollback()
+        return False, f"Failed to update DBRESUME error {e}"
+    finally:
+        session.close()
+
+    return True, 'success'
```
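`fetch_files` leans on SQLAlchemy's `Session.bulk_update_mappings`, which matches rows by primary key and turns the remaining dict keys into SET clauses, one UPDATE per mapping. A stripped-down sketch of that pattern, assuming the models and session factory from `db.sql_db`:

```python
# Sketch only: each mapping must include the primary key ('id' here);
# the other keys become the updated columns.
from db.sql_db import DBRESUME, SqliteSqlalchemy

session = SqliteSqlalchemy().session
try:
    session.bulk_update_mappings(DBRESUME, [
        {"id": "abc-123", "status": 1, "data_info": '{"姓名": "张三"}'},
    ])
    session.commit()
except Exception:
    session.rollback()
    raise
finally:
    session.close()
```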
service/parse_resume2_doc.py — new file (+384 lines)
```python
import re
import json
from pathlib import Path

from docx import Document
from typing import Dict, List, Any, Tuple
from collections import defaultdict


class EnhancedDocxExtractor:
    def __init__(self):
        # Known spelling variants for each field name
        self.field_variants = {
            '姓名': ['姓名', '姓 名', '姓 名', '姓名:', '姓 名:', '姓 名'],
            '性别': ['性别', '性 别', '性 别', '性别:', '性 别:', '性 别'],
            '出生年月': ['出生年月', '出生年月:', '出生日期', '出生日期:'],
            '民族': ['民族', '民族:', '民 族'],
            '政治面貌': ['政治面貌', '政治面貌:', '政治面貌:'],
            '现任职单位及部门': ['现任职单位及部门', '单位及部门', '工作单位', '现任职单位'],
            '联系电话': ['联系电话', '电话', '手机', '联系电话:', '手机号'],
            '联系地址': ['联系地址', '地址', '联系地址:', '家庭地址'],
            '学历学位': ['学历', '学历:', '学 历', '学历\n学位', '学位', '学位:', '学 位'],
            '毕业院校': ['毕业院校', '毕业学校', '毕业院校:', '毕业院校系及专业'],
            '专业': ['专业', '专业:', '系及专业', '所学专业'],
        }

    def extract_with_table_structure(self, docx_path: str) -> Dict[str, Any]:
        """Extract the table-structured data from a .docx file."""
        doc = Document(docx_path)
        results = defaultdict(dict)
        # Walk every table in the document
        for table_idx, table in enumerate(doc.tables):
            print(f"\n处理表格 {table_idx + 1} ({len(table.rows)}行 × {len(table.columns)}列)")

            # Analyse the table layout
            table_structure = self._analyze_table_structure(table)
            # Extract key/value pairs
            kv_pairs = self._extract_from_table_structure(table, table_structure)
            # Store them by category
            for key, value in kv_pairs:
                category = self._categorize_field(key)
                results[category][key] = value
        # Extract information from plain paragraphs
        paragraph_info = self._extract_from_paragraphs(doc.paragraphs)
        for key, value in paragraph_info:
            category = self._categorize_field(key)
            results[category][key] = value

        return dict(results)

    def _analyze_table_structure(self, table) -> List[List[Dict]]:
        """Analyse the table layout and return metadata for every cell."""
        structure = []

        for row_idx, row in enumerate(table.rows):
            row_info = []
            for col_idx, cell in enumerate(row.cells):
                cell_text = cell.text.strip()
                # Classify the cell
                cell_info = {
                    'text': cell_text,
                    'row': row_idx,
                    'col': col_idx,
                    'rowspan': 1,
                    'colspan': 1,
                    'is_key': self._is_likely_key(cell_text),
                    'is_value': self._is_likely_value(cell_text),
                }
                row_info.append(cell_info)
            structure.append(row_info)

        return structure

    def _extract_from_table_structure(self, table, structure) -> List[Tuple[str, str]]:
        """Extract key/value pairs from the analysed table structure."""
        kv_pairs = []
        visited = set()
        key_recode = []
        for row_idx, row in enumerate(structure):
            for col_idx, cell in enumerate(row):
                print(f"visited is {visited} ")
                print(f'row {row_idx} col {col_idx} all cell is {cell}')
                if (row_idx, col_idx) in visited:
                    print(f'---{row_idx}, {col_idx} ')
                    print(f'cell is {cell}')
                    continue

                if cell['is_key']:
                    next_cell = structure[row_idx][col_idx+1]
                    # Look for the matching value
                    print(f"cell2 is {cell} row {row_idx} col {col_idx}")
                    value = self._find_value_for_key(table, structure, row_idx, col_idx, visited, kv_pairs)
                    if value:
                        key = self._normalize_key(cell['text'])
                        found = False
                        kv_pairs = [(k, v + "," + value) if k == cell['text'] else (k, v) for k, v in kv_pairs]
                        for i, (k, v) in enumerate(kv_pairs):
                            if k == cell['text']:
                                kv_pairs[i] = (k, value)
                                found = True
                        if not found:
                            kv_pairs.append((key, value))

                else:
                    print("不是key")
        return kv_pairs

    def _find_value_for_key(self, table, structure, key_row, key_col, visited, kv_pairs) -> str:
        """Find the value that belongs to a key cell."""
        # Try the cell to the right first
        if key_col + 1 < len(structure[key_row]):
            value_cell = structure[key_row][key_col + 1]
            current_key_cell = structure[key_row][key_col]
            if value_cell['is_key']:
                return None
            # Special handling for the education fields
            spec_coll = ['全日制教育', '在职教育']
            if current_key_cell['text'].replace('\n', '') in spec_coll:
                if not value_cell['text']:
                    value_cell['text'] = 'False'
                else:
                    value_cell['text'] = 'True'

            if value_cell['text'] and (key_row, key_col + 1) not in visited:
                # Check whether this value was already taken by a previous key (possibly a merged cell)
                if not self._is_key_duplicate_merged_cell(structure[key_row][key_col]['text'], kv_pairs):
                    print("前一个不重复")
                    print(f"visited add {key_row} {key_col + 1}")
                    visited.add((key_row, key_col + 1))
                    return value_cell['text']
                else:
                    current_key = structure[key_row][key_col]['text']
                    print(f"key值重复------------------------------key {current_key}")
                    for key, value in kv_pairs:
                        if key == current_key:
                            return value + "," + value_cell['text']

        # Then try the cell below
        if key_row + 1 < len(structure):
            value_cell = structure[key_row + 1][key_col]
            if value_cell['text'] and (key_row + 1, key_col) not in visited:
                # Check whether this value was already taken by a previous key (possibly a merged cell)
                if not self._is_key_duplicate_merged_cell(structure[key_row][key_col]['text'], kv_pairs):
                    print("下一个不重复")
                    print(f"visited add {key_row} {key_col + 1}")
                    visited.add((key_row + 1, key_col))
                    return value_cell['text']

        # Finally scan nearby cells to handle merged layouts
        for row_idx in range(len(structure)):
            for col_idx in range(len(structure[row_idx])):
                cell = structure[row_idx][col_idx]
                if (row_idx, col_idx) not in visited and cell['text']:
                    # Is the cell close to the key?
                    if abs(row_idx - key_row) <= 2 and abs(col_idx - key_col) <= 2:
                        # Check whether this value duplicates a previous key's value
                        if not self._is_key_duplicate_merged_cell(structure[key_row][key_col]['text'], kv_pairs):
                            print("合并不重复")
                            print(f"visited add {key_row} {key_col + 1}")
                            visited.add((row_idx, col_idx))
                            return cell['text']
        return None

    def _is_key_duplicate_merged_cell(self, text, kv_pairs) -> bool:
        """
        Check whether the current text duplicates a key already collected in kv_pairs.
        Example: row 1 col 0 and row 2 col 0 are both 毕业院校 (a merged cell).
        The first pass pairs (1,0) with (1,2) as key:value; on the second pass,
        毕业院校 is detected as already present in kv_pairs, so (2,0):(2,1) is
        not paired again.

                   | 硕士学位/研究生学历:中国科学院计算技术研究所计算机技术专业
          毕业院校 |--------------------------------------------------------
                   |
                   |--------------------------------------------------------
        """

        for k, v in kv_pairs:
            if text == k:
                return True

        return False

    def extract_parentheses_content(self, text):
        # Use a regex to pull out everything inside half- or full-width parentheses
        matches = re.findall(r'[((]([^))]*)[))]', text)

        return matches  # a list; there may be several parenthesised groups

    def _is_likely_key(self, text: str) -> bool:
        """Decide whether a piece of text looks like a field name (key)."""
        if not text or len(text) > 20:
            return False

        # Check for common field words
        key_indicators = ['籍贯', '籍 贯', '政治面貌', '政治\n面貌', '姓名', '性别', '姓 名', '性 别', '出生年月', '民族', '民 族', '单位', '部门', '联系地址', '主要学习经历', '全日制教育', '在职教育',
                          '职务', '职 务', '职\n务', '职称', '职 称', '电话', '地址', '学历', '学位', '现任职务', '职业资格', '奖惩情况(近三年主要奖惩信息)',
                          '专业', '岗位', '经历', '时间', '资格', '现任职单位及部门', '身份证号', '婚姻状况', '健康状况', '应聘岗位', '应聘部门/岗位', '毕业院校系及专业']

        for indicator in key_indicators:
            translation_table = str.maketrans('', '', ' \t\n\r\f\v')
            indicator = indicator.translate(translation_table)
            text = text.translate(translation_table)
            if indicator in text:
                return True

        # Check for a colon (common in Chinese documents)
        if ':' in text or ':' in text:
            key_part = text.split(':')[0].split(':')[0]
            if any(indicator in key_part for indicator in key_indicators):
                return True

        for indicator in key_indicators:
            print("indicator is ===============================", indicator)
            print("text is ===============================", text)
            translation_table = str.maketrans('', '', ' \t\n\r\f\v')
            indicator = indicator.translate(translation_table)
            text = text.translate(translation_table)
            clean_text = self.extract_parentheses_content(text)
            print(text)
            clean_indicator = self.extract_parentheses_content(indicator)
            print(indicator)
            if not clean_text:
                print("特殊匹配失败")
                return False
            if clean_indicator:
                print("开始匹配=========")
                clean_text = clean_text[0]
                clean_indicator = clean_indicator[0]
                if clean_indicator in clean_text:
                    print(f"特殊情况匹配成功======={text}")
                    return True
                else:
                    print("继续匹配")
                    continue

        return False

    def _is_likely_value(self, text: str) -> bool:
        """Decide whether a piece of text looks like a value."""
        if not text:
            return False

        # A value is usually not one of the known field names
        if self._is_likely_key(text):
            return False

        # A value may match characteristic patterns
        if re.match(r'^\d{11}$', text):  # mobile number
            return True
        if re.match(r'^\d{4}年', text):  # date
            return True
        if len(text) > 10:  # long text is probably a value
            return True

        return True

    def _normalize_key(self, key_text: str) -> str:
        """Normalise a key name."""
        # Strip trailing colons and whitespace
        key_text = re.sub(r'[::\s]+$', '', key_text)

        # Map to the canonical key name
        for std_key, variants in self.field_variants.items():
            for variant in variants:
                if variant == key_text or key_text in variant:
                    return std_key

        return key_text

    def _categorize_field(self, key: str) -> str:
        """Assign a field to a category."""
        categories = {
            '基本信息': ['姓名', '性别', '出生年月', '民族', '政治面貌', '学历学位', '毕业院校系及专业', '全日制教育', '在职教育',
                     '婚姻状况', '健康状况', '籍贯', '身份证号', '联系电话', '婚姻状况', '健康状况', '身份证号', '联系电话(手机)', '毕业院校系及专业', '联系地址', '主要学习经历', '奖惩情况(近三年主要奖惩信息)'],
            '工作信息': ['现任职单位及部门', '现任职务', '职称', '职业资格',
                     '参加工作时间', '职称取得时间', '应聘部门/岗位', '是否接受调剂职级/岗位', '奖惩情况(近三年主要奖惩信息)'],
        }

        for category, fields in categories.items():
            translation_table = str.maketrans('', '', ' \t\n\r\f\v')
            key = key.translate(translation_table)
            if key in fields:
                # print(f"filed is {fields} key is {key} ")
                return category

        return '其他信息'

    def _extract_from_paragraphs(self, paragraphs) -> List[Tuple[str, str]]:
        """Extract key/value pairs from plain paragraphs."""
        kv_pairs = []

        for para in paragraphs:
            text = para.text.strip()
            if not text:
                continue

            # Try to split colon-separated key/value pairs
            if ':' in text or ':' in text:
                separator = ':' if ':' in text else ':'
                parts = text.split(separator, 1)

                if len(parts) == 2:
                    key = parts[0].strip()
                    value = parts[1].strip()

                    if self._is_likely_key(key) and value:
                        normalized_key = self._normalize_key(key)
                        kv_pairs.append((normalized_key, value))

        return kv_pairs


# Quick usage example
def quick_extract(docx_path: str):
    """Extract and print the results."""
    extractor = EnhancedDocxExtractor()

    try:
        result = extractor.extract_with_table_structure(docx_path)
        print("\n提取结果 (键值对格式):")
        print("=" * 60)

        for category, fields in result.items():
            if fields:
                print(f"\n{category}:")
                for key, value in fields.items():
                    print(f"  {key}: {value}")
        return result

    except Exception as e:
        print(f"提取失败: {e}")


base_map = ['姓名', '性别', '籍贯', '政治面貌', '出生年月', '身份证号', '现居住地', '民族', '学历', '学位', '学历学位', '特长', '联系电话', '联系电话(手机)',
            '婚姻状况', '健康状况', '毕业院校系及专业', '主要学习经历', '联系地址', '入党/团时间', '全日制教育', '在职教育', '奖惩情况(近三年主要奖惩信息)']
work_map = ['参加工作时间', '现任职单位及部门', '职务', '现任职务', '职称', '奖惩', '工作经历', '主要工作经历', '职称取得时间', '职业资格', '应聘部门/岗位']
other_map = ['工作经历', '主要工作经历', '职称取得时间', '职业资格', '应聘部门/岗位', '是否接受调剂职级/岗位']


def fetch_info(data):
    map_word = base_map + work_map + other_map
    print("data is {0}".format(data))
    print("map_word is {0}".format(map_word))
    final_res = {}
    for key, value in data.items():
        translation_table = str.maketrans('', '', ' \t\n\r\f\v')
        clean_key = key.translate(translation_table)
        print(f"key is {clean_key} ")
        if clean_key in map_word:
            # clean_value = value.translate(translation_table)
            final_res[clean_key] = value

    return final_res


def extra_resume(file_path):
    result = quick_extract(file_path)
    print(result)
    base_data = result['基本信息']
    work_data = result['工作信息']
    other_data = result['其他信息']
    data = {}
    data.update(base_data)
    data.update(work_data)
    data.update(other_data)
    res = fetch_info(data)
    return res


# if __name__ == "__main__":
#     # Usage:
#     docx_file = "../1.报名登记表.docx"  # replace with your file
#     print(extra_resume(docx_file))
```
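For a quick end-to-end check of the extractor, one can build a tiny two-column form with python-docx and run it through `EnhancedDocxExtractor`; a sketch (file name and field values are illustrative):

```python
# Sketch only: generate a minimal resume-style table and extract it.
from docx import Document

from service.parse_resume2_doc import EnhancedDocxExtractor

doc = Document()
table = doc.add_table(rows=2, cols=2)
table.cell(0, 0).text = '姓名'          # key cell
table.cell(0, 1).text = '张三'          # value cell
table.cell(1, 0).text = '联系电话'
table.cell(1, 1).text = '13800000000'
doc.save('sample.docx')

extractor = EnhancedDocxExtractor()
result = extractor.extract_with_table_structure('sample.docx')
# Expected shape: {'基本信息': {'姓名': '张三', '联系电话': '13800000000'}}
print(result)
```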