Compare commits

...

7 Commits

Author   SHA1        Message                                                                                             Date
yujj128  16583dbb06  Resume extraction and write                                                                         2025-12-06 17:04:05 +08:00
yujj128  ec0995d08a  Merge branch 'dev' of http://106.13.42.156:33077/lei_y601/yj_resume (conflicts: main.py)            2025-12-06 16:48:51 +08:00
yujj128  a124651a7e  Resume extraction and write                                                                         2025-12-06 16:44:53 +08:00
yujj128  e9d225939a  Merge branch 'dev' of http://106.13.42.156:33077/lei_y601/yj_resume (conflicts: requirements.txt)   2025-12-06 11:19:38 +08:00
yujj128  ff1c0e890c  Resume extraction                                                                                   2025-12-06 11:19:09 +08:00
yujj128  f1063146d2  Merge branch 'dev' of http://106.13.42.156:33077/lei_y601/yj_resume (conflicts: requirements.txt)   2025-12-05 20:49:00 +08:00
yujj128  992bab2887  Resume extraction                                                                                   2025-12-05 20:48:27 +08:00
7 changed files with 553 additions and 12 deletions

db/sql_db.py

@@ -3,7 +3,8 @@ from sqlalchemy.orm import declarative_base, sessionmaker
# declare the declarative base object
Base = declarative_base()
from decouple import config
DB_PATH = config('DB_PATH', default='E://pyptoject//yj_resume//main.sqlite3')
DB_PATH = config('DB_PATH', default='D://PycharmProject//yj_resume//main.sqlite3')
class DBTASK(Base):

logging_config.py (new file)

@@ -0,0 +1,62 @@
# logging_config.py
import logging
import logging.config
from pathlib import Path

# make sure the logs directory exists
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)

LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "default": {
            "format": "%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s",
        },
        "detailed": {
            "format": "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s - %(message)s",
        }
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "formatter": "default",
            "stream": "ext://sys.stdout"
        },
        "file": {
            "class": "logging.handlers.RotatingFileHandler",  # rotates automatically
            "level": "INFO",
            "formatter": "detailed",
            "filename": "logs/resume.log",
            "maxBytes": 10485760,  # 10 MB
            "backupCount": 5,  # keep 5 backups
            "encoding": "utf8"
        },
    },
    "root": {
        "level": "INFO",
        "handlers": ["console", "file"]
    },
    "loggers": {
        "uvicorn": {
            "level": "INFO",
            "handlers": ["console", "file"],
            "propagate": False
        },
        "uvicorn.error": {
            "level": "INFO",
            "handlers": ["console", "file"],
            "propagate": False
        },
        "uvicorn.access": {
            "level": "WARNING",  # warnings and above only, to avoid flooding the log
            "handlers": ["file"],  # file only
            "propagate": False
        }
    }
}

# apply the configuration
logging.config.dictConfig(LOGGING_CONFIG)
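
For context, a minimal sketch of how this module is meant to be consumed (main.py in the next diff does exactly this): importing it runs dictConfig once as a side effect, after which any module can ask for a named logger.

# sketch: the import alone applies the configuration
from logging_config import LOGGING_CONFIG  # noqa: F401  (imported for the dictConfig side effect)
import logging

logger = logging.getLogger(__name__)
logger.info("logging configured; INFO and above go to the console and logs/resume.log")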

main.py

@@ -2,10 +2,16 @@ from fastapi import FastAPI
import uvicorn
from fastapi import FastAPI, File, UploadFile, HTTPException
from typing import List
from service.file_service import check_and_create_directory, upload_and_save_file
from service.file_service import check_and_create_directory, upload_and_save_file,fetch_files
from service import excel_service
from service.db_service import get_task_list
from fastapi.responses import FileResponse
import threading
from logging_config import LOGGING_CONFIG
import logging
logger = logging.getLogger(__name__)
app = FastAPI()
@@ -20,11 +26,11 @@ async def create_upload_files(files: List[UploadFile] = File(...)):
    dir_id = check_and_create_directory(files)
    if not dir_id:
        return {"result": False, "code": 500, "message": "create directory failed"}
    flag, message = await upload_and_save_file(dir_id, files)
    flag, message= await upload_and_save_file(dir_id, files)
    logger.info(f"flag is {flag}")
    if flag:
        # kick off the async parsing task  TODO
        pass
    return {"result": flag, "message": message}
    flag,message = await fetch_files(dir_id)
    return {"result": flag, "message": message,"task_id": dir_id}
@app.get("/export_task_data_to_excel")
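
The endpoint now runs the parser inline before responding; import threading was added but not used yet, and the removed TODO pointed at offloading the parse. A minimal sketch, assuming that design is still wanted, of pushing fetch_files onto a worker thread so the request returns as soon as the files are saved (the wrapper below is hypothetical):

# hypothetical helper: run the async parser to completion on a daemon thread
import asyncio

def run_fetch_in_background(dir_id: str) -> None:
    asyncio.run(fetch_files(dir_id))  # fetch_files is a coroutine, so give the thread its own event loop

# inside create_upload_files, instead of awaiting fetch_files directly:
# threading.Thread(target=run_fetch_in_background, args=(dir_id,), daemon=True).start()
# return {"result": flag, "message": message, "task_id": dir_id}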

requirements.txt

@@ -7,3 +7,12 @@ python-decouple
python-multipart
pandas
openpyxl
python-multipart
PyMuPDF>=1.23.0
paddlepaddle>=2.5.0
paddleocr>=2.7.0.3
opencv-python>=4.8.0
numpy>=1.24.0
pdf2image>=1.16.3
Pillow>=10.0.0

service/excel_service.py

@@ -4,7 +4,8 @@ import pandas as pd
import pathlib
from decouple import config
BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')
# BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')
BASE_PATH = config('BASE_PATH', default='D://PycharmProject//yj_resume//uploads//')
# export data to Excel
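
Both this default and the DB_PATH default in db/sql_db.py point at machine-specific Windows folders. python-decouple already reads a .env (or settings.ini) from the project root, so a small .env with hypothetical paths would make the hard-coded defaults irrelevant on any machine; a sketch:

# .env (hypothetical values, placed next to main.py)
BASE_PATH=/srv/yj_resume/uploads/
DB_PATH=/srv/yj_resume/main.sqlite3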

service/file_service.py

@@ -1,3 +1,7 @@
import json
from pymupdf import message
from db.sql_db import DBTASK, DBRESUME, SqliteSqlalchemy
import uuid
from datetime import datetime
@@ -6,13 +10,22 @@ import pathlib
from fastapi import File, UploadFile
from typing import List
import os
import asyncio
import logging
from service.parse_resume2_doc import extra_resume
logger = logging.getLogger(__name__)
BASE_PATH = config('BASE_PATH', default='D://PycharmProject//yj_resume//uploads//')
BASE_PATH = config('BASE_PATH', default='E://pyptoject//yj_resume//')
def check_and_create_directory(files):
    logger.info("check_and_create_directory in service")
    # create a task record first
    if not files or len(files) == 0:
        logger.warning("check_and_create_directory is empty")
        return None
    id = str(uuid.uuid4())
    current_time = datetime.now()
@@ -35,19 +48,20 @@ def check_and_create_directory(files):
async def upload_and_save_file(dir_id, files: List[UploadFile]) -> (bool, str):
    logger.info(f"upload_and_save_file in service dir_id {dir_id}")
    pathxx = pathlib.Path(BASE_PATH).joinpath(dir_id)
    pathxx.mkdir(parents=True, exist_ok=True)
    data = []
    i = 0
    for file in files:
        name, fix = os.path.splitext(file.filename)
        id = str(uuid.uuid4())
        if fix not in ['.doc', '.docx']:
            continue
        i = i + 1
        with open(pathxx.joinpath(str(i) + fix), 'wb') as f:
        with open(pathxx.joinpath(id + fix), 'wb') as f:
            file_content = await file.read()
            f.write(file_content)
        data.append(DBRESUME(id=str(uuid.uuid4()), task_id=dir_id, status=0, file_name=str(i) + fix))
        data.append(DBRESUME(id=id, task_id=dir_id, status=0, file_name=id + fix))
    session = SqliteSqlalchemy().session
    try:
        session.bulk_save_objects(data)
@@ -60,3 +74,67 @@ async def upload_and_save_file(dir_id, files: List[UploadFile]) -> (bool, str):
        session.close()
    return True, "success"
async def fetch_files(dir_id) -> (bool, str):
    logger.info(f"start fetching files task {dir_id} in service")
    if not os.path.exists(BASE_PATH):
        logger.info(f"目录{BASE_PATH}不存在")
        return False, f"Failed to fetch file 目录{BASE_PATH}不存在"
    file_extensions = ['.docx', '.doc']
    files_list = []
    dir_path = pathlib.Path(BASE_PATH).joinpath(dir_id)
    for root, dirs, files in os.walk(dir_path):
        for file in files:
            _, ext = os.path.splitext(file)
            if file_extensions and ext not in file_extensions:
                logger.error(f"文件{file}格式不符合预期")
                continue
            file_path = os.path.join(root, file)
            if os.path.isfile(file_path):
                files_list.append(file_path)
            else:
                logger.error(f"路径下{file_path}不是文件")
    update_success_mapping = []
    update_fail_mapping = []
    for file in files_list:
        logger.info(f"file is {file} {os.path.basename(file)}")
        file_name = os.path.basename(file)
        id = os.path.splitext(file_name)[0]
        result = extra_resume(file)
        if not result:
            # check for an empty extraction before serializing; json.dumps({}) would be truthy
            logger.warning(f"file {file_name} 提取为空")
            update_fail_mapping.append({'id': id, 'status': 0,
                                        'message': f"task {dir_id} => file {file_name} 提取为空"})
            continue
        result = json.dumps(result, ensure_ascii=False)
        logger.info(f"result type is {type(result)}")
        logger.info(f"file content is {result}")
        update_success_mapping.append({'id': id, 'status': 1, 'data_info': result})
    session = SqliteSqlalchemy().session
    logger.info(f"update success mapping => {update_success_mapping}")
    logger.info(f"update fail mapping => {update_fail_mapping}")
    success_num = len(update_success_mapping)
    fail_num = len(update_fail_mapping)
    try:
        update_data = update_success_mapping + update_fail_mapping
        session.bulk_update_mappings(DBRESUME, update_data)
        if update_fail_mapping:
            session.bulk_update_mappings(DBTASK, [{'id': dir_id, 'status': 2, 'success_num': success_num,
                                                   'fail_num': fail_num, 'message': f'fail => {update_fail_mapping}'}])
        else:
            session.bulk_update_mappings(DBTASK, [{'id': dir_id, 'status': 1,
                                                   'success_num': success_num, 'fail_num': fail_num}])
        session.commit()
    except Exception as e:
        logger.error(f"update failed => task {dir_id} error {e}")
        session.rollback()
        return False, f"Failed to update DBRESUME error {e}"
    finally:
        session.close()
    return True, 'success'
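
The bulk update above leans on the naming convention introduced in upload_and_save_file: each file is saved as <DBRESUME id>.<ext>, so the filename stem recovered here is exactly the primary key of the row to update. A small illustration of that round trip (the task directory name is hypothetical):

# illustration only: the stem of a saved file is the DBRESUME primary key
row_id = str(uuid.uuid4())
saved_path = pathlib.Path(BASE_PATH).joinpath("some-task-id", row_id + ".docx")  # as written by upload_and_save_file
assert os.path.splitext(os.path.basename(str(saved_path)))[0] == row_id          # as recovered by fetch_files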

service/parse_resume2_doc.py (new file)

@@ -0,0 +1,384 @@
import re
import json
from pathlib import Path
from docx import Document
from typing import Dict, List, Any, Tuple
from collections import defaultdict


class EnhancedDocxExtractor:
    def __init__(self):
        # known spelling variants for each field label
        self.field_variants = {
            '姓名': ['姓名', '姓 名', '姓 名', '姓名:', '姓 名:','姓 名'],
            '性别': ['性别', '性 别', '性 别', '性别:', '性 别:','性 别'],
            '出生年月': ['出生年月', '出生年月:', '出生日期', '出生日期:'],
            '民族': ['民族', '民族:', '民 族'],
            '政治面貌': ['政治面貌', '政治面貌:', '政治面貌:'],
            '现任职单位及部门': ['现任职单位及部门', '单位及部门', '工作单位', '现任职单位'],
            '联系电话': ['联系电话', '电话', '手机', '联系电话:', '手机号'],
            '联系地址': ['联系地址', '地址', '联系地址:', '家庭地址'],
            '学历学位': ['学历', '学历:', '学 历', '学历\n学位','学位','学位:','学 位'],
            '毕业院校': ['毕业院校', '毕业学校', '毕业院校:','毕业院校系及专业'],
            '专业': ['专业', '专业:', '系及专业', '所学专业'],
        }
    def extract_with_table_structure(self, docx_path: str) -> Dict[str, Any]:
        """
        Extract the table-structured data from a .docx file.
        """
        doc = Document(docx_path)
        results = defaultdict(dict)
        # analyse every table
        for table_idx, table in enumerate(doc.tables):
            print(f"\n处理表格 {table_idx + 1} ({len(table.rows)}行 × {len(table.columns)}列)")
            # analyse the table layout
            table_structure = self._analyze_table_structure(table)
            # pull out key/value pairs
            kv_pairs = self._extract_from_table_structure(table, table_structure)
            # file them under their categories
            for key, value in kv_pairs:
                category = self._categorize_field(key)
                results[category][key] = value
        # also extract information from plain paragraphs
        paragraph_info = self._extract_from_paragraphs(doc.paragraphs)
        for key, value in paragraph_info:
            category = self._categorize_field(key)
            results[category][key] = value
        return dict(results)
    def _analyze_table_structure(self, table) -> List[List[Dict]]:
        """
        Analyse the table structure and return metadata for every cell.
        """
        structure = []
        for row_idx, row in enumerate(table.rows):
            row_info = []
            for col_idx, cell in enumerate(row.cells):
                cell_text = cell.text.strip()
                # per-cell attributes
                cell_info = {
                    'text': cell_text,
                    'row': row_idx,
                    'col': col_idx,
                    'rowspan': 1,
                    'colspan': 1,
                    'is_key': self._is_likely_key(cell_text),
                    'is_value': self._is_likely_value(cell_text),
                }
                row_info.append(cell_info)
            structure.append(row_info)
        return structure
    def _extract_from_table_structure(self, table, structure) -> List[Tuple[str, str]]:
        """
        Extract key/value pairs from the analysed table structure.
        """
        kv_pairs = []
        visited = set()
        key_recode = []
        for row_idx, row in enumerate(structure):
            for col_idx, cell in enumerate(row):
                print(f"visited is {visited} ")
                print(f'row {row_idx} col {col_idx} all cell is {cell}')
                if (row_idx, col_idx) in visited:
                    print(f'---{row_idx}, {col_idx} ')
                    print(f'cell is {cell}')
                    continue
                if cell['is_key']:
                    next_cell = structure[row_idx][col_idx + 1]
                    # look for the matching value
                    print(f"cell2 is {cell} row {row_idx} col {col_idx}")
                    value = self._find_value_for_key(table, structure, row_idx, col_idx, visited, kv_pairs)
                    if value:
                        key = self._normalize_key(cell['text'])
                        found = False
                        kv_pairs = [(k, v + "," + value) if k == cell['text'] else (k, v) for k, v in kv_pairs]
                        for i, (k, v) in enumerate(kv_pairs):
                            if k == cell['text']:
                                kv_pairs[i] = (k, value)
                                found = True
                        if not found:
                            kv_pairs.append((key, value))
                else:
                    print("不是key")
        return kv_pairs
    def _find_value_for_key(self, table, structure, key_row, key_col, visited, kv_pairs) -> str:
        """
        Find the value that belongs to a key cell.
        """
        # try the cell to the right
        if key_col + 1 < len(structure[key_row]):
            value_cell = structure[key_row][key_col + 1]
            current_key_cell = structure[key_row][key_col]
            if value_cell['is_key']:
                return None
            # special handling for the education flags
            spec_coll = ['全日制教育', '在职教育']
            if current_key_cell['text'].replace('\n', '') in spec_coll:
                if not value_cell['text']:
                    value_cell['text'] = 'False'
                else:
                    value_cell['text'] = 'True'
            if value_cell['text'] and (key_row, key_col + 1) not in visited:
                # check whether this value repeats the one taken by the previous key (possible merged cell)
                if not self._is_key_duplicate_merged_cell(structure[key_row][key_col]['text'], kv_pairs):
                    print("前一个不重复")
                    print(f"visited add {key_row} {key_col + 1}")
                    visited.add((key_row, key_col + 1))
                    return value_cell['text']
                else:
                    current_key = structure[key_row][key_col]['text']
                    print(f"key值重复------------------------------key {current_key}")
                    for key, value in kv_pairs:
                        if key == current_key:
                            return value + "," + value_cell['text']
        # try the cell below
        if key_row + 1 < len(structure):
            value_cell = structure[key_row + 1][key_col]
            if value_cell['text'] and (key_row + 1, key_col) not in visited:
                # check whether this value repeats the one taken by the previous key (possible merged cell)
                if not self._is_key_duplicate_merged_cell(structure[key_row][key_col]['text'], kv_pairs):
                    print("下一个不重复")
                    print(f"visited add {key_row} {key_col + 1}")
                    visited.add((key_row + 1, key_col))
                    return value_cell['text']
        # try merged-cell candidates nearby
        for row_idx in range(len(structure)):
            for col_idx in range(len(structure[row_idx])):
                cell = structure[row_idx][col_idx]
                if (row_idx, col_idx) not in visited and cell['text']:
                    # check whether the candidate sits close to the key
                    if abs(row_idx - key_row) <= 2 and abs(col_idx - key_col) <= 2:
                        # check whether this value repeats the one taken by the previous key
                        if not self._is_key_duplicate_merged_cell(structure[key_row][key_col]['text'], kv_pairs):
                            print("合并不重复")
                            print(f"visited add {key_row} {key_col + 1}")
                            visited.add((row_idx, col_idx))
                            return cell['text']
        return None
    def _is_key_duplicate_merged_cell(self, text, kv_pairs) -> bool:
        """
        Check whether the current cell text duplicates a key already collected in kv_pairs.
        Example: rows 1 and 2 both contain 毕业院校 in column 0.
        First pass: (row 1, col 0) is paired with (row 1, col 2) as key:value.
        When (row 2, col 0) is reached, 毕业院校 is already in kv_pairs, so
        (row 2, col 0):(row 2, col 1) is not paired again.
                     | 硕士学位/研究生学历:中国科学院计算技术研究所计算机技术专业
        毕业院校      |------------------------------------------------------
                     |
                     |------------------------------------------------------
        """
        for k, v in kv_pairs:
            if text == k:
                return True
        return False
    def extract_parentheses_content(self, text):
        # pull out everything inside full-width parentheses; may return several matches
        matches = re.findall(r'[(]([^)]*)[)]', text)
        return matches
    def _is_likely_key(self, text: str) -> bool:
        """Decide whether a piece of text looks like a field label."""
        if not text or len(text) > 20:
            return False
        # check for common field words
        key_indicators = ['籍贯','籍 贯','政治面貌','政治\n面貌','姓名','性别','姓 名', '性 别', '出生年月', '民族','民 族', '单位', '部门','联系地址','主要学习经历','全日制教育','在职教育',
                          '职务','职 务','\n', '职称','职 称', '电话', '地址', '学历', '学位','现任职务','职业资格','奖惩情况(近三年主要奖惩信息)',
                          '专业', '岗位', '经历', '时间', '资格','现任职单位及部门','身份证号','婚姻状况','健康状况','应聘岗位','应聘部门/岗位','毕业院校系及专业']
        for indicator in key_indicators:
            translation_table = str.maketrans('', '', ' \t\n\r\f\v')
            indicator = indicator.translate(translation_table)
            text = text.translate(translation_table)
            if indicator in text:
                return True
        # check for a colon (common in Chinese forms)
        if ':' in text or ':' in text:
            key_part = text.split(':')[0].split(':')[0]
            if any(indicator in key_part for indicator in key_indicators):
                return True
        for indicator in key_indicators:
            print("indicator is ===============================", indicator)
            print("text is ===============================", text)
            translation_table = str.maketrans('', '', ' \t\n\r\f\v')
            indicator = indicator.translate(translation_table)
            text = text.translate(translation_table)
            clean_text = self.extract_parentheses_content(text)
            print(text)
            clean_indicator = self.extract_parentheses_content(indicator)
            print(indicator)
            if not clean_text:
                print("特殊匹配失败")
                return False
            if clean_indicator:
                print("开始匹配=========")
                clean_text = clean_text[0]
                clean_indicator = clean_indicator[0]
                if clean_indicator in clean_text:
                    print(f"特殊情况匹配成功======={text}")
                    return True
                else:
                    print("继续匹配")
                    continue
        return False
    def _is_likely_value(self, text: str) -> bool:
        """Decide whether a piece of text looks like a value."""
        if not text:
            return False
        # values are usually not field labels themselves
        if self._is_likely_key(text):
            return False
        # values often match specific patterns
        if re.match(r'^\d{11}$', text):  # mobile number
            return True
        if re.match(r'^\d{4}', text):  # date
            return True
        if len(text) > 10:  # long text is probably a value
            return True
        return True
    def _normalize_key(self, key_text: str) -> str:
        """Normalize a key name."""
        # strip trailing colons and whitespace
        key_text = re.sub(r'[:\s]+$', '', key_text)
        # map to the canonical key name
        for std_key, variants in self.field_variants.items():
            for variant in variants:
                if variant == key_text or key_text in variant:
                    return std_key
        return key_text
    def _categorize_field(self, key: str) -> str:
        """Assign a field to a category."""
        categories = {
            '基本信息': ['姓名', '性别', '出生年月', '民族', '政治面貌','学历学位','毕业院校系及专业','全日制教育','在职教育',
                         '婚姻状况', '健康状况', '籍贯', '身份证号','联系电话','婚姻状况','健康状况','身份证号','联系电话(手机)','毕业院校系及专业','联系地址','主要学习经历','奖惩情况(近三年主要奖惩信息)'],
            '工作信息': ['现任职单位及部门', '现任职务', '职称', '职业资格',
                         '参加工作时间', '职称取得时间','应聘部门/岗位','是否接受调剂职级/岗位','奖惩情况(近三年主要奖惩信息)'],
        }
        for category, fields in categories.items():
            translation_table = str.maketrans('', '', ' \t\n\r\f\v')
            key = key.translate(translation_table)
            if key in fields:
                # print(f"filed is {fields} key is {key} ")
                return category
        return '其他信息'
    def _extract_from_paragraphs(self, paragraphs) -> List[Tuple[str, str]]:
        """Extract information from plain paragraphs."""
        kv_pairs = []
        for para in paragraphs:
            text = para.text.strip()
            if not text:
                continue
            # try to split colon-separated key/value pairs
            if ':' in text or ':' in text:
                separator = ':' if ':' in text else ':'
                parts = text.split(separator, 1)
                if len(parts) == 2:
                    key = parts[0].strip()
                    value = parts[1].strip()
                    if self._is_likely_key(key) and value:
                        normalized_key = self._normalize_key(key)
                        kv_pairs.append((normalized_key, value))
        return kv_pairs
# quick usage example
def quick_extract(docx_path: str):
    """Extract and print the result."""
    extractor = EnhancedDocxExtractor()
    try:
        result = extractor.extract_with_table_structure(docx_path)
        print("\n提取结果 (键值对格式):")
        print("=" * 60)
        for category, fields in result.items():
            if fields:
                print(f"\n{category}:")
                for key, value in fields.items():
                    print(f"  {key}: {value}")
        return result
    except Exception as e:
        print(f"提取失败: {e}")
base_map = ['姓名','性别','籍贯','政治面貌','出生年月','身份证号','现居住地','民族','学历','学位','学历学位','特长','联系电话','联系电话(手机)',
            '婚姻状况','健康状况','毕业院校系及专业','主要学习经历','联系地址','入党/团时间','全日制教育','在职教育','奖惩情况(近三年主要奖惩信息)']
work_map = ['参加工作时间','现任职单位及部门','职务','现任职务','职称','奖惩','工作经历','主要工作经历','职称取得时间','职业资格','应聘部门/岗位']
other_map = ['工作经历','主要工作经历','职称取得时间','职业资格','应聘部门/岗位','是否接受调剂职级/岗位']


def fetch_info(data):
    map_word = base_map + work_map + other_map
    print("data is {0}".format(data))
    print("map_word is {0}".format(map_word))
    final_res = {}
    for key, value in data.items():
        translation_table = str.maketrans('', '', ' \t\n\r\f\v')
        clean_key = key.translate(translation_table)
        print(f"key is {clean_key} ")
        if clean_key in map_word:
            # clean_value = value.translate(translation_table)
            final_res[clean_key] = value
    return final_res


def extra_resume(file_path):
    result = quick_extract(file_path)
    print(result)
    if not result:
        # quick_extract returns None when parsing raised; report an empty result upstream
        return {}
    base_data = result.get('基本信息', {})
    work_data = result.get('工作信息', {})
    other_data = result.get('其他信息', {})
    data = {}
    data.update(base_data)
    data.update(work_data)
    data.update(other_data)
    res = fetch_info(data)
    return res


# if __name__ == "__main__":
#     # usage: point this at a registration form .docx
#     docx_file = "../1.报名登记表.docx"  # replace with your own file
#     print(extra_resume(docx_file))
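
The extractor records rowspan and colspan as constant 1 and works around merged cells indirectly through _is_key_duplicate_merged_cell. A possible refinement, resting on python-docx's behaviour of returning the same underlying cell element for every grid position a merge covers, is sketched below (the function and its usage are hypothetical):

# hypothetical sketch: locate merged regions by identity of the underlying XML cell
def merged_cell_map(table):
    """Map each (row, col) to the (row, col) where its merged region starts."""
    first_seen = {}  # id of the underlying <w:tc> element -> first (row, col) it appeared at
    mapping = {}
    for r, row in enumerate(table.rows):
        for c, cell in enumerate(row.cells):
            tc_id = id(cell._tc)  # python-docx repeats the same _tc for every position a merge spans
            if tc_id not in first_seen:
                first_seen[tc_id] = (r, c)
            mapping[(r, c)] = first_seen[tc_id]
    return mapping

# usage sketch: any position mapping to an earlier coordinate is the continuation of a merge
# doc = Document("../1.报名登记表.docx")
# print(merged_cell_map(doc.tables[0]))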