# zbparse/flask_app/main/table_content_extraction.py
#
# NOTE: The repository web view flagged this file as containing ambiguous
# Unicode characters, i.e. characters that can be confused with other
# characters. If that is intentional the warning can be ignored; otherwise
# double-check the string literals below when editing them.

import os
from docx import Document
import json
def _append_cell_content(level, key, text):
    """Store a row's content text in ``level`` under ``key``.

    * ``key`` absent          -> the text is stored as-is.
    * ``key`` holds a string  -> the text is appended on a new line.
    * ``key`` holds a dict    -> the dict is replaced by ``"\\n" + text``.
    """
    if key not in level:
        level[key] = text
    elif isinstance(level[key], dict):
        # NOTE(review): this discards the nested dict that was stored here;
        # kept exactly as the original logic did — confirm it is intended.
        level[key] = f"\n{text}"
    else:
        level[key] += f"\n{text}"


def read_tables_from_docx(file_path):
    """Read the tables of a DOCX file and return them as one nested dict.

    All tables of the document are merged into a single tree:

    * Every cell of a row except the last one is treated as a title level;
      the titles form a path of nested dictionaries. Spaces and newlines
      are removed from titles.
    * An empty title cell inherits the title found at the same column in
      the previous data row, or ``"<未识别到上级标题>"`` when there is none.
    * The last cell of a row is the content (spaces removed, newlines
      kept). It is stored inside the innermost dictionary under the
      innermost title itself, or under ``f"{i}行内容"`` (``i`` being the row
      index within its table) when the row has no title cells.
    * The very first row encountered in the document is taken as the
      header; it and every later row identical to it are skipped.

    Args:
        file_path: Path (or whatever ``docx.Document`` accepts) of the
            document to read.

    Returns:
        dict: The nested dictionary described above.
    """
    doc = Document(file_path)
    table_list = {}
    cur_title = []  # title path of the previous data row (fills blank cells)
    header = None  # first row seen in the document

    for table in doc.tables:
        for i, row in enumerate(table.rows):
            # Read ``row.cells`` and each ``cell.text`` once per row instead
            # of re-evaluating them for every cell of the inner loop.
            cells = row.cells
            stripped_texts = [cell.text.strip() for cell in cells]

            # The first row of the document is the header: remember it.
            if header is None:
                header = stripped_texts
                continue
            # Skip any later row that repeats the header.
            if stripped_texts == header:
                continue

            cur_level = table_list
            temp_title = []
            last_index = len(cells) - 1

            for j, stripped in enumerate(stripped_texts):
                cell_text = stripped.replace(' ', '')

                if j == last_index:
                    # Content cell: file it under the innermost title.
                    last_key = temp_title[-1] if temp_title else f"{i}行内容"
                    _append_cell_content(cur_level, last_key, cell_text)
                    continue

                # Title cell.
                text_str = cell_text.replace('\n', '')
                if text_str == "":
                    text_str = cur_title[j] if j < len(cur_title) else "<未识别到上级标题>"

                if text_str not in cur_level:
                    cur_level[text_str] = {}
                elif not isinstance(cur_level[text_str], dict):
                    # A content string is already stored under this title
                    # (e.g. the same text repeated in adjacent title
                    # columns, or tables of different widths). Descending
                    # into the string would fail with TypeError on the next
                    # assignment, so wrap it using the same
                    # "content stored under its own title" convention.
                    cur_level[text_str] = {text_str: cur_level[text_str]}
                cur_level = cur_level[text_str]
                temp_title.append(text_str)

            cur_title = temp_title[:]

    return table_list
def flatten_nested_dicts(d):
    """Collapse redundant single-entry sub-dictionaries, in place.

    The dictionary is processed recursively. A value that is a dictionary
    with exactly one entry is replaced by that entry's value when either

    * the entry's key equals the key of the enclosing item
      (``{"a": {"a": x}}`` becomes ``{"a": x}``), or
    * the entry's key equals the entry's value
      (``{"a": {"b": "b"}}`` becomes ``{"a": "b"}``).

    Everything else is left untouched. Collapsed entries are assigned in
    place so the keys keep their original order (deleting and re-adding
    them would move them to the end of the dictionary).

    Args:
        d: The dictionary to flatten; it is modified in place.

    Returns:
        dict: The same object ``d``, for convenience.
    """
    # Iterate over a snapshot because values are reassigned while looping.
    for key, value in list(d.items()):
        if not isinstance(value, dict):
            continue
        value = flatten_nested_dicts(value)
        if len(value) != 1:
            continue
        inner_key, inner_value = next(iter(value.items()))
        if inner_key == key or inner_key == inner_value:
            d[key] = inner_value
    return d
def save_data_to_json(data, output_folder):
    """Save ``data`` as ``truncate_output.json`` inside ``output_folder``.

    The file is written as UTF-8 with non-ASCII characters kept verbatim
    and a 4-space indent. The folder is created when it does not exist.

    Args:
        data: JSON-serialisable object to write.
        output_folder: Directory that receives the file. An empty string
            means the current working directory.

    Returns:
        str: Path of the written file.
    """
    filename = "truncate_output.json"
    output_filepath = os.path.join(output_folder, filename)
    # Without this, open() fails with FileNotFoundError for a missing folder.
    if output_folder:
        os.makedirs(output_folder, exist_ok=True)
    with open(output_filepath, 'w', encoding='utf-8') as file:
        json.dump(data, file, ensure_ascii=False, indent=4)
    print(f"table_content_extraction: The data has been processed and saved to '{output_filepath}'.")
    return output_filepath
def extract_tables_main(path, output_folder):
    """Extract the tables of a DOCX file and save them as JSON.

    Args:
        path: Path of the DOCX document.
        output_folder: Folder handed to ``save_data_to_json``.

    Returns:
        str: Whatever ``save_data_to_json`` returns (the path of the JSON
        file), or ``""`` when ``path`` does not exist.
    """
    if not os.path.exists(path):
        print(f"table_content_extraction: The specified file does not exist: {path}")
        return ""
    # Read the document's tables into a nested dictionary.
    table_data = read_tables_from_docx(path)
    # Collapse redundant single-entry levels.
    flattened_data = flatten_nested_dicts(table_data)
    # Persist the flattened data as a JSON file.
    return save_data_to_json(flattened_data, output_folder)
# Batch-generate truncate_output.json for goods-tender (货物标) documents.
def process_all_part1_pdfs(folder_path, output_folder):
    """Run ``extract_tables_main`` on every ``*part1.docx`` under a folder.

    The folder is walked recursively. For each file whose name ends with
    ``part1.docx`` a sub-folder named after the file (extension removed)
    is created inside ``output_folder`` and passed to
    ``extract_tables_main`` as its output folder.

    Args:
        folder_path: Root folder to search.
        output_folder: Folder under which the per-file sub-folders are
            created.

    Returns:
        None. A message is printed and nothing is done when
        ``folder_path`` does not exist.
    """
    if not os.path.exists(folder_path):
        print(f"指定的文件夹不存在: {folder_path}")
        return
    for root, _dirs, files in os.walk(folder_path):
        for file in files:
            if not file.endswith("part1.docx"):
                continue
            file_path = os.path.join(root, file)
            print(f"正在处理文件: {file_path}")
            # File name without its extension.
            base_filename = os.path.splitext(file)[0]
            # Matching sub-folder inside output_folder.
            subfolder_path = os.path.join(output_folder, base_filename)
            os.makedirs(subfolder_path, exist_ok=True)
            # The sub-folder is the output folder for this document.
            extract_tables_main(file_path, subfolder_path)
if __name__ == "__main__":
    # Manual run against a sample document; the paths are machine-specific.
    path = 'C:\\Users\\Administrator\\Desktop\\货物标\\zbfilesdocx\\2-招标文件.docx'
    # Destination folder for the generated JSON (the "前附表" data-sheet file).
    output_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\zbfilesdocx\\tmp"
    res = extract_tables_main(path, output_folder)

    # Batch alternative: process every *part1.docx below a folder.
    # folder_path='C:\\Users\\Administrator\\Desktop\\货物标\\output4'
    # output_folder="C:\\Users\\Administrator\\Desktop\\货物标\\output4\\tmp2"
    # process_all_part1_pdfs(folder_path, output_folder)