zbparse/flask_app/货物标/货物标截取pdf.py
import glob
import fitz  # used only by the commented-out fitz variant of extract_common_header below
from PyPDF2 import PdfReader, PdfWriter
import re  # regular expressions
import os  # file and folder operations
from flask_app.main.format_change import docx2pdf


def clean_page_content(text, common_header):
    # First strip the shared page header, if one was detected
    if common_header:  # only substitute when a common header exists
        for header_line in common_header.split('\n'):
            if header_line.strip():  # only handle non-empty lines
                # Remove the first occurrence of the full header line
                text = re.sub(r'^' + re.escape(header_line.strip()) + r'\n?', '', text, count=1)
    # Remove page numbers, e.g. "89/129"; the three steps below remove them completely
    text = re.sub(r'^\s*\d+\s*(?=\D)', '', text)  # leading page number, only when followed by a non-digit
    text = re.sub(r'\s+\d+\s*$', '', text)        # trailing page number
    text = re.sub(r'\s*\/\s*\d+\s*', '', text)    # page numbers of the form "/129"
    return text
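
# Usage sketch (illustrative only; the header and page text below are made-up examples):
#   common_header = "某某市政府采购项目 招标文件"
#   raw = "某某市政府采购项目 招标文件\n3 / 129\n一、说明\n..."
#   clean_page_content(raw, common_header)
#   # -> the repeated header line and the "3 / 129" page marker are stripped, leaving "一、说明\n..."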


# PyPDF2 version
def extract_common_header(pdf_path):
    pdf_document = PdfReader(pdf_path)
    headers = []
    total_pages = len(pdf_document.pages)
    # Decide how many pages to sample and where to start
    if total_pages == 2:
        pages_to_read = 2
        start_page = 0
    else:
        pages_to_read = 3
        middle_page = total_pages // 2
        start_page = max(0, middle_page - 1)
    for i in range(start_page, min(start_page + pages_to_read, total_pages)):
        page = pdf_document.pages[i]
        text = page.extract_text() or ""
        if text:
            # Keep only the first three lines of each page
            first_lines = text.strip().split('\n')[:3]
            headers.append(first_lines)
    if len(headers) < 2:
        return ""  # not enough pages to compare, return an empty string
    # Find the common part of each line
    common_headers = []
    for lines in zip(*headers):
        # Intersect the words that appear at the same line position on every sampled page
        common_line = set(lines[0].split()).intersection(*[set(line.split()) for line in lines[1:]])
        if common_line:
            common_headers.append(' '.join(common_line))
    return '\n'.join(common_headers)
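
# Usage sketch (hypothetical path): a few middle pages are sampled and only the words shared
# at the same line position are kept, which approximates the repeated page header.
#   header = extract_common_header("某招标文件.pdf")
#   # e.g. header == "某某项目竞争性磋商文件" when every sampled page repeats that line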


# fitz version
# def extract_common_header(pdf_path):
#     doc = fitz.open(pdf_path)
#     headers = []
#     total_pages = len(doc)
#
#     if total_pages == 2:
#         pages_to_read = 2
#         start_page = 0
#     else:
#         pages_to_read = 3
#         middle_page = total_pages // 2
#         start_page = max(0, middle_page - 1)
#
#     for i in range(start_page, min(start_page + pages_to_read, total_pages)):
#         page = doc[i]
#         text = page.get_text()
#         if text:
#             first_lines = text.strip().split('\n')[:3]
#             headers.append(first_lines)
#
#     doc.close()
#
#     if len(headers) < 2:
#         return ""
#
#     common_headers = []
#     for lines in zip(*headers):
#         common_line = set(lines[0].split()).intersection(*[set(line.split()) for line in lines[1:]])
#         if common_line:
#             common_headers.append(' '.join(common_line))
#
#     return '\n'.join(common_headers)


def is_pdf_or_doc(filename):
    # Return True for PDF or Word documents
    return filename.lower().endswith(('.pdf', '.doc', '.docx'))


def convert_to_pdf(file_path):
    # docx2pdf is assumed to be available; the file extension alone decides whether conversion is needed
    if file_path.lower().endswith(('.doc', '.docx')):
        return docx2pdf(file_path)
    return file_path


def judge_file_exist(original_path, new_suffix):
    # Extract the directory path and the original file name
    directory = os.path.dirname(original_path)
    original_filename = os.path.basename(original_path)
    # Replace the old suffix in the file name with the new one.
    # The original name is expected to look like '2-招标文件_qualification.pdf';
    # the '_qualification1'/'_qualification2' part is swapped for the new suffix.
    new_filename = original_filename.replace("_qualification1", f"_{new_suffix}")
    new_filename = new_filename.replace("_qualification2", f"_{new_suffix}")
    # Build the new file path
    new_file_path = os.path.join(directory, new_filename)
    # Return the path only if the file actually exists
    if os.path.isfile(new_file_path):
        return new_file_path
    else:
        return None


# Merge PDFs
def merge_pdfs(paths, output_path):
    pdf_writer = PdfWriter()
    last_page_text = None  # text of the last page of the previous PDF
    for path in paths:
        pdf_reader = PdfReader(path)
        pages = pdf_reader.pages
        start_index = 0  # start from the first page by default
        # If this is not the first file and we have the previous file's last-page text
        if last_page_text is not None and len(pages) > 0:
            current_first_page_text = pages[0].extract_text() if pages[0].extract_text() else ""
            # Compare this file's first page with the previous file's last page
            if current_first_page_text == last_page_text:
                start_index = 1  # identical, so skip the duplicated first page
        # Append the remaining pages of the current PDF to the writer
        for page in range(start_index, len(pages)):
            pdf_writer.add_page(pages[page])
        # Remember the last-page text of the current PDF
        if len(pages) > 0:
            last_page_text = pages[-1].extract_text() if pages[-1].extract_text() else ""
    # Write the merged PDF to disk
    with open(output_path, 'wb') as out:
        pdf_writer.write(out)
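
# Usage sketch (hypothetical file names): consecutive files are deduplicated when the first
# page of one file is textually identical to the last page of the previous file.
#   merge_pdfs(["a_notice.pdf", "a_tobidders_notice_part1.pdf"], "a_merged.pdf")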


def merge_and_cleanup(output_pdf_path, suffix_to_merge):
    another_file_path = judge_file_exist(output_pdf_path, suffix_to_merge)
    if another_file_path:
        paths = [output_pdf_path, another_file_path]  # PDF files to merge
        merge_pdfs(paths, output_pdf_path)
        os.remove(another_file_path)
        print(f"文件 {another_file_path} 已删除。")


def process_files(file_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix):
    pdf_path = convert_to_pdf(file_path)
    result = extract_pages(pdf_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix)
    if result:
        if output_suffix == "tobidders_notice":
            return result  # a tuple containing two paths
        elif output_suffix == "qualification1":
            merge_and_cleanup(result, "qualification3")
        return result
    return None


def process_input(input_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
    generated_files = []
    # Handle a directory and a single file with the same logic
    if os.path.isdir(input_path):
        for file_name in os.listdir(input_path):
            file_path = os.path.join(input_path, file_name)
            if is_pdf_or_doc(file_path):
                result = process_files(file_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix)
                if result:
                    if isinstance(result, tuple):
                        generated_files.extend(result)
                    else:
                        generated_files.append(result)
    elif os.path.isfile(input_path) and is_pdf_or_doc(input_path):
        result = process_files(input_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix)
        if result:
            if isinstance(result, tuple):
                generated_files.extend(result)
            else:
                generated_files.append(result)
    else:
        print("提供的路径既不是文件夹也不是PDF文件。")
    return generated_files


# By default start_page is not reset once it has matched. The table of contents is normally
# skipped because begin_page=5, but when matching '第一章 招标公告' start_page can still
# mistakenly land on the table of contents.
def extract_pages_generic(pdf_document, begin_pattern, end_pattern, begin_page, common_header, exclusion_pattern=None,
                          output_suffix="normal"):
    start_page = None
    end_page = None
    for i, page in enumerate(pdf_document.pages):
        text = page.extract_text() or ""
        cleaned_text = clean_page_content(text, common_header)
        if output_suffix == "tobidders_notice":
            if exclusion_pattern and re.search(exclusion_pattern, cleaned_text) and start_page is not None:
                continue
        else:
            if exclusion_pattern and re.search(exclusion_pattern, cleaned_text):
                continue
        if output_suffix == "notice":
            if re.search(begin_pattern, cleaned_text) and i > begin_page:
                start_page = i
        else:
            if start_page is None and re.search(begin_pattern, cleaned_text) and i > begin_page:
                start_page = i
        if start_page is not None and re.search(end_pattern, cleaned_text) and i > start_page:
            end_page = i
            break
    return start_page, end_page
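
# Usage sketch (hypothetical document): locate the evaluation-method chapter of an already
# opened PdfReader. Only pages after index begin_page are considered, which helps skip the
# cover and the table of contents.
#   begin_pat, end_pat = get_patterns_for_evaluation_method()
#   start, end = extract_pages_generic(pdf_document, begin_pat, end_pat, 5, common_header)
#   # start/end are 0-based page indices, or None when no chapter boundary matched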


def extract_pages(pdf_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix):
    try:
        common_header = extract_common_header(pdf_path)
        pdf_document = PdfReader(pdf_path)
        exclusion_pattern = None
        if output_suffix == "tobidders_notice":
            exclusion_pattern = re.compile(r'文件的构成|文件的组成|须对应|需对应|须按照|需按照|须根据|需根据')
            start_page, mid_page, end_page = extract_pages_tobidders_notice(pdf_document, begin_pattern, end_pattern,
                                                                            begin_page, common_header,
                                                                            exclusion_pattern)
            if start_page is None or mid_page is None or end_page is None:
                print(f"first: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!尝试备用提取策略。")
                return extract_pages_twice_tobidders_notice(pdf_path, output_folder, output_suffix, common_header)
            path1 = save_extracted_pages(pdf_document, start_page, mid_page, pdf_path, output_folder,
                                         "tobidders_notice_part1")
            path2 = save_extracted_pages(pdf_document, mid_page, end_page, pdf_path, output_folder,
                                         "tobidders_notice_part2")
            return path1, path2
        else:
            # The original handling logic stays unchanged
            if output_suffix == "qualification1":
                exclusion_pattern = re.compile(r'文件的构成|文件的组成|须对应|需对应|须按照|需按照|须根据|需根据')
            start_page, end_page = extract_pages_generic(pdf_document, begin_pattern, end_pattern, begin_page,
                                                         common_header, exclusion_pattern, output_suffix)
            if start_page is None or end_page is None:
                print(f"first: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!尝试备用提取策略。")
                return extract_pages_twice(pdf_path, output_folder, output_suffix, common_header)
            elif output_suffix == "qualification1":
                truncate_pdf_main(pdf_path, output_folder, 2, "qualification3")
            return save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix)
    except Exception as e:
        print(f"Error processing {pdf_path}: {e}")
        return None


def extract_pages_tobidders_notice(pdf_document, begin_pattern, end_pattern, begin_page, common_header,
                                   exclusion_pattern):
    start_page = None
    mid_page = None
    end_page = None
    for i, page in enumerate(pdf_document.pages):
        text = page.extract_text() or ""
        cleaned_text = clean_page_content(text, common_header)
        if exclusion_pattern and re.search(exclusion_pattern, cleaned_text) and mid_page is not None:
            continue
        if start_page is None and re.search(begin_pattern, cleaned_text) and i > begin_page:
            start_page = i
        if start_page is not None and mid_page is None and re.search(
                r'^\s*[(]?\s*[一1]\s*[)]?\s*[、..]*\s*(说\s*明|总\s*则)', cleaned_text, re.MULTILINE):
            mid_page = i
        if start_page is not None and mid_page is not None and re.search(end_pattern, cleaned_text) and i > mid_page:
            end_page = i
            break
    return start_page, mid_page, end_page
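
# The mid_page regex in extract_pages_tobidders_notice above is meant to catch the start of
# the notice main body, e.g. lines such as "一、总则" or "(一) 说明" (illustrative examples;
# the actual wording varies between documents).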


def get_patterns_for_procurement():
    begin_pattern = re.compile(
        r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(?:服务|项目|商务).*?要求|'
        r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(?:采购|技术标准).*|'
        r'^[一二三四五六七八九十百千]+、\s*采购清单', re.MULTILINE)
    end_pattern = re.compile(
        r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE)
    return begin_pattern, end_pattern


def get_patterns_for_evaluation_method():
    begin_pattern = re.compile(
        r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(磋商|谈判|评标|评定|评审)(方法|办法).*', re.MULTILINE)
    end_pattern = re.compile(
        r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE)
    return begin_pattern, end_pattern


def get_patterns_for_qualification():
    # # Original matching logic
    # begin_pattern_original = re.compile(
    #     r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(资格审查).*', re.MULTILINE)
    # end_pattern_original = re.compile(
    #     r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE)
    # New matching logic
    begin_pattern_new = re.compile(
        r'^资格性检查', re.MULTILINE)
    end_pattern_new = re.compile(
        r'^附件\s*\d+|^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE)
    return begin_pattern_new, end_pattern_new


def get_patterns_for_notice():
    begin_pattern = re.compile(
        r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(?:公告|邀请书).*', re.MULTILINE
    )
    end_pattern = re.compile(
        r'^(?:第[一二三四五六七八九十百千]+(?:章|部分)\s*(?:投标人须知|磋商须知|供应商须知)+|(?:一\s*、\s*)?(?:投标人须知|磋商须知|供应商须知)前附表)',
        re.MULTILINE
    )
    return begin_pattern, end_pattern
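
# Example headings these patterns are meant to match (typical tender-document wording;
# the exact phrasing varies between files):
#   begin_pattern: "第一章 招标公告"、"第一部分 竞争性磋商邀请书"
#   end_pattern:   "第二章 投标人须知"、"一、供应商须知前附表"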


def extract_pages_twice_tobidders_notice(pdf_path, output_folder, output_suffix, common_header):
    # Fallback extraction for the bidders' notice front table / main body
    begin_pattern = re.compile(
        r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*(?:(?:投标人|磋商|供应商|谈判供应商|磋商供应商)须知前附表)+'
    )
    end_pattern = re.compile(
        r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+'
    )
    pdf_document = PdfReader(pdf_path)
    exclusion_pattern = re.compile(r'文件的构成|文件的组成|须对应|需对应|须按照|需按照|须根据|需根据')
    # Extract the first part
    start_page1, end_page1 = extract_pages_generic(pdf_document, begin_pattern, end_pattern, -1, common_header)
    if start_page1 is None or end_page1 is None:
        print(f"second: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!")
        return None, None
    # Extract the second part
    start_page2 = end_page1  # the second part starts where the first part ends
    _, end_page2 = extract_pages_generic(pdf_document, end_pattern, end_pattern, start_page2 - 1, common_header,
                                         exclusion_pattern)
    if end_page2 is None:
        print(f"second: {output_suffix} 未找到第二部分的结束页在文件 {pdf_path} 中!")
        return None, None
    # Save the extracted pages
    path1 = save_extracted_pages(pdf_document, start_page1, end_page1, pdf_path, output_folder,
                                 "tobidders_notice_part1")
    path2 = save_extracted_pages(pdf_document, start_page2, end_page2, pdf_path, output_folder,
                                 "tobidders_notice_part2")
    return path1, path2


def extract_pages_twice(pdf_path, output_folder, output_suffix, common_header):
    exclusion_pattern = re.compile(r'文件的构成|文件的组成|须对应|需对应|须按照|需按照|须根据|需根据')
    pdf_document = PdfReader(pdf_path)
    patterns = None
    begin_page = 0
    if output_suffix == "procurement":
        patterns = [get_patterns_for_procurement()]
        begin_page = 5
    elif output_suffix == "evaluation_method" or output_suffix == "qualification2" or output_suffix == "qualification3":
        patterns = [get_patterns_for_evaluation_method()]
        begin_page = 5
    elif output_suffix == "qualification1":
        patterns = [get_patterns_for_qualification()]  # a list holding one (begin, end) pattern pair
        begin_page = 5
    elif output_suffix == "notice":
        patterns = [get_patterns_for_notice()]
        begin_page = 0
    # Try each pattern pair until a valid range is found
    for pattern_pair in patterns:
        start_page, end_page = extract_pages_generic(pdf_document, pattern_pair[0], pattern_pair[1], begin_page,
                                                     common_header,
                                                     exclusion_pattern, output_suffix)
        if start_page is not None and end_page is not None:
            break
    if start_page is None or end_page is None:
        if output_suffix == "qualification1":
            print(f"second: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!")
            print("third:尝试提取评分办法章节...")
            temp = truncate_pdf_main(pdf_path, output_folder, 2, "qualification2")
            if len(temp) > 0:
                return temp[0]
            else:
                return None
        else:
            print(f"second: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!")
            return ""
    return save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix)


# def save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix):
#     if output_suffix == 'notice':
#         print(start_page)
#     base_file_name = os.path.splitext(os.path.basename(pdf_path))[0]
#     output_pdf_path = os.path.join(output_folder, f"{base_file_name}_{output_suffix}.pdf")
#     output_doc = PdfWriter()
#     for page_num in range(start_page, end_page + 1):
#         output_doc.add_page(pdf_document.pages[page_num])
#     with open(output_pdf_path, 'wb') as f:
#         output_doc.write(f)
#     print(f"{output_suffix} 已截取并保存页面从 {start_page} 到 {end_page} 为 {output_pdf_path}")
#     return output_pdf_path


def save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix):
    base_file_name = os.path.splitext(os.path.basename(pdf_path))[0]
    output_pdf_path = os.path.join(output_folder, f"{base_file_name}_{output_suffix}.pdf")
    if output_suffix == 'notice' and start_page - 1 >= 0:
        before_pdf_path = os.path.join(output_folder, f"{base_file_name}_before.pdf")
        before_doc = PdfWriter()
        for page_num in range(0, start_page):
            before_doc.add_page(pdf_document.pages[page_num])
        with open(before_pdf_path, 'wb') as f:
            before_doc.write(f)
        print(f"已保存页面从 0 到 {start_page - 1} 为 {before_pdf_path}")
    output_doc = PdfWriter()
    for page_num in range(start_page, end_page + 1):
        output_doc.add_page(pdf_document.pages[page_num])
    with open(output_pdf_path, 'wb') as f:
        output_doc.write(f)
    print(f"{output_suffix} 已截取并保存页面从 {start_page} 到 {end_page} 为 {output_pdf_path}")
    return output_pdf_path
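
# Usage sketch (hypothetical values): write pages 10..15 of an opened PdfReader as
# "{base}_procurement.pdf". For the 'notice' suffix, the pages before start_page are also
# saved separately as "{base}_before.pdf" so they can be merged back later.
#   save_extracted_pages(pdf_document, 10, 15, "C:\\docs\\招标文件.pdf", output_folder, "procurement")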


# Merge cover + tender notice + bidders' notice front table + notice main body
def merge_selected_pdfs(output_folder, truncate_files, output_path, base_file_name):
    """
    Merge the PDF files in output_folder whose names end with {base_file_name}_before.pdf,
    together with selected files from truncate_files.

    Parameters:
    - output_folder (str): folder containing the PDF files ending with {base_file_name}_before.pdf.
    - truncate_files (list): list of PDF file paths.
    - output_path (str): path where the merged PDF is saved.
    - base_file_name (str): base name used to match file names.
    """
    # 1. Find the PDFs in output_folder ending with {base_file_name}_before.pdf
    pattern = f'*{base_file_name}_before.pdf'
    before_pdfs = glob.glob(os.path.join(output_folder, pattern))
    print(f"找到 {len(before_pdfs)} 个以 '{base_file_name}_before.pdf' 结尾的文件。")
    # 2. Pick the files at indices 5, 3 and 4 of truncate_files
    selected_indices = [5, 3, 4]  # note: indices are 0-based
    selected_truncate_pdfs = []
    for idx in selected_indices:
        if idx < len(truncate_files):
            selected_truncate_pdfs.append(truncate_files[idx])
            print(f"选中 truncate_files[{idx}]: {truncate_files[idx]}")
        else:
            print(f"truncate_files 列表中没有索引为 {idx} 的元素。")
    # 3. Merge all PDF files
    all_pdfs_to_merge = before_pdfs + selected_truncate_pdfs
    print(f"总共将要合并的 PDF 文件数量: {len(all_pdfs_to_merge)}")
    if not all_pdfs_to_merge:
        print("没有找到要合并的 PDF 文件。")
        return
    # Delegate to merge_pdfs
    merge_pdfs(all_pdfs_to_merge, output_path)
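
# Note on the hard-coded indices above: when truncate_pdf_multiple runs selections 1-5 in
# order, truncate_files is expected to be [procurement, evaluation_method, qualification,
# tobidders_notice_part1, tobidders_notice_part2, notice], so [5, 3, 4] picks the notice and
# the two bidders'-notice parts. This ordering is an assumption about the usual output and is
# not enforced here.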


def truncate_pdf_main(input_path, output_folder, selection, output_suffix="default"):
    if selection == 1:
        # Updated regex: matches both "第x章" and "第x部分", allowing extra spaces and surrounding text
        begin_pattern = re.compile(
            r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(?:服务|项目|商务).*?要求|'
            r'^第[一二三四五六七八九十百千]+(?:章|部分).*?采购.*'
        )
        begin_page = 5
        end_pattern = re.compile(
            r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+'
        )
        local_output_suffix = "procurement"
    elif selection == 2:
        begin_pattern = re.compile(
            r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(磋商|谈判|评标|评定|评审)(方法|办法).*'
        )
        begin_page = 5
        end_pattern = re.compile(
            r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+'
        )
        local_output_suffix = "evaluation_method"
    elif selection == 3:
        begin_pattern = re.compile(
            r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(资格审查).*', re.MULTILINE
        )
        begin_page = 5
        end_pattern = re.compile(
            r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE
        )
        local_output_suffix = "qualification1"
    elif selection == 4:  # bidders' notice front table and main body
        begin_page = 1
        begin_pattern = re.compile(
            r'^(?:第[一二三四五六七八九十百千]+(?:章|部分)\s*(?:投标人|磋商|供应商|谈判供应商|磋商供应商)须知+|(?:一\s*、\s*)?(?:投标人|磋商|供应商)须知前附表)',
            re.MULTILINE
        )
        end_pattern = re.compile(
            r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE
        )
        local_output_suffix = "tobidders_notice"
    elif selection == 5:  # tender notice
        begin_page = 0
        begin_pattern = re.compile(
            r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(?:公告|邀请书).*'
        )
        end_pattern = re.compile(
            r'^(?:第[一二三四五六七八九十百千]+(?:章|部分)\s*(?:投标人|磋商|供应商|谈判供应商|磋商供应商)须知+|(?:一\s*、\s*)?(?:投标人|磋商|供应商)须知前附表)'
        )
        local_output_suffix = "notice"
    else:
        print("无效的选择:请选择1-5")
        return None
    # If the caller passed 'default', fall back to the locally chosen suffix
    if output_suffix == "default":
        output_suffix = local_output_suffix
    # Dispatch to the shared processing function
    return process_input(input_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix)
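
# Usage sketch (hypothetical paths): extract only the evaluation-method chapter (selection=2)
# into output_folder; the returned list holds the paths of the generated PDFs.
#   files = truncate_pdf_main("C:\\docs\\招标文件.pdf", "C:\\docs\\out", 2)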


def truncate_pdf_multiple(pdf_path, output_folder):
    base_file_name = os.path.splitext(os.path.basename(pdf_path))[0]
    truncate_files = []
    for selection in range(1, 6):
        files = truncate_pdf_main(pdf_path, output_folder, selection)
        if files:
            truncate_files.extend(files)
    if truncate_files:
        merged_output_path = os.path.join(output_folder, f"{base_file_name}_merged_baseinfo.pdf")
        merge_selected_pdfs(output_folder, truncate_files, merged_output_path, base_file_name)
        truncate_files.append(merged_output_path)
        print(f"已生成合并文件: {merged_output_path}")
    else:
        print(f"没有文件需要合并 for {pdf_path}")
    return truncate_files


# TODO: the 交通智能系统 and 招标(1)(1) files still fail; the exclusion pattern misbehaves when selection=4

if __name__ == "__main__":
    input_path = "C:\\Users\\Administrator\\Desktop\\货物标\\zbfiles\\6.2定版视频会议磋商文件.pdf"
    output_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\truncate_all"
    files = truncate_pdf_multiple(input_path, output_folder)
    print(files)
    # selection = 1  # e.g. 1 - business/technical/service requirements, 2 - evaluation method,
    #                # 3 - qualification review (suffix qualification1 or qualification2, handled like the evaluation method),
    #                # 4 - bidders' notice front table (part1) and main body (part2), 5 - tender notice
    # generated_files = truncate_pdf_main(input_path, output_folder, selection)