zbparse/flask_app/货物标/投标人须知正文条款提取成json文件货物标版.py
2024-09-30 16:23:39 +08:00

225 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import docx
import re
import os
from PyPDF2 import PdfReader
from flask_app.main.截取pdf import clean_page_content,extract_common_header
def extract_text_from_docx(file_path):
    """Read a .docx file and return the text of all paragraphs, newline-joined."""
    document = docx.Document(file_path)
    paragraph_texts = [paragraph.text for paragraph in document.paragraphs]
    return '\n'.join(paragraph_texts)
def extract_text_from_pdf(file_path, start_word, end_pattern):
    """Extract the PDF's text between the first match of ``start_word`` (looked
    up on page 1 only) and the last match of any regex in ``end_pattern``
    (looked up on the final page only).

    Each page is cleaned of the repeated header via clean_page_content before
    matching; pages are joined with newlines.
    """
    common_header = extract_common_header(file_path)
    reader = PdfReader(file_path)
    collected_pages = []
    start_index = None
    last_page_no = len(reader.pages) - 1

    for page_no, page in enumerate(reader.pages):
        raw_text = page.extract_text() or ""
        cleaned = clean_page_content(raw_text, common_header)

        # Trim everything before the start marker on the first page.
        if page_no == 0 and start_index is None:
            start_match = re.search(start_word, cleaned, re.MULTILINE)
            if start_match:
                start_index = start_match.start()
                cleaned = cleaned[start_index:]

        # Trim everything from the last end-marker match on the final page.
        if page_no == last_page_no:
            for pattern in end_pattern:
                hits = list(re.finditer(pattern, cleaned, re.MULTILINE))
                if hits:
                    cleaned = cleaned[:hits[-1].start()]
                    break

        collected_pages.append(cleaned)

    return "\n".join(collected_pages)
def compare_headings(current, new):
    """Return True when heading ``new`` comes after ``current`` in numbering.

    Headings are dotted number strings like '1.2' or '2.2.3'; non-numeric
    segments are ignored. When all shared levels are equal, a deeper heading
    (more levels) counts as a new sub-section.
    """
    def numeric_parts(heading):
        # Keep only the numeric segments, e.g. '1.2.' -> [1, 2].
        return [int(part) for part in heading.split('.') if part.isdigit()]

    current_parts = numeric_parts(current)
    new_parts = numeric_parts(new)

    for cur, nxt in zip(current_parts, new_parts):
        if nxt != cur:
            return nxt > cur

    # All compared levels equal: deeper numbering means a new sub-section.
    return len(new_parts) > len(current_parts)
def should_add_newline(content, keywords, max_length=20):
    """Decide whether a newline should precede appended content.

    True when the joined, stripped content is short (<= max_length chars) or
    contains any of the given keywords.
    """
    joined = ''.join(content).strip()
    if len(joined) <= max_length:
        return True
    return any(keyword in joined for keyword in keywords)
def handle_content_append(current_content, line_content, append_newline, keywords):
    """Append one line to the accumulated content, honoring a pending newline.

    When ``append_newline`` is set and should_add_newline approves, a single
    '\n' is inserted before the line. The pending flag is consumed either way
    and the updated flag is returned.
    """
    pending = append_newline
    if pending:
        if should_add_newline(current_content, keywords):
            current_content.append('\n')  # preserve one line break before the content
        pending = False
    current_content.append(line_content)
    return pending
"""
保存换行符的具体逻辑:
对于二级标题(如 1.1),如果其后的内容包含关键词或内容较短(<=20字符会在内容前添加一个换行符。
这个换行符会被保留在 current_content 列表中。
当处理下一个标题时,之前的内容(包括可能存在的换行符)会被合并并保存到 data 字典中。
"""
#提取json主函数
def parse_text_by_heading(text):
keywords = ['包含', '以下']
data = {}
current_key = None
current_content = []
append_newline = False
lines = text.split('\n') #['一、说明', '1.适用范围', '招标文件仅适用于第一章“投标邀请书”中所述项目的货物、工程及服务的采购。']
for i, line in enumerate(lines): #由于本身就是按行读取处理,因此保存的时候不带'\n'
line_stripped = line.strip()
print("yes")
print(line_stripped)
# 匹配中文数字标题,包括带括号和不带括号的情况
chinese_match = re.match(r'^(?:\s*[(]?\s*([一二三四五六七八九十]+)\s*[)]?\s*[、]*)?\s*(.+)$', line_stripped)
if chinese_match:
chinese_key, chinese_value = chinese_match.groups()
if chinese_key:
chinese_key = f"{chinese_key}" # 统一格式为"数字、"
data[chinese_key] = chinese_value
current_key = None
current_content = []
continue
# 新增:匹配阿拉伯数字开头后跟顿号的情况,如 "7、竞争性磋商采购文件的修改"
arabic_match = re.match(r'^(\d+、)\s*(.+)$', line_stripped)
if arabic_match:
arabic_key, arabic_value = arabic_match.groups()
arabic_key = arabic_key.replace('', '.') # 将顿号替换为点号
if current_key is not None:
content_string = ''.join(current_content).strip()
data[current_key] = content_string.replace(' ', '')
current_key = arabic_key
current_content = [arabic_value]
append_newline = True
continue
# 匹配形如 '1.1'、'2.2.3' 等至少包含一个点的标题,并确保其前后没有字母或括号
match = re.match(r'^(?<![a-zA-Z(])(\d+(?:\.\d+)+)\s*(.*)', line_stripped)
if not match:
match = re.match(r'^(\d+\.)\s*(.+)$', line_stripped)
if match:
new_key, line_content = match.groups()
line_content = line_content.lstrip('.')
# 检查是否应该更新当前键和内容
if current_key is None or (compare_headings(current_key, new_key) and (
len(current_content) == 0 or current_content[-1][-1] != '')):
if current_key is not None:
# 将之前的内容保存到data中保留第一个换行符后续的换行符转为空字符
# print(current_content)
content_string = ''.join(current_content).strip()
# print(content_string)
data[current_key] = content_string.replace(' ', '')
current_key = new_key
current_content = [line_content]
# 当标题级别为一级1.)和两级(如 1.1)时标题,才设置 append_newline 为 True
append_newline = len(new_key.rstrip('.').split('.')) <= 2
else:
append_newline = handle_content_append(current_content, line_content, append_newline, keywords)
else:
if line_stripped:
append_newline = handle_content_append(current_content, line_stripped, append_newline, keywords)
if current_key is not None:
# 保存最后一部分内容
content_string = ''.join(current_content).strip()
data[current_key] = content_string.replace(' ', '')
return data
# def convert_to_json(file_path, start_word, end_phrases):
# if file_path.endswith('.docx'):
# text = extract_text_from_docx(file_path)
# elif file_path.endswith('.pdf'):
# text = extract_text_from_pdf(file_path,start_word,end_phrases)
# else:
# raise ValueError("Unsupported file format")
# # print(text)
# parsed_data = parse_text_by_heading(text)
# return parsed_data
def convert_clause_to_json(file_path, output_folder, type=1, suffix=None):
    """Extract clause text from a bid-document PDF and save it as JSON.

    Args:
        file_path: path to the source PDF.
        output_folder: directory for the JSON output (created if missing).
        type: 1 -> extract the "说明/总则" section (default name clause1.json);
              any other value -> the announcement section (clause2.json).
        suffix: optional tag producing "clause{suffix}.json" instead of the
              default name, so batch callers don't overwrite each other's
              output (backward-compatible: default None keeps old names).

    Returns:
        The path of the written JSON file, or "" when file_path is missing.
    """
    if not os.path.exists(file_path):
        print(f"The specified file does not exist: {file_path}")
        return ""
    # Section boundaries: one start regex plus a list of end regexes.
    if type == 1:
        start_word = r'^\s*[(]?\s*[一1]\s*[)]?\s*[、.]*\s*(说\s*明|总\s*则)'
        end_pattern = [r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+']
    else:
        start_word = r'第[一二三四五六七八九十]+章\s*招标公告|第一卷|招标编号:|招标编号:'
        end_pattern = [r'第[一二三四五六七八九十]+章\s*投标人须知', r'投标人须知前附表']
    text = extract_text_from_pdf(file_path, start_word, end_pattern)
    result = parse_text_by_heading(text)
    # Ensure the output folder exists before writing.
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
        print(f"Created output folder: {output_folder}")
    if suffix is not None:
        file_name = f"clause{suffix}.json"
    else:
        file_name = "clause1.json" if type == 1 else "clause2.json"
    output_path = os.path.join(output_folder, file_name)
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(result, f, indent=4, ensure_ascii=False)
    print(f"投标人须知正文条款提取成json文件: The data has been processed and saved to '{output_path}'.")
    return output_path
def process_folder(input_folder, output_folder):
    """Run clause extraction on every '*part2.pdf' file in input_folder.

    Bug fix: convert_clause_to_json takes (file_path, output_folder, type);
    the extra suffix argument previously passed here raised TypeError on
    every file, so no file was ever processed.

    NOTE(review): with type=1 every file is written as clause1.json, so later
    files overwrite earlier output in output_folder — confirm the intended
    per-file naming with the caller.
    """
    # Only files ending in 'part2.pdf' are candidates.
    pdf_files = [
        name for name in os.listdir(input_folder)
        if os.path.isfile(os.path.join(input_folder, name)) and name.endswith('part2.pdf')
    ]
    for file_name in pdf_files:
        file_path = os.path.join(input_folder, file_name)
        try:
            output_path = convert_clause_to_json(file_path, output_folder, 1)
            print(f"Processed file: {file_name}, JSON saved to: {output_path}")
        except ValueError as e:
            print(f"Error processing {file_name}: {e}")
if __name__ == "__main__":
# file_path = 'D:\\flask_project\\flask_app\\static\\output\\cfd4959d-5ea9-4112-8b50-9e543803f029\\ztbfile_tobidders_notice.pdf'
file_path='C:\\Users\\Administrator\\Desktop\\货物标\\output4\\ztbfile_tobidders_notice_part2.pdf'
output_folder = 'C:\\Users\\Administrator\\Desktop\\货物标\\output4\\tmp1'
try:
output_path = convert_clause_to_json(file_path,output_folder)
print(f"Final JSON result saved to: {output_path}")
except ValueError as e:
print("Error:", e)
# input_folder = 'C:\\Users\\Administrator\\Desktop\\货物标\\output4'
# output_folder = 'C:\\Users\\Administrator\\Desktop\\货物标\\output4\\tmp1'
#
# # 调用 process_folder 来处理整个文件夹中的所有文件
# process_folder(input_folder, output_folder)