zbparse/flask_app/货物标/货物标截取pdf.py
2024-09-27 17:03:46 +08:00

398 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from PyPDF2 import PdfReader, PdfWriter
import re # 导入正则表达式库
import os # 用于文件和文件夹操作
from flask_app.main.format_change import docx2pdf
def clean_page_content(text, common_header):
    """Strip the shared page header and page-number artifacts from one page's text."""
    # Drop each non-empty header line once, anchored at the start of the text.
    for raw_line in (common_header.split('\n') if common_header else []):
        stripped = raw_line.strip()
        if not stripped:
            continue
        text = re.sub(r'^' + re.escape(stripped) + r'\n?', '', text, count=1)
    # Page numbers such as "89/129" are dismantled in three passes.
    text = re.sub(r'^\s*\d+\s*(?=\D)', '', text)  # leading page number (only when followed by a non-digit)
    text = re.sub(r'\s+\d+\s*$', '', text)        # trailing page number
    text = re.sub(r'\s*\/\s*\d+\s*', '', text)    # "/129"-style fragment
    return text
def extract_common_header(pdf_path):
    """Detect header text repeated across pages of the PDF at *pdf_path*.

    Samples up to three pages around the middle of the document (both pages
    for a two-page file), keeps the first three lines of each sampled page,
    and returns the words shared by every sampled page, one output line per
    header line. Returns "" when fewer than two pages could be sampled.
    """
    reader = PdfReader(pdf_path)
    page_count = len(reader.pages)
    # Choose the sample window.
    if page_count == 2:
        first_idx, sample_size = 0, 2
    else:
        first_idx, sample_size = max(0, page_count // 2 - 1), 3
    sampled = []
    for idx in range(first_idx, min(first_idx + sample_size, page_count)):
        content = reader.pages[idx].extract_text() or ""
        if content:
            # Only the first three lines of a page can belong to the header.
            sampled.append(content.strip().split('\n')[:3])
    if len(sampled) < 2:
        return ""  # not enough pages to compare
    shared_lines = []
    for row in zip(*sampled):
        word_sets = [set(line.split()) for line in row]
        common_words = set.intersection(*word_sets)
        if common_words:
            shared_lines.append(' '.join(common_words))
    return '\n'.join(shared_lines)
def is_pdf_or_doc(filename):
    """Return True when *filename* looks like a PDF or Word document (case-insensitive)."""
    lowered = filename.lower()
    return any(lowered.endswith(ext) for ext in ('.pdf', '.doc', '.docx'))
def convert_to_pdf(file_path):
    """Convert a Word document to PDF via docx2pdf; any other path is returned unchanged."""
    lowered = file_path.lower()
    if lowered.endswith('.doc') or lowered.endswith('.docx'):
        return docx2pdf(file_path)
    return file_path
def judge_file_exist(original_path, new_suffix):
    """Look for a sibling file whose name swaps the qualification suffix.

    Given e.g. '.../x_qualification1.pdf' and new_suffix 'qualification3',
    returns '.../x_qualification3.pdf' when that file exists, else None.
    """
    directory, original_filename = os.path.split(original_path)
    # The incoming name carries '_qualification1' or '_qualification2';
    # rewrite whichever is present to the requested suffix.
    candidate_name = original_filename.replace("_qualification1", f"_{new_suffix}")
    candidate_name = candidate_name.replace("_qualification2", f"_{new_suffix}")
    candidate_path = os.path.join(directory, candidate_name)
    return candidate_path if os.path.isfile(candidate_path) else None
def merge_pdfs(paths, output_path):
    """Concatenate the PDFs listed in *paths* into *output_path*.

    When the first page of a file has exactly the same extracted text as the
    last page of the previous file, that duplicate boundary page is skipped.
    """
    writer = PdfWriter()
    previous_tail_text = None  # extracted text of the last page already written
    for path in paths:
        pages = PdfReader(path).pages
        first_index = 0
        if previous_tail_text is not None and len(pages) > 0:
            head_text = pages[0].extract_text() or ""
            if head_text == previous_tail_text:
                first_index = 1  # duplicated boundary page: drop it once
        for page_no in range(first_index, len(pages)):
            writer.add_page(pages[page_no])
        if len(pages) > 0:
            previous_tail_text = pages[-1].extract_text() or ""
    with open(output_path, 'wb') as out_file:
        writer.write(out_file)
def merge_and_cleanup(output_pdf_path, suffix_to_merge):
    """Merge the sibling PDF carrying *suffix_to_merge* into the main file, then delete it."""
    extra_path = judge_file_exist(output_pdf_path, suffix_to_merge)
    if extra_path is None:
        return
    merge_pdfs([output_pdf_path, extra_path], output_pdf_path)
    os.remove(extra_path)
    print(f"文件 {extra_path} 已删除。")
def process_files(file_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix):
    """Convert *file_path* to PDF if needed, then extract the requested section.

    Returns a (part1, part2) tuple for "tobidders_notice", a single path for
    other suffixes (merging the qualification3 sibling for "qualification1"),
    or None when extraction produced nothing.
    """
    pdf_path = convert_to_pdf(file_path)
    extracted = extract_pages(pdf_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix)
    if not extracted:
        return None
    if output_suffix == "tobidders_notice":
        return extracted  # tuple containing both part paths
    if output_suffix == "qualification1":
        # Fold the separately-saved evaluation chapter back into this output.
        merge_and_cleanup(extracted, "qualification3")
    return extracted
def process_input(input_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix):
    """Extract a section from one document or from every document in a folder.

    Creates *output_folder* if missing. Returns the list of generated file
    paths (a "tobidders_notice" extraction contributes two paths per file);
    returns an empty list when *input_path* is neither a folder nor a
    PDF/Word file.
    """
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
    # Normalise the input to a list of candidate documents, removing the
    # previously duplicated directory/file handling branches.
    if os.path.isdir(input_path):
        candidates = [os.path.join(input_path, name) for name in os.listdir(input_path)]
    elif os.path.isfile(input_path) and is_pdf_or_doc(input_path):
        candidates = [input_path]
    else:
        print("提供的路径既不是文件夹也不是PDF文件。")
        return []
    generated_files = []
    for file_path in candidates:
        if not is_pdf_or_doc(file_path):
            continue  # skip non-document entries inside a folder
        result = process_files(file_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix)
        if result:
            if isinstance(result, tuple):
                generated_files.extend(result)
            else:
                generated_files.append(result)
    return generated_files
def extract_pages_generic(pdf_document, begin_pattern, end_pattern, begin_page, common_header, exclusion_pattern=None):
    """Scan *pdf_document* for the first page range delimited by the two patterns.

    The start page must match *begin_pattern* on a page index strictly greater
    than *begin_page*; the end page must match *end_pattern* strictly after the
    start page. Pages matching *exclusion_pattern* are skipped entirely.
    Returns (start_page, end_page); either may be None when not found.
    """
    start_index = None
    end_index = None
    for page_no, page in enumerate(pdf_document.pages):
        cleaned = clean_page_content(page.extract_text() or "", common_header)
        if exclusion_pattern is not None and re.search(exclusion_pattern, cleaned):
            continue
        if start_index is None:
            if page_no > begin_page and re.search(begin_pattern, cleaned):
                start_index = page_no
            continue  # the end anchor can never be on the start page itself
        if page_no > start_index and re.search(end_pattern, cleaned):
            end_index = page_no
            break
    return start_index, end_index
def extract_pages(pdf_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix):
    """Extract one document section from *pdf_path* and save it under *output_folder*.

    For output_suffix "tobidders_notice" the chapter is split at its body
    start into two files and a (path1, path2) tuple is returned; for every
    other suffix a single saved-file path is returned. When the primary
    pattern scan fails, the corresponding *_twice* fallback strategy is used.
    Returns None if any exception occurs.
    """
    try:
        common_header = extract_common_header(pdf_path)
        pdf_document = PdfReader(pdf_path)
        exclusion_pattern = None
        if output_suffix == "tobidders_notice":
            # Three anchor pages: chapter heading, body start, next chapter heading.
            start_page, mid_page, end_page = extract_pages_tobidders_notice(pdf_document, begin_pattern, end_pattern,
                                                                            begin_page, common_header)
            if start_page is None or mid_page is None or end_page is None:
                print(f"first: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!尝试备用提取策略。")
                return extract_pages_twice_tobidders_notice(pdf_path, output_folder, output_suffix, common_header)
            # mid_page is both the last page of part1 and the first page of part2,
            # so the boundary page is intentionally present in both files.
            path1 = save_extracted_pages(pdf_document, start_page, mid_page, pdf_path, output_folder,
                                         "tobidders_notice_part1")
            path2 = save_extracted_pages(pdf_document, mid_page, end_page, pdf_path, output_folder,
                                         "tobidders_notice_part2")
            return path1, path2
        else:
            # Original handling logic kept unchanged.
            if output_suffix == "qualification1":
                # Skip pages that merely reference the section (table-of-contents style wording).
                exclusion_pattern = re.compile(r'文件的构成|文件的组成|须对应|需对应|须按照|需按照|须根据|需根据')
            start_page, end_page = extract_pages_generic(pdf_document, begin_pattern, end_pattern, begin_page, common_header, exclusion_pattern)
            if start_page is None or end_page is None:
                print(f"first: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!尝试备用提取策略。")
                return extract_pages_twice(pdf_path, output_folder, output_suffix, common_header)
            elif output_suffix == "qualification1":
                # Also extract the evaluation-method chapter as "qualification3";
                # process_files merges it back into the qualification1 output later.
                truncate_pdf_main(pdf_path, output_folder, 2, "qualification3")
            return save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix)
    except Exception as e:
        print(f"Error processing {pdf_path}: {e}")
        return None
def extract_pages_tobidders_notice(pdf_document, begin_pattern, end_pattern, begin_page, common_header):
    """Locate the three anchor pages of the bidders-notice chapter.

    Returns (start_page, mid_page, end_page): the page matching
    *begin_pattern* (after *begin_page*), the page whose text starts a
    numbered "说明"/"总则" item, and the page matching *end_pattern* after the
    middle anchor. Any of the three may be None when not found.
    """
    start_idx = None
    mid_idx = None
    end_idx = None
    for idx, page in enumerate(pdf_document.pages):
        cleaned = clean_page_content(page.extract_text() or "", common_header)
        if start_idx is None and idx > begin_page and re.search(begin_pattern, cleaned):
            start_idx = idx
        # The body anchor may sit on the same page as the chapter heading.
        if start_idx is not None and mid_idx is None and re.search(
                r'^\s*[(]?\s*[一1]\s*[)]?\s*[、.]*\s*(说\s*明|总\s*则)', cleaned, re.MULTILINE):
            mid_idx = idx
        if start_idx is not None and mid_idx is not None and idx > mid_idx and re.search(end_pattern, cleaned):
            end_idx = idx
            break
    return start_idx, mid_idx, end_idx
def get_patterns_for_procurement():
    """Compiled (begin, end) patterns bracketing the procurement-requirements chapter."""
    # Begin: a chapter heading mentioning service/project/business requirements,
    # procurement, or technical standards — or a "一、采购清单"-style list heading.
    begin_pattern = re.compile(
        r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(?:服务|项目|商务).*?要求|'
        r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(?:采购|技术标准).*|'
        r'^[一二三四五六七八九十百千]+、\s*采购清单',
        re.MULTILINE)
    # End: the next chapter heading followed by CJK text.
    end_pattern = re.compile(
        r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+',
        re.MULTILINE)
    return begin_pattern, end_pattern
def get_patterns_for_evaluation_method():
    """Compiled (begin, end) patterns bracketing the evaluation-method chapter."""
    # Begin: a chapter heading naming a negotiation/evaluation method.
    begin_pattern = re.compile(
        r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(磋商|谈判|评标|评定|评审)(方法|办法).*',
        re.MULTILINE)
    # End: the next chapter heading followed by CJK text.
    end_pattern = re.compile(
        r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+',
        re.MULTILINE)
    return begin_pattern, end_pattern
def get_patterns_for_qualification():
    """Compiled (begin, end) patterns bracketing the qualification-review section.

    Uses the newer anchors — a line starting with "资格性检查" through the next
    attachment ("附件 N") or chapter heading — instead of the legacy
    chapter-heading match.
    """
    begin_pattern = re.compile(r'^资格性检查', re.MULTILINE)
    end_pattern = re.compile(
        r'^附件\s*\d+|^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+',
        re.MULTILINE)
    return begin_pattern, end_pattern
def extract_pages_twice_tobidders_notice(pdf_path, output_folder, output_suffix, common_header):
    """Fallback split of the bidders-notice chapter into front-table and body parts.

    Part 1 runs from the "须知前附表" chapter heading to the next chapter
    heading; part 2 starts on that boundary page and runs to the following
    chapter heading. Returns (path1, path2), or (None, None) on failure.
    """
    begin_pattern = re.compile(
        r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*(?:(?:投标人|磋商|供应商|谈判供应商|磋商供应商)须知前附表)+'
    )
    end_pattern = re.compile(
        r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+'
    )
    pdf_document = PdfReader(pdf_path)
    # Part 1: the front table. begin_page=-1 allows a match on page 0.
    start_page1, end_page1 = extract_pages_generic(pdf_document, begin_pattern, end_pattern, -1, common_header)
    if start_page1 is None or end_page1 is None:
        print(f"second: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!")
        return None, None
    # Part 2 begins on the page where part 1 ended.
    start_page2 = end_page1
    _, end_page2 = extract_pages_generic(pdf_document, end_pattern, end_pattern, start_page2 - 1, common_header)
    if end_page2 is None:
        print(f"second: {output_suffix} 未找到第二部分的结束页在文件 {pdf_path} 中!")
        return None, None
    part1 = save_extracted_pages(pdf_document, start_page1, end_page1, pdf_path, output_folder, "tobidders_notice_part1")
    part2 = save_extracted_pages(pdf_document, start_page2, end_page2, pdf_path, output_folder, "tobidders_notice_part2")
    return part1, part2
def extract_pages_twice(pdf_path, output_folder, output_suffix, common_header):
    """Fallback extraction using suffix-specific pattern sets.

    Tries each (begin, end) pattern pair for the given suffix; for
    "qualification1" a final fallback extracts the evaluation-method chapter
    as "qualification2". Returns the saved path, "" when nothing was found,
    or None when even the last fallback fails.
    """
    # Skip pages that merely reference the section (table-of-contents wording).
    exclusion_pattern = re.compile(r'文件的构成|文件的组成|须对应|需对应|须按照|需按照|须根据|需根据')
    pdf_document = PdfReader(pdf_path)
    if output_suffix == "procurement":
        patterns = [get_patterns_for_procurement()]
    elif output_suffix == "evaluation_method" or output_suffix == "qualification2" or output_suffix == "qualification3":
        patterns = [get_patterns_for_evaluation_method()]
    elif output_suffix == "qualification1":
        patterns = [get_patterns_for_qualification()]
    else:
        # BUG FIX: patterns used to stay None here, crashing the loop below
        # with a TypeError for any unrecognized suffix.
        patterns = []
    # Initialise so an empty pattern list cannot leave these unbound.
    start_page = end_page = None
    # Try each pattern pair until a valid range is found.
    for begin_pattern, end_pattern in patterns:
        start_page, end_page = extract_pages_generic(pdf_document, begin_pattern, end_pattern, 5, common_header,
                                                     exclusion_pattern)
        if start_page is not None and end_page is not None:
            break
    if start_page is None or end_page is None:
        if output_suffix == "qualification1":
            print(f"second: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!")
            print("third:尝试提取评分办法章节...")
            # Last resort: extract the evaluation-method chapter instead.
            temp = truncate_pdf_main(pdf_path, output_folder, 2, "qualification2")
            if len(temp) > 0:
                return temp[0]
            else:
                return None
        else:
            print(f"second: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!")
            return ""
    return save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix)
def save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix):
    """Write pages start_page..end_page (inclusive) to a new suffixed PDF; return its path."""
    stem = os.path.splitext(os.path.basename(pdf_path))[0]
    output_pdf_path = os.path.join(output_folder, f"{stem}_{output_suffix}.pdf")
    writer = PdfWriter()
    for page_index in range(start_page, end_page + 1):
        writer.add_page(pdf_document.pages[page_index])
    with open(output_pdf_path, 'wb') as handle:
        writer.write(handle)
    print(f"{output_suffix} 已截取并保存页面从 {start_page}{end_page}{output_pdf_path}")
    return output_pdf_path
def truncate_pdf_main(input_path, output_folder, selection, output_suffix="default"):
    """Dispatch extraction of one tender-document section chosen by *selection*.

    1 = procurement requirements, 2 = evaluation method, 3 = qualification
    review, 4 = bidders-notice front table and body. When *output_suffix* is
    "default", the suffix implied by the selection is used. Returns the list
    produced by process_input, or None for an invalid selection.
    """
    if selection == 1:
        # Matches "第X章/部分 ... 服务/项目/商务 ... 要求" or "... 采购 ...".
        begin_pattern = re.compile(
            r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(?:服务|项目|商务).*?要求|'
            r'^第[一二三四五六七八九十百千]+(?:章|部分).*?采购.*'
        )
        end_pattern = re.compile(
            r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+'
        )
        begin_page, local_output_suffix = 5, "procurement"
    elif selection == 2:
        begin_pattern = re.compile(
            r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(磋商|谈判|评标|评定|评审)(方法|办法).*'
        )
        end_pattern = re.compile(
            r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+'
        )
        begin_page, local_output_suffix = 5, "evaluation_method"
    elif selection == 3:
        begin_pattern = re.compile(
            r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(资格审查).*', re.MULTILINE
        )
        end_pattern = re.compile(
            r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE
        )
        begin_page, local_output_suffix = 5, "qualification1"
    elif selection == 4:
        # Bidders-notice front table and body.
        begin_pattern = re.compile(
            r'^(?:第[一二三四五六七八九十百千]+(?:章|部分)\s*(?:投标人须知|磋商须知|供应商须知)+|(?:一\s*、\s*)?(?:投标人须知|磋商须知|供应商须知)前附表)', re.MULTILINE
        )
        end_pattern = re.compile(
            r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE
        )
        begin_page, local_output_suffix = 1, "tobidders_notice"
    else:
        print("无效的选择")
        return None
    actual_suffix = local_output_suffix if output_suffix == "default" else output_suffix
    return process_input(input_path, output_folder, begin_pattern, begin_page, end_pattern, actual_suffix)
def truncate_pdf_multiple(input_path, output_folder):
    """Run all four section extractions (selections 1-4) and return every generated path."""
    collected = []
    for selection in (1, 2, 3, 4):
        collected.extend(truncate_pdf_main(input_path, output_folder, selection))
    return collected
# TODO: the traffic intelligent-system file and the "招标(1)(1)" file are known to misbehave
if __name__ == "__main__":
    input_path = "C:\\Users\\Administrator\\Desktop\\货物标\\zbfiles"
    output_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\output4"
    # truncate_pdf_multiple(input_path,output_folder)
    # Selection: 1 - business/technical/service requirements, 2 - evaluation method,
    # 3 - qualification review (outputs suffixed qualification1/qualification2),
    # 4 - bidders-notice front table
    selection = 4
    generated_files = truncate_pdf_main(input_path, output_folder, selection)