zbparse/flask_app/main/截取pdf_old.py

531 lines
25 KiB
Python
Raw Normal View History

2024-10-29 09:04:23 +08:00
from PyPDF2 import PdfReader, PdfWriter
import re # 导入正则表达式库
import os # 用于文件和文件夹操作
from flask_app.general.merge_pdfs import merge_pdfs
import concurrent.futures
import logging
def get_global_logger(unique_id):
if unique_id is None:
return logging.getLogger() # 获取默认的日志器
logger = logging.getLogger(unique_id)
return logger
logger=None
def clean_page_content(text, common_header):
# 首先删除抬头公共部分
if common_header: # 确保有公共抬头才进行替换
for header_line in common_header.split('\n'):
if header_line.strip(): # 只处理非空行
# 替换首次出现的完整行
text = re.sub(r'^' + re.escape(header_line.strip()) + r'\n?', '', text, count=1)
# 删除页码 eg:89/129 这个代码分三步走可以把89/129完全删除
text = re.sub(r'^\s*\d+\s*(?=\D)', '', text) # 删除开头的页码,仅当紧跟非数字字符时
text = re.sub(r'\s+\d+\s*$', '', text) # 删除结尾的页码
text = re.sub(r'\s*\/\s*\d+\s*', '', text) # 删除形如 /129 的页码
text = re.sub(r'\s*[—-]\s*\d+\s*[—-]\s*', '', text) # 删除形如 '—2—' 或 '-2-' 的页码
return text
#PYPDF2库
def extract_common_header(pdf_path):
from PyPDF2 import PdfReader
pdf_document = PdfReader(pdf_path)
headers = []
total_pages = len(pdf_document.pages)
# 确定要读取的页数和起始页
if total_pages == 2:
pages_to_read = 2
start_page = 0
else:
pages_to_read = 3
middle_page = total_pages // 2
start_page = max(0, middle_page - 1)
for i in range(start_page, min(start_page + pages_to_read, total_pages)):
page = pdf_document.pages[i]
text = page.extract_text() or ""
if text:
# 只取每页的前三行
first_lines = text.strip().split('\n')[:3]
headers.append(first_lines)
if len(headers) < 2:
return "" # 如果没有足够的页来比较,返回空字符串
# 寻找每一行中的公共部分,按顺序保留
common_headers = []
for lines in zip(*headers):
# 提取第一行的词汇顺序
first_words = lines[0].split()
# 筛选所有页面都包含的词汇,保持顺序
common_line = [word for word in first_words if all(word in line.split() for line in lines[1:])]
if common_line:
common_headers.append(' '.join(common_line))
return '\n'.join(common_headers)
def save_pages_to_new_pdf(pdf_path, output_folder, output_suffix, start_page, end_page,common_header):
"""
从原始PDF截取指定范围的页面并保存到新的PDF文件中
如果 output_suffix 'notice'则额外保存 start_page 之前的页面
参数:
pdf_path (str): 原始PDF文件路径
output_folder (str): 输出文件夹路径
output_suffix (str): 输出文件的后缀用于区分不同的提取
start_page (int): 起始页码0
end_page (int): 结束页码0
返回:
str: 保存的PDF文件路径如果提取失败返回空字符串
"""
try:
# 获取文件基本名称
base_file_name = os.path.splitext(os.path.basename(pdf_path))[0]
# 构建主要输出文件路径
output_pdf_path = os.path.join(output_folder, f"{base_file_name}_{output_suffix}.pdf")
# 读取PDF文件
pdf_document = PdfReader(pdf_path)
total_pages = len(pdf_document.pages)
# 检查起始和结束页码是否有效
if start_page < 0 or end_page >= total_pages or start_page > end_page:
print(f"无效的页面范围: {start_page}{end_page}")
return ""
# 如果 output_suffix 是 'notice',保存 start_page 之前的页面(若遇到'目录',则截取到'目录')
if output_suffix == 'notice' and start_page > 0:
before_pdf_path = os.path.join(output_folder, f"{base_file_name}_before.pdf")
before_doc = PdfWriter()
toc_page = -1
# 查找目录页
for page_num in range(min(start_page, total_pages)):
page_text = pdf_document.pages[page_num].extract_text()
cleaned_text = clean_page_content(page_text, common_header)
if re.search(r'\s*录', cleaned_text, re.MULTILINE):
toc_page = page_num
break
# 确定截取的页数
pages_to_extract = toc_page + 1 if toc_page != -1 else start_page
# 提取页面
for page_num in range(pages_to_extract):
before_doc.add_page(pdf_document.pages[page_num])
with open(before_pdf_path, 'wb') as f_before:
before_doc.write(f_before)
print(f"已保存页面从 0 到 {pages_to_extract}{before_pdf_path}")
# 提取指定范围的页面
output_doc = PdfWriter()
for page_num in range(start_page, end_page + 1):
output_doc.add_page(pdf_document.pages[page_num])
# 保存新的PDF文件
with open(output_pdf_path, 'wb') as f_output:
output_doc.write(f_output)
print(f"{output_suffix} 已截取并保存页面从 {start_page + 1}{end_page + 1}{output_pdf_path}")
return output_pdf_path
except Exception as e:
print(f"Error in save_pages_to_new_pdf: {e}")
return "" # 返回空字符串
def extract_pages_twice(pdf_path, output_folder, output_suffix):
common_header = extract_common_header(pdf_path)
last_begin_index = 0
begin_pattern = re.compile(r'第[一二三四五六七八九十]+章\s*招标公告|第一卷|投标邀请书')
pdf_document = PdfReader(pdf_path)
for i, page in enumerate(pdf_document.pages):
text = page.extract_text()
if text:
cleaned_text = clean_page_content(text,common_header)
# 检查“第一章”开始的位置
if begin_pattern.search(cleaned_text):
last_begin_index = i # 更新最后匹配的索引页码从0开始
if output_suffix == "qualification":
common_pattern = r'^(?:附录(?:一)?[:]|附件(?:一)?[:]|附表(?:一)?[:])'
# end_pattern = r'^(第[一二三四五六七八九十]+章\s*投标人须知|评标办法|评标办法前附表)'
end_pattern = re.compile(
common_pattern + r'(?!.*(?:资质|能力|信誉)).*$|' # 排除资质、能力、信誉的描述
r'^(第[一二三四五六七八九十]+章\s*评标办法|评标办法前附表|投标人须知)', # 新增的匹配项
re.MULTILINE
)
start_page = None
end_page = None
# 从章节开始后的位置进行检查
for i, page in enumerate(pdf_document.pages[last_begin_index:], start=last_begin_index):
text = page.extract_text()
if text:
cleaned_text = clean_page_content(text,common_header)
# 确定起始页需在last_begin_index之后
if ("资格审查" in cleaned_text or "资质条件" in cleaned_text):
if re.search(common_pattern, cleaned_text, re.MULTILINE):
if start_page is None:
start_page = i # 确保起始页不小于章节的开始页码
# 确定结束页
if start_page is not None and re.search(end_pattern, cleaned_text):
if i > start_page:
end_page = i
break # 找到结束页后退出循环
if start_page is None or end_page is None:
print(f"{output_suffix} twice: 未找到起始或结束页在文件 {pdf_path} 中!")
return ""
else:
return save_pages_to_new_pdf(pdf_path, output_folder, output_suffix, start_page, end_page,common_header)
elif output_suffix == "invalid":
pdf_document = PdfReader(pdf_path)
total_pages = len(pdf_document.pages)
# 计算总页数的三分之二
total = int(total_pages * 2 / 3)
start_page = last_begin_index
end_page = min(90, total)
return save_pages_to_new_pdf(pdf_path, output_folder, output_suffix, start_page, end_page,common_header)
def extract_pages(pdf_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix):
common_header=extract_common_header(pdf_path)
# 打开PDF文件
pdf_document = PdfReader(pdf_path)
start_page = None
end_page = None
# 遍历文档的每一页,查找开始和结束短语的位置
for i in range(len(pdf_document.pages)):
page = pdf_document.pages[i]
text = page.extract_text()
if text:
cleaned_text = clean_page_content(text,common_header)
# print(cleaned_text)
if re.search(begin_pattern, cleaned_text) and i > begin_page:
if output_suffix == "invalid" and start_page: # 仅当提取Invalid的时候判断初始页码是第一个匹配到的页码因为招标编号可能存在多个后面的覆盖前面
continue
else:
start_page = i
if start_page is not None and re.search(end_pattern, cleaned_text):
# 如果output_suffix是"qualification",调整条件检查
condition = i > start_page
if condition:
is_invalid_condition = output_suffix == "invalid" and i > 30 # 这边默认无效投标至少有30页
if is_invalid_condition or output_suffix != "invalid":
end_page = i
break
# 确保找到了起始和结束页面
if start_page is None or end_page is None:
if output_suffix == "qualification" or output_suffix =="invalid":
return extract_pages_twice(pdf_path, output_folder, output_suffix)
else:
print(f"{output_suffix} first: 未找到起始或结束页在文件 {pdf_path} 中!")
return ""
else:
return save_pages_to_new_pdf(pdf_path, output_folder, output_suffix, start_page, end_page,common_header)
def process_input(input_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix):
# 确保输出文件夹存在
if not os.path.exists(output_folder):
os.makedirs(output_folder)
if os.path.isdir(input_path):
generated_files = []
# 遍历文件夹内的所有PDF文件
for file in os.listdir(input_path):
if file.endswith(".pdf"):
pdf_path = os.path.join(input_path, file)
output_pdf_path = extract_pages(pdf_path, output_folder, begin_pattern, begin_page, end_pattern,
output_suffix)
if output_pdf_path and os.path.isfile(output_pdf_path):
generated_files.append(output_pdf_path)
return generated_files
elif os.path.isfile(input_path) and input_path.endswith(".pdf"):
# 处理单个PDF文件
output_pdf_path = extract_pages(input_path, output_folder, begin_pattern, begin_page, end_pattern,
output_suffix)
if output_pdf_path and os.path.isfile(output_pdf_path):
return [output_pdf_path] # 以列表形式返回,以保持一致性
else:
print("提供的路径既不是文件夹也不是PDF文件。")
return []
def truncate_pdf_main(input_path, output_folder, selection):
try:
if selection == 1:
# Configure patterns and phrases for "投标人须知前附表"
begin_pattern = re.compile(r'第[一二三四五六七八九十]+章\s*投标人须知')
begin_page = 3
end_pattern = re.compile(r'投标人须知正文')
output_suffix = "tobidders_notice_table"
elif selection == 2:
# Configure patterns and phrases for "评标办法"
begin_pattern = re.compile(
r'第[一二三四五六七八九十]+章\s*评标办法') # 考虑到这种情况 '第三章 第三章 第三章 第三章 评标办法 评标办法 评标办法 评标办法'
begin_page = 10
end_pattern = re.compile(r'评标办法正文|评标办法')
output_suffix = "evaluation_method"
elif selection == 3:
# Configure patterns and phrases for "投标人须知正文"
begin_pattern = re.compile(r'投标人须知正文')
begin_page = 5
end_pattern = re.compile(
r'^第[一二三四五六七八九十]+章\s*评标办法|^评标办法前附表|^附录(?:一)?[:]|^附件(?:一)?[:]|^附表(?:一)?[:]',
re.MULTILINE)
output_suffix = "tobidders_notice"
elif selection == 4:
# 配置用于 "资格审查条件" 的正则表达式模式和短语
common_pattern = r'^(?:附录(?:一)?[:]|附件(?:一)?[:]|附表(?:一)?[:])'
begin_pattern = re.compile(common_pattern + r'.*(?:资质|能力|信誉).*$', re.MULTILINE)
begin_page = 5
end_pattern = re.compile(
common_pattern + r'(?!.*(?:资质|能力|信誉)).*$|' # 原有的模式
r'^(第[一二三四五六七八九十]+章\s*评标办法|评标办法前附表)', # 新增的匹配项
re.MULTILINE
)
output_suffix = "qualification"
elif selection == 5:
# 配置用于 "招标公告" 的正则表达式模式和短语
begin_pattern = re.compile(r'^第[一二三四五六七八九十]+章\s*(招标公告|.*邀请.*)|^第一卷|^投标邀请书')
begin_page = 0
end_pattern = re.compile(r'第[一二三四五六七八九十]+章\s*投标人须知')
output_suffix = "notice"
elif selection == 6:
# 配置用于 "无效标" 的正则表达式模式和短语
begin_pattern = re.compile(r'第[一二三四五六七八九十]+章\s*(招标公告|.*邀请.*)|第一卷|招标编号:|招标编号:')
begin_page = 0
end_pattern = re.compile(r'第[一二三四五六七八九十]+章\s*合同|[:]清标报告|^第二卷', re.MULTILINE)
output_suffix = "invalid"
else:
print("无效的选择:请选择1-6")
return [""]
# 调用 process_input 进行截取
output_paths = process_input(input_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix)
if not output_paths:
return [""] # 截取失败时返回空字符串
return output_paths
except Exception as e:
print(f"Error in truncate_pdf_main for selection {selection}: {e}")
return [""] # 捕获异常时返回空字符串
def truncate_pdf_multiple_old(input_path, output_folder):
truncate_files = []
for selection in range(1, 5):
files = truncate_pdf_main(input_path, output_folder, selection)
truncate_files.extend(files)
return truncate_files
def merge_selected_pdfs(output_folder, truncate_files, output_path, base_file_name):
"""
合并 output_folder 中以 {base_file_name}_before.pdf 结尾的 PDF 文件
以及 truncate_files 中以指定后缀结尾的文件按照指定顺序合并
参数
- output_folder (str): 包含以 {base_file_name}_before.pdf 结尾的 PDF 文件的文件夹路径
- truncate_files (list): 包含 PDF 文件路径的列表
- output_path (str): 合并后的 PDF 文件保存路径
- base_file_name (str): 用于匹配文件名的基础名称
返回:
- str: 合并后的 PDF 文件路径如果没有合并则返回 ""
"""
# 1. 获取 output_folder 中所有文件
try:
all_output_files = os.listdir(output_folder)
except FileNotFoundError:
print(f"输出文件夹 '{output_folder}' 未找到。")
return ""
except PermissionError:
print(f"没有权限访问输出文件夹 '{output_folder}'")
return ""
# 2. 定义要选择的文件后缀及合并顺序,包括 before 文件
desired_suffixes = [
f'{base_file_name}_before.pdf',
f'{base_file_name}_notice.pdf',
f'{base_file_name}_tobidders_notice_table.pdf'
]
all_pdfs_to_merge = []
for suffix in desired_suffixes:
if suffix == f'{base_file_name}_before.pdf':
# 从 output_folder 中选择以 {base_file_name}_before.pdf 结尾的文件
matching_files = [
os.path.join(output_folder, f)
for f in all_output_files
if f.endswith(suffix)
]
else:
# 从 truncate_files 中选择以指定后缀结尾的文件
matching_files = [f for f in truncate_files if f.endswith(suffix)]
if matching_files:
# 如果找到多个匹配的文件,按名称排序并添加
matching_files_sorted = sorted(matching_files)
all_pdfs_to_merge.extend(matching_files_sorted)
for f in matching_files_sorted:
print(f"选中文件: {f}")
else:
print(f"没有找到以 '{suffix}' 结尾的文件。")
print(f"总共将要合并的 PDF 文件数量: {len(all_pdfs_to_merge)}")
if not all_pdfs_to_merge:
print("没有找到要合并的 PDF 文件。")
return ""
# 过滤掉不存在或为空的文件路径
all_pdfs_to_merge = [f for f in all_pdfs_to_merge if os.path.isfile(f)]
if not all_pdfs_to_merge:
print("没有有效的 PDF 文件需要合并。")
return ""
# 调用 merge_pdfs 函数进行合并
try:
merge_pdfs(all_pdfs_to_merge, output_path)
print(f"已成功合并 PDF 文件到 '{output_path}'")
return output_path
except Exception as e:
print(f"合并 PDF 文件时出错: {e}")
return ""
def truncate_pdf_multiple(input_path, output_folder, unique_id="123"):
"""
处理 PDF 文件选择 selection 1-6 的部分并合并结果
Args:
input_path (str): 要处理的 PDF 文件路径
output_folder (str): 截取后的文件保存文件夹路径
Returns:
list: 截取的文件路径列表包括合并后的文件路径如果有
"""
global logger
logger = get_global_logger(unique_id)
base_file_name = os.path.splitext(os.path.basename(input_path))[0] # 纯文件名
truncate_files = []
selections = range(1, 7) # 选择 1 到 6
# 使用 ThreadPoolExecutor 进行多线程处理
with concurrent.futures.ThreadPoolExecutor(max_workers=len(selections)) as executor:
# 提交所有任务并保持 selection 顺序
future_to_selection = {selection: executor.submit(truncate_pdf_main, input_path, output_folder, selection) for selection in selections}
# 按 selection 顺序收集结果
for selection in selections:
future = future_to_selection.get(selection)
try:
files = future.result()
if files and any(f for f in files):
# 过滤空字符串
valid_files = [f for f in files if f]
truncate_files.extend(valid_files)
else:
logger.error(f"Selection {selection}: 截取失败,已添加空字符串。")
truncate_files.append("") # 截取失败时添加空字符串
except Exception as e:
logger.error(f"Selection {selection} 生成了一个异常: {e}")
truncate_files.append("") # 发生异常时添加空字符串
if any(f for f in truncate_files if f): # 检查是否有有效的文件路径
merged_output_path = os.path.join(output_folder, f"{base_file_name}_merged_baseinfo.pdf")
merged_result = merge_selected_pdfs(output_folder, truncate_files, merged_output_path, base_file_name)
if merged_result:
truncate_files.append(merged_result)
logger.info(f"merged_baseinfo: 已生成合并文件: {merged_output_path}")
else:
truncate_files.append("") # 如果merged_result未生成添加空字符串
logger.warning("merged_baseinfo: 未生成合并文件,因为没有找到需要合并的 PDF 文件。")
else:
truncate_files.append("") # 如果没有文件需要合并,也添加空字符串
logger.warning(f"merged_baseinfo: 没有文件需要合并 for {input_path}")
return truncate_files
def truncate_pdf_specific_engineering(pdf_path, output_folder, selections, unique_id="123"):
"""
处理 PDF 文件选择指定的 selections并合并结果
Args:
pdf_path (str): 要处理的 PDF 文件路径
output_folder (str): 截取后的文件保存文件夹路径
selections (list): 需要截取的部分例如 [4, 5]
unique_id (str): 用于日志记录的唯一标识符
Returns:
list: 截取的文件路径列表包括合并后的文件路径如果有
"""
try:
global logger
logger = get_global_logger(unique_id)
base_file_name = os.path.splitext(os.path.basename(pdf_path))[0]
truncate_files = []
# 使用 ThreadPoolExecutor 进行多线程处理
with concurrent.futures.ThreadPoolExecutor(max_workers=len(selections)) as executor:
# 提交所有任务并保持 selection 顺序
future_to_selection = {selection: executor.submit(truncate_pdf_main, pdf_path, output_folder, selection) for selection in selections}
# 按 selection 顺序收集结果
for selection in selections:
future = future_to_selection.get(selection)
try:
files = future.result()
if files and any(f for f in files):
# 过滤空字符串
valid_files = [f for f in files if f]
truncate_files.extend(valid_files)
else:
logger.error(f"Selection {selection}: 截取失败,已添加空字符串。")
truncate_files.append("") # 截取失败时添加空字符串
except Exception as e:
logger.error(f"Selection {selection} 生成了一个异常: {e}")
truncate_files.append("") # 发生异常时添加空字符串
if any(f for f in truncate_files if f): # 检查是否有有效的文件路径
merged_output_path = os.path.join(output_folder, f"{base_file_name}_merged_specific.pdf")
merged_result = merge_selected_pdfs(output_folder, truncate_files, merged_output_path, base_file_name)
if merged_result:
truncate_files.append(merged_result)
logger.info(f"merged_specific: 已生成合并文件: {merged_output_path}")
else:
truncate_files.append("") # 如果 merged_result 未生成,添加空字符串
logger.warning("merged_specific: 未生成合并文件,因为没有找到需要合并的 PDF 文件。")
else:
truncate_files.append("") # 如果没有文件需要合并,也添加空字符串
logger.warning(f"merged_specific: 没有文件需要合并 for {pdf_path}")
return truncate_files
except Exception as e:
# 假设 selections 的长度是已知的,可以通过传递参数或其他方式获取
logger.error(f"Error in truncate_pdf_specific_engineering: {e}")
return [""] * len(selections) # 返回与 selections 数量相同的空字符串列表
#TODO:zbtest8 zbtest18有问题 后期需要完善,截取需要截两次,第一次严格第二次宽松
if __name__ == "__main__":
2024-10-29 20:40:14 +08:00
input_path = "C:\\Users\\Administrator\\Desktop\\招标文件\\new_test\\zbtest8.pdf"
2024-10-29 09:04:23 +08:00
# input_path="C:\\Users\\Administrator\\Desktop\\fsdownload\\68549b0b-e892-41a9-897c-c3694535ee61\\ztbfile.pdf"
# input_path = "C:\\Users\\Administrator\\Desktop\\货物标\\zbfiles\\2-招标文件.pdf"
output_folder="C:\\Users\\Administrator\\Desktop\\招标文件\\output6"
files=truncate_pdf_multiple(input_path,output_folder)
# selections = [5, 1] # 仅处理 selection 5、1 和 3
# files=truncate_pdf_specific_engineering(input_path,output_folder,selections)
print(files)
# selection = 6 # 例如1 - 投标人须知前附表, 2 - 评标办法, 3 - 投标人须知正文 4-资格审查条件 5-招标公告 6-无效标
# generated_files = truncate_pdf_main(input_path, output_folder, selection)
# print(generated_files)
# print("生成的文件:", generated_files)