"""Extract structured key/value fields from .docx personnel appointment/approval forms."""
import os.path
|
||
import re
|
||
import json
|
||
from pathlib import Path
|
||
import pypandoc
|
||
|
||
from docx import Document
|
||
from typing import Dict, List, Any, Tuple
|
||
from collections import defaultdict
|
||
import logging
|
||
|
||
logging.basicConfig(level=logging.INFO)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class EnhancedDocxExtractor:
    """Extract key/value fields from tables and paragraphs of a .docx form.

    Every table cell is classified as a likely "key" (field label) or
    "value"; each key is then paired with a nearby value cell — right
    neighbour first, then the cell below, then any close unvisited cell
    (to cope with merged cells).
    """

    def __init__(self):
        # Known spelling variants of each canonical field name
        # (different spacing / colon styles seen in real documents).
        self.field_variants = {
            '姓名': ['姓名', '姓 名', '姓 名', '姓名:', '姓 名:','姓 名'],
            '性别': ['性别', '性 别', '性 别', '性别:', '性 别:','性 别'],
            '出生年月': ['出生年月', '出生年月:', '出生日期', '出生日期:'],
            '民族': ['民族', '民族:', '民 族'],
            '政治面貌': ['政治面貌', '政治面貌:', '政治面貌:'],
            '现任职单位及部门': ['现任职单位及部门', '单位及部门', '工作单位', '现任职单位'],
            '联系电话': ['联系电话', '电话', '手机', '联系电话:', '手机号'],
            '联系地址': ['联系地址', '地址', '联系地址:', '家庭地址'],
            '学历学位': ['学历', '学历:', '学 历', '学历\n学位','学位','学位:','学 位'],
            '毕业院校系及专业': ['毕业院校', '毕业学校', '毕业院校:'],
            '专业': ['专业', '专业:', '系及专业', '所学专业'],
        }

    def convert_doc_to_docx(self, file_path: str) -> Document:
        # TODO: unimplemented stub — legacy .doc input is not supported yet.
        pass

    def extract_with_table_structure(self, docx_path: str) -> Dict[str, Any]:
        """
        Extract table-structured data from the .docx at *docx_path*.

        Returns a dict mapping category ('基本信息' / '工作信息' / '其他信息')
        to a dict of field name -> value.
        """
        logger.info("into extract_with_table_structure")
        doc = Document(docx_path)
        results = defaultdict(dict)
        # Analyse every table in the document.
        for table_idx, table in enumerate(doc.tables):
            logger.info(f"\n处理表格 {table_idx + 1} ({len(table.rows)}行 × {len(table.columns)}列)")

            # Classify each cell as key / value.
            table_structure = self._analyze_table_structure(table)
            # Pair key cells with value cells.
            kv_pairs = self._extract_from_table_structure(table, table_structure)
            # Store each pair under its category.
            for key, value in kv_pairs:
                category = self._categorize_field(key)
                results[category][key] = value
        # Also pick up "key: value" style lines from plain paragraphs.
        paragraph_info = self._extract_from_paragraphs(doc.paragraphs)
        for key, value in paragraph_info:
            category = self._categorize_field(key)
            results[category][key] = value

        return dict(results)

    def _analyze_table_structure(self, table) -> List[List[Dict]]:
        """
        Build per-cell metadata for *table*: text, position, and whether
        the cell looks like a key or a value.
        """
        structure = []

        for row_idx, row in enumerate(table.rows):
            row_info = []
            for col_idx, cell in enumerate(row.cells):
                cell_text = cell.text.strip()
                # rowspan/colspan are fixed placeholders here; merged cells
                # show up as repeated cells and are handled downstream.
                cell_info = {
                    'text': cell_text,
                    'row': row_idx,
                    'col': col_idx,
                    'rowspan': 1,
                    'colspan': 1,
                    'is_key': self._is_likely_key(cell_text),
                    'is_value': self._is_likely_value(cell_text),
                }
                row_info.append(cell_info)
            structure.append(row_info)

        return structure

    def _extract_from_table_structure(self, table, structure) -> List[Tuple[str, str]]:
        """
        Walk *structure* and return the (key, value) pairs found in the table.
        """
        kv_pairs = []
        visited = set()
        for row_idx, row in enumerate(structure):
            for col_idx, cell in enumerate(row):
                logger.info(f"visited is {visited} ")
                logger.info(f'row {row_idx} col {col_idx} all cell is {cell}')
                if (row_idx, col_idx) in visited:
                    logger.info(f'---{row_idx}, {col_idx} in visited ')
                    continue

                if cell['is_key']:
                    # Locate the value that belongs to this key cell.
                    logger.info(f"cell2 is {cell} row {row_idx} col {col_idx}")
                    value = self._find_value_for_key(table, structure, row_idx, col_idx, visited, kv_pairs)
                    if value:
                        key = self._normalize_key(cell['text'])
                        found = False
                        # Overwrite a pair recorded under the raw (un-normalized)
                        # cell text; otherwise append a new pair.
                        for i, (k, v) in enumerate(kv_pairs):
                            if k == cell['text']:
                                kv_pairs[i] = (k, value)
                                found = True
                        if not found:
                            kv_pairs.append((key, value))

                else:
                    logger.info("不是key")
        return kv_pairs

    def _find_value_for_key(self, table, structure, key_row, key_col, visited, kv_pairs) -> str:
        """
        Locate the value cell for the key at (key_row, key_col).

        Search order: right neighbour, then the cell below, then any
        unvisited non-empty cell within 2 rows/cols (merged-cell layouts).
        Returns the value text, or None when nothing suitable is found.
        """
        logger.info(f"======================kv==================={kv_pairs}")
        # 1) Try the cell immediately to the right.
        if key_col + 1 < len(structure[key_row]):

            value_cell = structure[key_row][key_col + 1]
            translation_table = str.maketrans('', '', ' \t\n\r\f\v')
            current_key_cell = structure[key_row][key_col]
            if not value_cell['text']:
                value_cell['text'] = "None"
            current_key_text = current_key_cell['text'].translate(translation_table)
            if value_cell['is_key']:
                return None
            # Special case: family-member rows span several columns.

            spec_cell_meb = ['称谓', '姓名', '年龄', '政治面貌']
            if current_key_text == "家庭主要成员及重要社会关系":
                logger.info("特殊处理家庭主要成员及重要社会关系")
                values = []
                old_value = None
                # Reuse any value already collected for this key.
                for k, v in kv_pairs:
                    if k == current_key_text:
                        old_value = v
                logger.info(f"old_value is {old_value}")
                translation_table = str.maketrans('', '', ' \t\n\r\f\v')
                v = value_cell['text'].translate(translation_table)
                logger.info(f"当前值为 {str(v)}")

                if v not in spec_cell_meb:
                    # Join the remaining columns of this row into one
                    # member record ("_"-separated fields).
                    for i in range(key_col + 1, len(structure[key_row])):
                        col_value = structure[key_row][i]['text']
                        logger.info(f"col_value is {col_value}")
                        if col_value not in values and col_value != 'None':
                            values.append(col_value)
                            visited.add((key_row, i))
                    vs = "_".join(values)
                    # Chain onto previously collected member rows with '|'.
                    if old_value:
                        if vs:
                            value_cell['text'] = old_value + "|" + vs
                        else:
                            value_cell['text'] = old_value
                    else:
                        value_cell['text'] = "_".join(values)

                    return value_cell['text']
                logger.info(f"in {spec_cell_meb }")
                return None

            if value_cell['text'] and (key_row, key_col + 1) not in visited:
                # Skip if this key already produced a pair (vertically merged
                # key cells repeat their text in every spanned row).
                if not self._is_key_duplicate_merged_cell(structure[key_row][key_col]['text'], kv_pairs):
                    logger.info("前一个不重复")
                    logger.info(f"visited add {key_row} {key_col + 1}")
                    visited.add((key_row, key_col + 1))
                    return value_cell['text']
                else:
                    logger.info(f"key值重复------------------------------key {current_key_text}")
                    # Merge the new text into the value already stored for this key.
                    for key, value in kv_pairs:
                        if key == current_key_text:
                            if value_cell['text'] in value:
                                return value
                            return value + "," + value_cell['text']

        # 2) Try the cell directly below.
        if key_row + 1 < len(structure):
            value_cell = structure[key_row + 1][key_col]
            if value_cell['text'] and (key_row + 1, key_col) not in visited:
                if not self._is_key_duplicate_merged_cell(structure[key_row][key_col]['text'], kv_pairs):
                    logger.info("下一个不重复")
                    # Fixed: log the coordinates that are actually added.
                    logger.info(f"visited add {key_row + 1} {key_col}")
                    visited.add((key_row + 1, key_col))
                    return value_cell['text']

        # 3) Fall back to any nearby unvisited cell (merged-cell layouts).
        for row_idx in range(len(structure)):
            for col_idx in range(len(structure[row_idx])):
                cell = structure[row_idx][col_idx]
                if (row_idx, col_idx) not in visited and cell['text']:
                    # Only consider cells close to the key.
                    if abs(row_idx - key_row) <= 2 and abs(col_idx - key_col) <= 2:
                        if not self._is_key_duplicate_merged_cell(structure[key_row][key_col]['text'], kv_pairs):
                            logger.info("合并不重复")
                            # Fixed: log the coordinates that are actually added.
                            logger.info(f"visited add {row_idx} {col_idx}")
                            visited.add((row_idx, col_idx))
                            return cell['text']
        return None

    def _is_key_duplicate_merged_cell(self, text, kv_pairs) -> bool:
        """
        Return True when *text* (ASCII whitespace stripped) already appears
        as a key in *kv_pairs*.

        Guards against vertically merged key cells: the key text is
        repeated in every spanned row, and without this check the second
        occurrence would be paired with an unrelated cell.
        """
        translation_table = str.maketrans('', '', ' \t\n\r\f\v')
        text = text.translate(translation_table)
        for k, v in kv_pairs:
            if text == k:
                return True

        return False

    def extract_parentheses_content(self, text):
        # Collect the content of every (half/full-width) parenthesis pair.
        matches = re.findall(r'[((]([^))]*)[))]', text)

        return matches  # may contain several bracketed segments

    def _is_likely_key(self, text: str) -> bool:
        """Heuristically decide whether *text* is a field label (key)."""
        translation_table = str.maketrans('', '', ' \t\n\r\f\v')
        text = text.translate(translation_table)
        if not text or len(text) > 20:
            return False

        # Known field labels used on the approval form.
        key_indicators = ['姓名', '性别', '出生年月(岁)', '民族', '籍贯', '出生地', '入党时间', '健康状况', '全日制教育','在职教育', '毕业院校系及专业','称谓',
                          '年龄','政治面貌','工作单位及职务','参加工作时间', '专业技术职务', '熟悉专业有何专长', '现任职务', '拟任职务', '拟免职务','职称取得时间',
                          '年核度结考果','简历','奖惩情况', '年度考核结果', '任免理由', '家庭主要成员及重要社会关系']

        for indicator in key_indicators:
            if indicator in text:
                return True

        # "label: value" style — inspect the part before the colon.
        if ':' in text or ':' in text:
            key_part = text.split(':')[0].split(':')[0]
            if any(indicator in key_part for indicator in key_indicators):
                return True

        return False

    def _is_likely_value(self, text: str) -> bool:
        """Heuristically decide whether *text* is a field value."""
        if not text:
            return False

        # A value is never one of the known field labels.
        if self._is_likely_key(text):
            return False

        # NOTE(review): the checks below are redundant — the final
        # `return True` already accepts any non-empty non-key text.
        # Kept to preserve behavior and document the intent.
        if re.match(r'^\d{11}$', text):  # mobile phone number
            return True
        if re.match(r'^\d{4}年', text):  # date starting with a year
            return True
        if len(text) > 10:  # long free text
            return True

        return True

    def _normalize_key(self, key_text: str) -> str:
        """Map a raw key text onto its canonical field name."""
        # Drop ASCII whitespace and any trailing colons.
        translation_table = str.maketrans('', '', ' \t\n\r\f\v')
        key_text = key_text.translate(translation_table)
        key_text = re.sub(r'[::\s]+$', '', key_text)
        # Look the cleaned text up in the variant table.
        for std_key, variants in self.field_variants.items():
            for variant in variants:
                if variant == key_text or key_text in variant:
                    return std_key

        return key_text

    def _categorize_field(self, key: str) -> str:
        """Return the category *key* belongs to ('其他信息' by default)."""

        categories = {
            '基本信息': ['姓名', '性别', '出生年月(岁)', '民族', '籍贯', '出生地', '入党时间', '健康状况', '全日制教育','在职教育', '毕业院校系及专业'],
            '工作信息': ['参加工作时间', '专业技术职务', '熟悉专业有何专长', '现任职务', '拟任职务', '拟免职务','职称取得时间', '年核度结考果'],
        }

        # Strip ASCII whitespace once, not once per category.
        translation_table = str.maketrans('', '', ' \t\n\r\f\v')
        key = key.translate(translation_table)
        for category, fields in categories.items():
            if key in fields:
                return category

        return '其他信息'

    def _extract_from_paragraphs(self, paragraphs) -> List[Tuple[str, str]]:
        """Extract "key: value" style pairs from plain paragraphs."""
        kv_pairs = []

        for para in paragraphs:
            text = para.text.strip()
            if not text:
                continue

            # Split on the first (half/full-width) colon.
            if ':' in text or ':' in text:
                separator = ':' if ':' in text else ':'
                parts = text.split(separator, 1)

                if len(parts) == 2:
                    key = parts[0].strip()
                    value = parts[1].strip()

                    if self._is_likely_key(key) and value:
                        normalized_key = self._normalize_key(key)
                        kv_pairs.append((normalized_key, value))

        return kv_pairs
|
||
|
||
|
||
# Quick usage example
def quick_extract(docx_path: str):
    """Extract all fields from *docx_path* and log them grouped by category.

    Returns the category -> {field: value} dict, or None when extraction
    fails (the failure is logged with its traceback).
    """
    logger.info('into quick_extract')
    extractor = EnhancedDocxExtractor()
    try:
        result = extractor.extract_with_table_structure(docx_path)
    except Exception:
        # Was logger.info(f"提取失败: {e}") — log the full traceback instead
        # of silently downgrading the error to an INFO line.
        logger.exception("提取失败")
        return None

    logger.info("\n提取结果 (键值对格式):")
    logger.info("=" * 60)

    for category, fields in result.items():
        if fields:
            logger.info(f"\n{category}:")
            for key, value in fields.items():
                logger.info(f" {key}: {value}")
    return result
||
|
||
|
||
def is_valid_year_month(year, month):
    """Return True when *year* is 4 characters and *month* parses to 1..12."""
    if len(year) != 4:
        return False
    try:
        return 1 <= int(month) <= 12
    except ValueError:
        return False


def get_year_month(text):
    """Return the first year-month in *text* formatted as 'YYYY-MM', else None.

    Handles -, ., / and 年/月 separators, an optional day part (with 日/号),
    and refuses a match directly followed by another digit (e.g. an age).
    """
    pattern = re.compile(
        r"""
        (\d{4})              # year: four digits
        [-./年]              # separator
        (\d{1,2})            # month: one or two digits
        (?:                  # optional day portion
            [-./月]          # separator
            \d{1,2}          # day
            (?:[日号]?|[-./]?)   # optional trailing 日/号 or separator
        )?
        (?!\d)               # must not be followed by another digit
        """,
        re.VERBOSE | re.IGNORECASE,
    )

    hit = pattern.search(text)
    if not hit:
        return None

    year, month = hit.group(1), hit.group(2).zfill(2)
    return f"{year}-{month}" if is_valid_year_month(year, month) else None
|
||
|
||
# Field names expected in the "基本信息" (basic info) section of the form.
base_map = ['姓名','性别','出生年月(岁)','民族','籍贯','出生地','入党时间','健康状况','全日制教育','在职教育','毕业院校系及专业']
# Field names expected in the "工作信息" (work info) section.
# NOTE(review): '年核度结考果' looks like a scrambled '年度考核结果', but the
# same spelling is matched verbatim by the extractor — do not "fix" it alone.
work_map = ['参加工作时间','专业技术职务','熟悉专业有何专长','现任职务','拟任职务','拟免职务','职称取得时间','年核度结考果']
# Remaining free-text fields (categorized as "其他信息").
other_map = ['简历','奖惩情况','年度考核结果','任免理由','家庭主要成员及重要社会关系']
|
||
|
||
|
||
|
||
def fetch_info(data):
    """Filter *data* down to the known form fields and post-process values.

    Keys are compared after stripping ASCII whitespace. The family-member
    field is expanded from "f1_f2..|f1_f2.." into a list of per-member
    field lists; birth dates are normalized to 'YYYY-MM'.
    Returns the filtered {field: value} dict.
    """
    map_word = base_map + work_map + other_map
    logger.info("data is {0}".format(data))
    logger.info("map_word is {0}".format(map_word))
    final_res = {}
    # Build the whitespace-stripping table once, not once per key.
    translation_table = str.maketrans('', '', ' \t\n\r\f\v')
    for key, value in data.items():
        clean_key = key.translate(translation_table)
        logger.info(f"key is {clean_key} ")
        if clean_key not in map_word:
            continue
        # Family members: '|' separates members, '_' separates their fields.
        if clean_key == "家庭主要成员及重要社会关系":
            value = [member.split('_') for member in value.split('|')]
        # Normalize the birth date to "YYYY-MM".
        if clean_key == "出生年月(岁)":
            value = get_year_month(value)
        final_res[clean_key] = value

    return final_res
|
||
|
||
|
||
def extra_resume(file_path):
    """End-to-end extraction: parse *file_path* and return the filtered fields.

    Returns {} when extraction failed or produced nothing — previously a
    None result from quick_extract raised TypeError, and a missing
    category raised KeyError.
    """
    logger.info(f"Start to quick extract {file_path}")
    result = quick_extract(file_path)
    logger.info(f"result is ------------------------------{result}")
    if not result:
        # quick_extract logs its own failure and returns None; fail soft.
        return {}
    data = {}
    # Missing categories are skipped instead of raising KeyError.
    data.update(result.get('基本信息', {}))
    data.update(result.get('工作信息', {}))
    data.update(result.get('其他信息', {}))
    res = fetch_info(data)
    return res
|
||
|
||
|
||
# if __name__ == "__main__":
|
||
# logger = logging.getLogger(__name__)
|
||
# # 使用方法
|
||
# docx_file = "../任免审批表数信中心--谭杰明.docx" # 替换为你的文件
|
||
# print(extra_resume(docx_file))
|
||
|
||
|