zbparse/flask_app/general/merge_pdfs.py

218 lines
8.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
from PyPDF2 import PdfReader, PdfWriter
#合并PDF
def merge_pdfs(paths, output_path):
    """Merge several PDF files into one, dropping a duplicated seam page.

    Files are appended in the order given. When the first page of a file has
    exactly the same extracted text as the last page of the previously merged
    file, that first page is skipped so overlapping excerpts are not repeated.

    Args:
        paths: Iterable of PDF file paths. Entries that are not strings
            (e.g. ``None``) or that are blank are ignored.
        output_path: Destination path of the merged PDF.

    Returns:
        ``output_path`` when the merged file was written, otherwise ``""``
        (no usable input path, no page could be added, or the write failed).
    """
    # Keep only usable entries; a None entry used to crash on `.strip()`.
    valid_paths = [p for p in paths if isinstance(p, str) and p.strip()]
    if not valid_paths:
        return ""

    pdf_writer = PdfWriter()
    last_page_text = None  # extracted text of the last page of the previous file

    for path in valid_paths:
        try:
            pdf_reader = PdfReader(path)
            pages = pdf_reader.pages
            page_count = len(pages)
            if page_count == 0:
                continue

            start_index = 0
            # Only compare when the previous last page produced real text:
            # two pages without extractable text (e.g. scanned images) both
            # yield "" and must not be treated as duplicates of each other.
            if last_page_text and last_page_text.strip():
                current_first_page_text = pages[0].extract_text() or ""
                if current_first_page_text == last_page_text:
                    start_index = 1  # identical seam page: skip it

            for page_num in range(start_index, page_count):
                pdf_writer.add_page(pages[page_num])

            last_page_text = pages[-1].extract_text() or ""
        except Exception as e:
            # Best effort: report the unreadable file and keep merging the rest.
            print(f"文件 '{path}' 无法处理,错误: {e}")
            continue

    # Nothing was added, so there is nothing to write.
    if len(pdf_writer.pages) == 0:
        return ""

    try:
        with open(output_path, 'wb') as out:
            pdf_writer.write(out)
        return output_path
    except Exception as e:
        print(f"无法写入输出文件,错误: {e}")
        return ""
def judge_file_exist(original_path, new_suffix):
    """Return the path of the sibling file that carries ``new_suffix``.

    The sibling name is built from the base name of ``original_path`` by
    replacing every ``_qualification1`` or ``_qualification2`` marker with
    ``_{new_suffix}``; the directory part is kept unchanged.

    Args:
        original_path: Path whose file name contains a ``_qualification1`` or
            ``_qualification2`` marker, e.g. ``'2-doc_qualification1.pdf'``.
        new_suffix: Marker text (without the leading underscore) to substitute.

    Returns:
        The derived path if it is an existing regular file, otherwise ``None``.
        If the file name contains neither marker, the derived path equals
        ``original_path`` itself.
    """
    import re  # local import so the block does not rely on a file-level one

    directory, original_filename = os.path.split(original_path)
    replacement = f"_{new_suffix}"

    # Replace both markers in a single pass. Chaining two str.replace calls
    # re-scanned the text inserted by the first call, so a suffix such as
    # "qualification2_x" was substituted twice. The callable keeps backslashes
    # in the replacement literal.
    new_filename = re.sub(
        r"_qualification[12]", lambda _match: replacement, original_filename
    )

    new_file_path = os.path.join(directory, new_filename)
    return new_file_path if os.path.isfile(new_file_path) else None
def merge_and_cleanup(output_pdf_path, suffix_to_merge):
    """Append the companion PDF to ``output_pdf_path`` and delete the companion.

    The companion file is located with ``judge_file_exist``. If it exists, the
    two files are merged in place into ``output_pdf_path`` and the companion is
    removed afterwards.

    Args:
        output_pdf_path: Path of the primary PDF; also the merge destination.
        suffix_to_merge: Suffix passed to ``judge_file_exist`` to locate the
            companion file.

    Returns:
        None.
    """
    another_file_path = judge_file_exist(output_pdf_path, suffix_to_merge)
    if not another_file_path:
        return

    # If the derived name resolves to the primary file itself, merging would
    # duplicate its pages and the cleanup below would delete the result.
    if os.path.abspath(another_file_path) == os.path.abspath(output_pdf_path):
        return

    paths = [output_pdf_path, another_file_path]  # PDFs to merge, in order
    merged_path = merge_pdfs(paths, output_pdf_path)
    if not merged_path:
        # merge_pdfs signals failure with ""; keep the companion so that no
        # content is lost.
        print(f"合并失败,保留文件 {another_file_path}。")
        return

    os.remove(another_file_path)
    print(f"文件 {another_file_path} 已删除。")
def find_and_merge(target_path, output_suffix):
    """Merge a sibling file ending with ``output_suffix`` into ``target_path``.

    The directory containing ``target_path`` is scanned for file names ending
    with ``output_suffix``. If one is found it is appended to the target and
    the result is written back to ``target_path``; otherwise the target alone
    is passed through ``merge_pdfs``.

    When several names match, the lexicographically last one is used so the
    choice does not depend on directory listing order.

    Args:
        target_path: Path of the PDF that receives the merged content.
        output_suffix: File-name ending that identifies the file to append.

    Returns:
        None.
    """
    # dirname() is "" for a bare file name and os.listdir("") raises, so fall
    # back to the current directory.
    directory = os.path.dirname(target_path) or os.curdir
    target_name = os.path.basename(target_path)

    # Exclude the target itself: if its own name ends with the suffix it would
    # otherwise be merged with itself.
    candidates = sorted(
        name
        for name in os.listdir(directory)
        if name.endswith(output_suffix) and name != target_name
    )

    paths = [target_path]
    if candidates:
        paths.append(os.path.join(directory, candidates[-1]))
    merge_pdfs(paths, target_path)
def merge_selected_pdfs(output_folder, truncate_files, output_path, base_file_name, mode='engineering'):
    """Merge the mode-specific set of PDF sections into a single file.

    The ``{base_file_name}_before.pdf`` section is looked up in
    ``output_folder``; every other section is looked up in ``truncate_files``.
    All required sections must be found, while optional ones are merged only
    if present. After a successful merge the file
    ``{output_folder}/{base_file_name}_before.pdf`` is deleted.

    Args:
        output_folder (str): Folder that contains the
            ``{base_file_name}_before.pdf`` file.
        truncate_files (list): PDF file paths to pick the other sections from.
            ``None`` and non-string entries are ignored.
        output_path (str): Destination path of the merged PDF.
        base_file_name (str): Base name used to build the expected file-name
            endings.
        mode (str): ``'engineering'`` or ``'goods'``; selects which sections
            are required and which are optional.

    Returns:
        str: ``output_path`` on success, otherwise ``""``.
    """
    try:
        all_output_files = os.listdir(output_folder)
    except FileNotFoundError:
        print(f"输出文件夹 '{output_folder}' 未找到。")
        return ""
    except PermissionError:
        print(f"没有权限访问输出文件夹 '{output_folder}'")
        return ""

    before_suffix = f'{base_file_name}_before.pdf'
    if mode == 'engineering':
        required_suffixes = [
            before_suffix,
            f'{base_file_name}_notice.pdf',
            f'{base_file_name}_tobidders_notice_table.pdf',
        ]
        optional_suffixes = []
    elif mode == 'goods':
        required_suffixes = [
            before_suffix,
            f'{base_file_name}_notice.pdf',
            f'{base_file_name}_tobidders_notice_part1.pdf',
        ]
        optional_suffixes = [
            f'{base_file_name}_tobidders_notice_part2.pdf',
        ]
    else:
        print(f"未知的合并模式: {mode}")
        return ""

    # Ignore None / non-string entries, which would crash on `.endswith`.
    candidate_files = [f for f in (truncate_files or []) if isinstance(f, str)]

    all_pdfs_to_merge = []
    missing_files = []

    # Required sections: remember every suffix that has no match.
    for suffix in required_suffixes:
        matching_files = _match_section_files(
            suffix, before_suffix, output_folder, all_output_files, candidate_files
        )
        if matching_files:
            all_pdfs_to_merge.extend(matching_files)
            for f in matching_files:
                print(f"选中文件: {f}")
        else:
            print(f"没有找到以 '{suffix}' 结尾的文件。")
            missing_files.append(suffix)

    # Optional sections: merged when present, skipped otherwise.
    for suffix in optional_suffixes:
        matching_files = _match_section_files(
            suffix, before_suffix, output_folder, all_output_files, candidate_files
        )
        if matching_files:
            all_pdfs_to_merge.extend(matching_files)
            for f in matching_files:
                print(f"选中文件: {f}")
        else:
            print(f"可选文件 '{suffix}' 未找到,继续合并。")

    if missing_files:
        print("缺少以下必要的 PDF 文件,无法进行合并:")
        for missing in missing_files:
            print(f" - {missing}")
        return ""

    print(f"总共将要合并的 PDF 文件数量: {len(all_pdfs_to_merge)}")

    # Drop paths that do not exist or point at empty files.
    all_pdfs_to_merge = [
        f for f in all_pdfs_to_merge
        if os.path.isfile(f) and os.path.getsize(f) > 0
    ]
    if not all_pdfs_to_merge:
        print("没有找到要合并的有效 PDF 文件。")
        return ""

    try:
        merged_path = merge_pdfs(all_pdfs_to_merge, output_path)
    except Exception as e:
        print(f"合并 PDF 文件时出错: {e}")
        return ""

    # merge_pdfs reports failure by returning "". Checking only whether
    # output_path exists would mistake a stale file from an earlier run for a
    # successful merge and then delete the "_before" section.
    if not merged_path:
        print(f"合并失败,没有生成 '{output_path}'")
        return ""
    print(f"已成功合并 PDF 文件到 '{output_path}'")

    if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
        before_pdf_path = os.path.join(output_folder, f"{base_file_name}_before.pdf")
        if os.path.exists(before_pdf_path):
            try:
                os.remove(before_pdf_path)
                print(f"已删除文件: {before_pdf_path}")
            except Exception as e:
                print(f"删除文件 {before_pdf_path} 时出错: {e}")
        else:
            print(f"未找到要删除的文件: {before_pdf_path}")
        return output_path
    else:
        print(f"合并失败,没有生成 '{output_path}'")
        return ""


def _match_section_files(suffix, before_suffix, output_folder, folder_entries, candidate_files):
    """Return the sorted paths whose names end with ``suffix``.

    The "before" section is searched among the entries of ``output_folder``
    (joined with the folder path); every other section is searched in
    ``candidate_files``.
    """
    if suffix == before_suffix:
        return sorted(
            os.path.join(output_folder, name)
            for name in folder_entries
            if name.endswith(suffix)
        )
    return sorted(path for path in candidate_files if path.endswith(suffix))