Compare commits

...

7 Commits

Author SHA1 Message Date
yujj128
16583dbb06 简历提取,写入 2025-12-06 17:04:05 +08:00
yujj128
ec0995d08a Merge branch 'dev' of http://106.13.42.156:33077/lei_y601/yj_resume
# Conflicts:
#	main.py
2025-12-06 16:48:51 +08:00
yujj128
a124651a7e 简历提取,写入 2025-12-06 16:44:53 +08:00
yujj128
e9d225939a Merge branch 'dev' of http://106.13.42.156:33077/lei_y601/yj_resume
# Conflicts:
#	requirements.txt
2025-12-06 11:19:38 +08:00
yujj128
ff1c0e890c 简历提取 2025-12-06 11:19:09 +08:00
yujj128
f1063146d2 Merge branch 'dev' of http://106.13.42.156:33077/lei_y601/yj_resume
# Conflicts:
#	requirements.txt
2025-12-05 20:49:00 +08:00
yujj128
992bab2887 简历提取 2025-12-05 20:48:27 +08:00
7 changed files with 553 additions and 12 deletions

View File

@@ -3,7 +3,8 @@ from sqlalchemy.orm import declarative_base, sessionmaker
# 申明基类对象 # 申明基类对象
Base = declarative_base() Base = declarative_base()
from decouple import config from decouple import config
DB_PATH = config('DB_PATH', default='E://pyptoject//yj_resume//main.sqlite3')
DB_PATH = config('DB_PATH', default='D://PycharmProject//yj_resume//main.sqlite3')
class DBTASK(Base): class DBTASK(Base):

62
logging_config.py Normal file
View File

@@ -0,0 +1,62 @@
# logging_config.py
import logging
import logging.config
from pathlib import Path
# 确保 logs 目录存在
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)
LOGGING_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"default": {
"format": "%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s",
},
"detailed": {
"format": "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s - %(message)s",
}
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "default",
"stream": "ext://sys.stdout"
},
"file": {
"class": "logging.handlers.RotatingFileHandler", # 自动轮转
"level": "INFO",
"formatter": "detailed",
"filename": "logs/resume.log",
"maxBytes": 10485760, # 10MB
"backupCount": 5, # 保留5个备份
"encoding": "utf8"
},
},
"root": {
"level": "INFO",
"handlers": ["console", "file"]
},
"loggers": {
"uvicorn": {
"level": "INFO",
"handlers": ["console", "file"],
"propagate": False
},
"uvicorn.error": {
"level": "INFO",
"handlers": ["console", "file"],
"propagate": False
},
"uvicorn.access": {
"level": "WARNING", # 只记录警告以上,避免刷屏
"handlers": ["file"], # 只写入文件
"propagate": False
}
}
}
# 应用配置
logging.config.dictConfig(LOGGING_CONFIG)

16
main.py
View File

@@ -2,10 +2,16 @@ from fastapi import FastAPI
import uvicorn import uvicorn
from fastapi import FastAPI, File, UploadFile, HTTPException from fastapi import FastAPI, File, UploadFile, HTTPException
from typing import List from typing import List
from service.file_service import check_and_create_directory, upload_and_save_file from service.file_service import check_and_create_directory, upload_and_save_file,fetch_files
from service import excel_service from service import excel_service
from service.db_service import get_task_list from service.db_service import get_task_list
from fastapi.responses import FileResponse from fastapi.responses import FileResponse
import threading
from logging_config import LOGGING_CONFIG
import logging
logger = logging.getLogger(__name__)
app = FastAPI() app = FastAPI()
@@ -20,11 +26,11 @@ async def create_upload_files(files: List[UploadFile] = File(...)):
dir_id = check_and_create_directory(files) dir_id = check_and_create_directory(files)
if not dir_id: if not dir_id:
return {"result": False, "code": 500, "message": "create directory failed"} return {"result": False, "code": 500, "message": "create directory failed"}
flag, message = await upload_and_save_file(dir_id, files) flag, message= await upload_and_save_file(dir_id, files)
logger.info(f"flag is {flag}")
if flag: if flag:
# 触发异步任务,解析文件 TODO flag,message = await fetch_files(dir_id)
pass return {"result": flag, "message": message,"task_id": dir_id}
return {"result": flag, "message": message}
@app.get("/export_task_data_to_excel") @app.get("/export_task_data_to_excel")

View File

@@ -7,3 +7,12 @@ python-decouple
python-multipart python-multipart
pandas pandas
openpyxl openpyxl
python-multipart
PyMuPDF>=1.23.0
paddlepaddle>=2.5.0
paddleocr>=2.7.0.3
opencv-python>=4.8.0
numpy>=1.24.0
pdf2image>=1.16.3
Pillow>=10.0.0

View File

@@ -4,7 +4,8 @@ import pandas as pd
import pathlib import pathlib
from decouple import config from decouple import config
BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//') # BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')
BASE_PATH = config('BASE_PATH', default='D://PycharmProject//yj_resume//uploads//')
# 导出数据到excel # 导出数据到excel

View File

@@ -1,3 +1,7 @@
import json
from pymupdf import message
from db.sql_db import DBTASK, DBRESUME, SqliteSqlalchemy from db.sql_db import DBTASK, DBRESUME, SqliteSqlalchemy
import uuid import uuid
from datetime import datetime from datetime import datetime
@@ -6,13 +10,22 @@ import pathlib
from fastapi import File, UploadFile from fastapi import File, UploadFile
from typing import List from typing import List
import os import os
import asyncio
import logging
from service.parse_resume2_doc import extra_resume
logger = logging.getLogger(__name__)
BASE_PATH = config('BASE_PATH', default='D://PycharmProject//yj_resume//uploads//')
BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')
def check_and_create_directory(files): def check_and_create_directory(files):
logger.info("check_and_create_directory in service")
# 先创建一个task # 先创建一个task
if not files or len(files) == 0: if not files or len(files) == 0:
logger.warning("check_and_create_directory is empty")
return None return None
id = str(uuid.uuid4()) id = str(uuid.uuid4())
current_time = datetime.now() current_time = datetime.now()
@@ -35,19 +48,20 @@ def check_and_create_directory(files):
async def upload_and_save_file(dir_id, files: List[UploadFile]) -> (bool, str): async def upload_and_save_file(dir_id, files: List[UploadFile]) -> (bool, str):
logger.info(f"upload_and_save_file in service dir_id {dir_id}")
pathxx = pathlib.Path(BASE_PATH).joinpath(dir_id) pathxx = pathlib.Path(BASE_PATH).joinpath(dir_id)
pathxx.mkdir(parents=True, exist_ok=True) pathxx.mkdir(parents=True, exist_ok=True)
data = [] data = []
i = 0
for file in files: for file in files:
name, fix = os.path.splitext(file.filename) name, fix = os.path.splitext(file.filename)
id = str(uuid.uuid4())
if fix not in ['.doc', '.docx']: if fix not in ['.doc', '.docx']:
continue continue
i = i + 1 with open(pathxx.joinpath(id + fix), 'wb') as f:
with open(pathxx.joinpath(str(i) + fix), 'wb') as f:
file_content = await file.read() file_content = await file.read()
f.write(file_content) f.write(file_content)
data.append(DBRESUME(id=str(uuid.uuid4()), task_id=dir_id, status=0, file_name=str(i) + fix))
data.append(DBRESUME(id=id, task_id=dir_id, status=0, file_name=id + fix))
session = SqliteSqlalchemy().session session = SqliteSqlalchemy().session
try: try:
session.bulk_save_objects(data) session.bulk_save_objects(data)
@@ -60,3 +74,67 @@ async def upload_and_save_file(dir_id, files: List[UploadFile]) -> (bool, str):
session.close() session.close()
return True, "success" return True, "success"
async def fetch_files(dir_id) -> (bool, str):
logger.info(f"start fetching files task {dir_id} in service")
if not os.path.exists(BASE_PATH):
logger.info(f"目录{BASE_PATH}不存在")
return False, f"Failed to fetch file 目录{BASE_PATH}不存在"
file_extensions = ['.docx', '.doc']
files_list = []
dir_path = pathlib.Path(BASE_PATH).joinpath(dir_id)
for root,dirs,files in os.walk(dir_path):
for file in files:
_,ext = os.path.splitext(file)
if file_extensions and ext not in file_extensions:
logger.error(f"文件{file}格式不符合预期")
continue
file_path = os.path.join(root,file)
if os.path.isfile(file_path):
files_list.append(file_path)
else:
logger.error(f"路径下{file_path}不是文件")
update_success_mapping = []
update_fail_mapping = []
for file in files_list:
logger.info(f"file is {file} {os.path.basename(file)}")
file_name = os.path.basename(file)
id = os.path.splitext(file_name)[0]
result = extra_resume(file)
result = json.dumps(result, ensure_ascii=False)
logger.info(f"result type is {type(result)}")
logger.info(f"file content is {result}")
if not result:
logger.warning(f"file {file_name} 提取为空")
update_fail_mapping.append({'id':id, 'status':0,
'message': f"task {dir_id} => file {file_name} 提取为空"})
continue
update_success_mapping.append({'id':id, 'status':1,'data_info': result})
session = SqliteSqlalchemy().session
logger.info(f"update success mapping => {update_success_mapping}")
logger.info(f"update fail mapping => {update_fail_mapping}")
success_num = len(update_success_mapping)
fail_num = len(update_fail_mapping)
try:
update_data = update_success_mapping + update_fail_mapping
session.bulk_update_mappings(DBRESUME, update_data)
if update_fail_mapping:
session.bulk_update_mappings(DBTASK, [{'id':dir_id, 'status':2, 'success_num':success_num,
'fail_num':fail_num,'message':f'fail => {update_fail_mapping}'}])
else:
session.bulk_update_mappings(DBTASK, [{'id': dir_id, 'status': 1,
'success_num': success_num, 'fail_num': fail_num}])
session.commit()
except Exception as e:
logger.error(f"update failed => task {dir_id} error {e}")
session.rollback()
return False, f"Failed to update DBRESUME error {e}"
finally:
session.close()
return True, 'success'

View File

@@ -0,0 +1,384 @@
import re
import json
from pathlib import Path
from docx import Document
from typing import Dict, List, Any, Tuple
from collections import defaultdict
class EnhancedDocxExtractor:
def __init__(self):
# 定义字段名称的多种变体
self.field_variants = {
'姓名': ['姓名', '姓 名', '姓 名', '姓名:', '姓 名:','姓 名'],
'性别': ['性别', '性 别', '性 别', '性别:', '性 别:','性 别'],
'出生年月': ['出生年月', '出生年月:', '出生日期', '出生日期:'],
'民族': ['民族', '民族:', '民 族'],
'政治面貌': ['政治面貌', '政治面貌:', '政治面貌:'],
'现任职单位及部门': ['现任职单位及部门', '单位及部门', '工作单位', '现任职单位'],
'联系电话': ['联系电话', '电话', '手机', '联系电话:', '手机号'],
'联系地址': ['联系地址', '地址', '联系地址:', '家庭地址'],
'学历学位': ['学历', '学历:', '学 历', '学历\n学位','学位','学位:','学 位'],
'毕业院校': ['毕业院校', '毕业学校', '毕业院校:','毕业院校系及专业'],
'专业': ['专业', '专业:', '系及专业', '所学专业'],
}
def extract_with_table_structure(self, docx_path: str) -> Dict[str, Any]:
"""
提取 .docx 中的表格结构数据
"""
doc = Document(docx_path)
results = defaultdict(dict)
# 分析每个表格
for table_idx, table in enumerate(doc.tables):
print(f"\n处理表格 {table_idx + 1} ({len(table.rows)}× {len(table.columns)}列)")
# 获取表格结构
table_structure = self._analyze_table_structure(table)
# 提取键值对
kv_pairs = self._extract_from_table_structure(table, table_structure)
# 分类存储
for key, value in kv_pairs:
category = self._categorize_field(key)
results[category][key] = value
# 提取段落中的信息
paragraph_info = self._extract_from_paragraphs(doc.paragraphs)
for key, value in paragraph_info:
category = self._categorize_field(key)
results[category][key] = value
return dict(results)
def _analyze_table_structure(self, table) -> List[List[Dict]]:
"""
分析表格结构,返回每个单元格的元信息
"""
structure = []
for row_idx, row in enumerate(table.rows):
row_info = []
for col_idx, cell in enumerate(row.cells):
cell_text = cell.text.strip()
# 分析单元格属性
cell_info = {
'text': cell_text,
'row': row_idx,
'col': col_idx,
'rowspan': 1,
'colspan': 1,
'is_key': self._is_likely_key(cell_text),
'is_value': self._is_likely_value(cell_text),
}
row_info.append(cell_info)
structure.append(row_info)
return structure
def _extract_from_table_structure(self, table, structure) -> List[Tuple[str, str]]:
"""
从表格结构中提取键值对
"""
kv_pairs = []
visited = set()
key_recode = []
for row_idx, row in enumerate(structure):
for col_idx, cell in enumerate(row):
print(f"visited is {visited} ")
print(f'row {row_idx} col {col_idx} all cell is {cell}')
if (row_idx, col_idx) in visited:
print(f'---{row_idx}, {col_idx} ')
print(f'cell is {cell}')
continue
if cell['is_key']:
next_cell = structure[row_idx][col_idx+1]
# 寻找对应的值
print(f"cell2 is {cell} row {row_idx} col {col_idx}")
value = self._find_value_for_key(table, structure, row_idx, col_idx, visited, kv_pairs)
if value:
key = self._normalize_key(cell['text'])
found = False
kv_pairs = [(k,v+","+value)if k == cell['text'] else (k, v) for k,v in kv_pairs ]
for i, (k,v) in enumerate(kv_pairs):
if k == cell['text']:
kv_pairs[i] = (k,value)
found = True
if not found:
kv_pairs.append((key, value))
else:
print("不是key")
return kv_pairs
def _find_value_for_key(self, table, structure, key_row, key_col, visited, kv_pairs) -> str:
"""
为键找到对应的值
"""
# 尝试右侧单元格
if key_col + 1 < len(structure[key_row]):
value_cell = structure[key_row][key_col + 1]
current_key_cell = structure[key_row][key_col]
if value_cell['is_key']:
return None
# 特殊处理学历
spec_coll = ['全日制教育','在职教育']
if current_key_cell['text'].replace('\n','') in spec_coll :
if not value_cell['text']:
value_cell['text'] = 'False'
else:
value_cell['text'] = 'True'
if value_cell['text'] and (key_row, key_col + 1) not in visited:
# 检查这个值是否与前一个键提取的值相同(可能是合并单元格)
if not self._is_key_duplicate_merged_cell(structure[key_row][key_col]['text'], kv_pairs):
print("前一个不重复")
print(f"visited add {key_row} {key_col + 1}")
visited.add((key_row, key_col + 1))
return value_cell['text']
else:
current_key = structure[key_row][key_col]['text']
print(f"key值重复------------------------------key {current_key}")
for key, value in kv_pairs:
if key == current_key:
return value+","+value_cell['text']
# 尝试下方单元格
if key_row + 1 < len(structure):
value_cell = structure[key_row + 1][key_col]
if value_cell['text'] and (key_row + 1, key_col) not in visited:
# 检查这个值是否与前一个键提取的值相同(可能是合并单元格)
if not self._is_key_duplicate_merged_cell(structure[key_row][key_col]['text'], kv_pairs):
print("下一个不重复")
print(f"visited add {key_row} {key_col + 1}")
visited.add((key_row + 1, key_col))
return value_cell['text']
# 尝试合并单元格的情况
for row_idx in range(len(structure)):
for col_idx in range(len(structure[row_idx])):
cell = structure[row_idx][col_idx]
if (row_idx, col_idx) not in visited and cell['text']:
# 检查是否在键的附近
if abs(row_idx - key_row) <= 2 and abs(col_idx - key_col) <= 2:
# 检查这个值是否与前一个键提取的值相同
if not self._is_key_duplicate_merged_cell(structure[key_row][key_col]['text'], kv_pairs):
print("合并不重复")
print(f"visited add {key_row} {key_col + 1}")
visited.add((row_idx, col_idx))
return cell['text']
return None
def _is_key_duplicate_merged_cell(self, text, kv_pairs) -> bool:
"""
检查当前文本value是否可能和已收录的kv集合里的key值重复
如下例1行0列 2行0列 都是毕业院校
第一次 1行0列:1行2列组成key:value
第二次到2行0列检测到 毕业院校已出现在kv_pairs中不再组合2行0列:2行1列
| 硕士学位/研究生学历:中国科学院计算技术研究所计算机技术专业
毕业院校 |——————————————————————————————————————————————————
|
|————————————————————————————————————————————————————
"""
for k, v in kv_pairs:
if text == k:
return True
return False
def extract_parentheses_content(self, text):
# 使用正则表达式提取括号内的所有内容
matches = re.findall(r'[(]([^)]*)[)]', text)
return matches # 返回列表,可能包含多个括号
def _is_likely_key(self, text: str) -> bool:
"""判断文本是否可能是键"""
if not text or len(text) > 20:
return False
# 检查是否包含常见字段词
key_indicators = ['籍贯','籍 贯','政治面貌','政治\n面貌','姓名','性别','姓 名', '性 别', '出生年月', '民族','民 族', '单位', '部门','联系地址','主要学习经历','全日制教育','在职教育',
'职务','职 务','\n', '职称','职 称', '电话', '地址', '学历', '学位','现任职务','职业资格','奖惩情况(近三年主要奖惩信息)'
'专业', '岗位', '经历', '时间', '资格','现任职单位及部门','身份证号','婚姻状况','健康状况','应聘岗位','应聘部门/岗位','毕业院校系及专业']
for indicator in key_indicators:
translation_table = str.maketrans('', '', ' \t\n\r\f\v')
indicator = indicator.translate(translation_table)
text = text.translate(translation_table)
if indicator in text:
return True
# 检查是否有冒号(中文文档常用)
if '' in text or ':' in text:
key_part = text.split('')[0].split(':')[0]
if any(indicator in key_part for indicator in key_indicators):
return True
for indicator in key_indicators:
print("indicator is ===============================", indicator)
print("text is ===============================", text)
translation_table = str.maketrans('', '', ' \t\n\r\f\v')
indicator = indicator.translate(translation_table)
text = text.translate(translation_table)
clean_text = self.extract_parentheses_content(text)
print(text)
clean_indicator = self.extract_parentheses_content(indicator)
print(indicator)
if not clean_text:
print("特殊匹配失败")
return False
if clean_indicator:
print("开始匹配=========")
clean_text = clean_text[0]
clean_indicator = clean_indicator[0]
if clean_indicator in clean_text:
print(f"特殊情况匹配成功======={text}")
return True
else:
print("继续匹配")
continue
return False
def _is_likely_value(self, text: str) -> bool:
"""判断文本是否可能是值"""
if not text:
return False
# 值通常不是常见的字段名
if self._is_likely_key(text):
return False
# 值可能包含特定内容
if re.match(r'^\d{11}$', text): # 手机号
return True
if re.match(r'^\d{4}', text): # 日期
return True
if len(text) > 10: # 长文本可能是值
return True
return True
def _normalize_key(self, key_text: str) -> str:
"""标准化键名"""
# 移除冒号和空格
key_text = re.sub(r'[:\s]+$', '', key_text)
# 映射到标准键名
for std_key, variants in self.field_variants.items():
for variant in variants:
if variant == key_text or key_text in variant:
return std_key
return key_text
def _categorize_field(self, key: str) -> str:
"""将字段分类"""
categories = {
'基本信息': ['姓名', '性别', '出生年月', '民族', '政治面貌','学历学位','毕业院校系及专业','全日制教育','在职教育'
'婚姻状况', '健康状况', '籍贯', '身份证号','联系电话','婚姻状况','健康状况','身份证号','联系电话(手机)','毕业院校系及专业','联系地址','主要学习经历','奖惩情况(近三年主要奖惩信息)'],
'工作信息': ['现任职单位及部门', '现任职务', '职称', '职业资格',
'参加工作时间', '职称取得时间','应聘部门/岗位','是否接受调剂职级/岗位','奖惩情况(近三年主要奖惩信息)'],
}
for category, fields in categories.items():
translation_table = str.maketrans('', '', ' \t\n\r\f\v')
key = key.translate(translation_table)
if key in fields:
# print(f"filed is {fields} key is {key} ")
return category
return '其他信息'
def _extract_from_paragraphs(self, paragraphs) -> List[Tuple[str, str]]:
"""从段落中提取信息"""
kv_pairs = []
for para in paragraphs:
text = para.text.strip()
if not text:
continue
# 尝试提取冒号分隔的键值对
if '' in text or ':' in text:
separator = '' if '' in text else ':'
parts = text.split(separator, 1)
if len(parts) == 2:
key = parts[0].strip()
value = parts[1].strip()
if self._is_likely_key(key) and value:
normalized_key = self._normalize_key(key)
kv_pairs.append((normalized_key, value))
return kv_pairs
# 快速使用示例
def quick_extract(docx_path: str):
"""快速提取并显示结果"""
extractor = EnhancedDocxExtractor()
try:
result = extractor.extract_with_table_structure(docx_path)
print("\n提取结果 (键值对格式):")
print("=" * 60)
for category, fields in result.items():
if fields:
print(f"\n{category}:")
for key, value in fields.items():
print(f" {key}: {value}")
return result
except Exception as e:
print(f"提取失败: {e}")
base_map = ['姓名','性别','籍贯','政治面貌','出生年月','身份证号','现居住地','民族','学历','学位','学历学位','特长','联系电话','联系电话(手机)',
'婚姻状况','健康状况','毕业院校系及专业','主要学习经历','联系地址','入党/团时间','全日制教育','在职教育','奖惩情况(近三年主要奖惩信息)']
work_map = ['参加工作时间','现任职单位及部门','职务','现任职务','职称','奖惩','工作经历','主要工作经历','职称取得时间','职业资格','应聘部门/岗位']
other_map = ['工作经历','主要工作经历','职称取得时间','职业资格','应聘部门/岗位','是否接受调剂职级/岗位']
def fetch_info(data):
map_word = base_map + work_map + other_map
print("data is {0}".format(data))
print("map_word is {0}".format(map_word))
final_res = {}
for key, value in data.items():
translation_table = str.maketrans('', '', ' \t\n\r\f\v')
clean_key = key.translate(translation_table)
print(f"key is {clean_key} ")
if clean_key in map_word:
# clean_value = value.translate(translation_table)
final_res[clean_key] = value
return final_res
def extra_resume(file_path):
result = quick_extract(file_path)
print(result)
base_data = result['基本信息']
work_data = result['工作信息']
other_data = result['其他信息']
data = {}
data.update(base_data)
data.update(work_data)
data.update(other_data)
res = fetch_info(data)
return res
# if __name__ == "__main__":
# # 使用方法
# docx_file = "../1.报名登记表.docx" # 替换为你的文件
# print(extra_resume(docx_file))