2024-09-06 17:00:35 +08:00
|
|
|
|
import os
|
|
|
|
|
|
2024-08-29 16:37:09 +08:00
|
|
|
|
from docx import Document
|
|
|
|
|
import json
|
|
|
|
|
|
2024-09-11 12:02:09 +08:00
|
|
|
|
|
2024-09-03 18:04:05 +08:00
|
|
|
|
|
2024-08-29 16:37:09 +08:00
|
|
|
|
def read_tables_from_docx(file_path):
    """Read every table in a DOCX file into one nested dict.

    Each row's cells except the last are treated as hierarchical titles
    (one nesting level per column); the last cell is the value stored under
    the deepest title. An empty title cell inherits the title from the same
    column of the previous row (handles vertically merged cells).

    Args:
        file_path: path to the .docx file to parse.

    Returns:
        dict: nested ``{title: {subtitle: ... : value}}`` mapping built from
        all tables in the document.
    """
    doc = Document(file_path)
    table_list = {}
    cur_title = []   # title path of the previous row; fills empty title cells
    header = None    # first row ever seen; later identical rows are skipped

    for table in doc.tables:
        for i, row in enumerate(table.rows):
            cell_texts = [cell.text.strip() for cell in row.cells]

            # The very first row across all tables is taken as the header.
            if header is None:
                header = cell_texts
                continue  # skip the header row itself

            # Skip any later row identical to the header (repeated headers
            # when a table continues on a new page).
            if cell_texts == header:
                continue

            cur_level = table_list
            temp_title = []

            for j, cell in enumerate(row.cells):
                text_str = cell.text.strip().replace(' ', '').replace('\n', '')
                if j < len(row.cells) - 1:
                    # Title column: descend one nesting level, creating it
                    # on first sight.
                    if text_str == "":
                        # Empty (likely merged) cell: reuse the previous
                        # row's title for this column when available.
                        text_str = cur_title[j] if j < len(cur_title) else "<未识别到上级标题>"
                    if text_str not in cur_level:
                        cur_level[text_str] = {}
                    cur_level = cur_level[text_str]
                    temp_title.append(text_str)
                else:
                    # Value column (last cell of the row).
                    cell_text = cell.text.strip().replace(' ', '')
                    # Refactor: the original duplicated this whole if/else
                    # twice, differing only in the choice of last_key.
                    last_key = temp_title[-1] if temp_title else f"第{i}行内容"
                    if last_key in cur_level:
                        if isinstance(cur_level[last_key], dict):
                            # NOTE(review): overwrites an existing sub-dict
                            # with the cell text — presumably intentional for
                            # title rows that carry their own value; confirm.
                            cur_level[last_key] = f"\n{cell_text}"
                        else:
                            # Same key appears again: append on a new line.
                            cur_level[last_key] += f"\n{cell_text}"
                    else:
                        cur_level[last_key] = cell_text

            cur_title = temp_title[:]

    return table_list
|
|
|
|
|
|
2024-09-06 17:00:35 +08:00
|
|
|
|
|
2024-08-29 16:37:09 +08:00
|
|
|
|
def flatten_nested_dicts(d):
    """Collapse redundant single-entry nesting in *d* (in place) and return it.

    Two patterns are flattened, bottom-up:
      * ``{"k": {"k": v}}``  -> ``{"k": v}``   (child keyed by the same name)
      * ``{"k": {"x": "x"}}`` -> ``{"k": "x"}`` (child whose key equals its value)
    """
    removals = []
    replacements = {}

    for outer_key, child in list(d.items()):
        if not isinstance(child, dict):
            continue
        # Flatten the subtree first so single-entry checks see its final form.
        flattened = flatten_nested_dicts(child)
        if len(flattened) != 1:
            continue
        (inner_key, inner_value), = flattened.items()
        if inner_key == outer_key:
            # Child repeats the parent's key: hoist its value up one level.
            removals.append(outer_key)
            replacements[outer_key] = inner_value
        elif inner_key == inner_value:
            # Degenerate {"x": "x"} child: keep just the value.
            replacements[outer_key] = inner_value

    for stale in removals:
        del d[stale]
    d.update(replacements)

    return d
|
|
|
|
|
|
2024-09-06 17:00:35 +08:00
|
|
|
|
|
|
|
|
|
def save_data_to_json(data, output_folder):
    """Write *data* to '<output_folder>/truncate_output.json' and return the path.

    The file is written as UTF-8, pretty-printed JSON with non-ASCII
    characters preserved (``ensure_ascii=False``).

    Fix: the docstring had been stranded mid-function by a bad merge,
    turning it into a no-op string statement; restored as a real docstring.
    """
    filename = "truncate_output.json"
    output_filepath = os.path.join(output_folder, filename)
    with open(output_filepath, 'w', encoding='utf-8') as file:
        json.dump(data, file, ensure_ascii=False, indent=4)
    print(f"table_content_extraction: The data has been processed and saved to '{output_filepath}'.")
    return output_filepath
|
|
|
|
|
|
2024-08-29 16:37:09 +08:00
|
|
|
|
|
2024-09-06 17:00:35 +08:00
|
|
|
|
def extract_tables_main(path, output_folder):
    """Extract tables from the DOCX at *path*, flatten them, save as JSON.

    Returns the JSON output path, or an empty string when *path* does not
    exist (a message is printed in that case).
    """
    # Guard: bail out early on a missing input file.
    if not os.path.exists(path):
        print(f"table_content_extraction: The specified file does not exist: {path}")
        return ""

    nested_tables = read_tables_from_docx(path)        # raw nested table data
    flat_tables = flatten_nested_dicts(nested_tables)  # collapse redundant levels
    return save_data_to_json(flat_tables, output_folder)
|
2024-08-29 16:37:09 +08:00
|
|
|
|
|
|
|
|
|
|
2024-10-10 11:52:37 +08:00
|
|
|
|
#货物标批量生成truncate_output.json
|
|
|
|
|
def process_all_part1_pdfs(folder_path, output_folder):
    """Walk *folder_path* and run extract_tables_main on every '*part1.docx'.

    Each matching file gets its own subfolder (named after the file's stem)
    under *output_folder*, which receives that file's JSON output.
    """
    if not os.path.exists(folder_path):
        print(f"指定的文件夹不存在: {folder_path}")
        return

    for root, _dirs, filenames in os.walk(folder_path):
        for name in filenames:
            if not name.endswith("part1.docx"):
                continue
            file_path = os.path.join(root, name)
            print(f"正在处理文件: {file_path}")

            # One output subfolder per source file, named by its stem.
            stem = os.path.splitext(name)[0]
            subfolder_path = os.path.join(output_folder, stem)
            os.makedirs(subfolder_path, exist_ok=True)

            # Delegate extraction, targeting the per-file subfolder.
            extract_tables_main(file_path, subfolder_path)
|
|
|
|
|
|
2024-08-29 16:37:09 +08:00
|
|
|
|
if __name__ == "__main__":
    # Single-file run: extract the bidder-notice table and dump it as JSON.
    path = 'C:\\Users\\Administrator\\Desktop\\招标文件\\new_test\\zbtest2_tobidders_notice_table.docx'
    output_folder = "C:\\Users\\Administrator\\Desktop\\招标文件\\new_test"  # destination for the front-table JSON
    # Fix: previously called extract_tables_main("", output_folder), so the
    # configured `path` was never used and the missing-file branch always
    # fired, returning "". Pass the actual path.
    res = extract_tables_main(path, output_folder)
    print(res)
    # Batch mode example:
    # folder_path='C:\\Users\\Administrator\\Desktop\\货物标\\output4'
    # output_folder="C:\\Users\\Administrator\\Desktop\\货物标\\output4\\tmp2"
    # process_all_part1_pdfs(folder_path, output_folder)
|