zbparse/flask_app/general/merge_pdfs.py

257 lines
10 KiB
Python
Raw Normal View History

2024-10-17 19:07:57 +08:00
import os
import fitz
2024-10-17 19:07:57 +08:00
from PyPDF2 import PdfReader, PdfWriter
2025-02-16 18:09:45 +08:00
from flask_app.general.读取文件.clean_pdf import create_get_text_function
2024-10-17 19:07:57 +08:00
# 合并 PDF
# 主合并函数:优先尝试使用 PyPDF2,若失败则改用 fitz
2024-10-17 19:07:57 +08:00
def merge_pdfs(paths, output_path):
    """Merge several PDF files into one, trying PyPDF2 first and fitz second.

    Args:
        paths: Iterable of input PDF paths. ``None`` entries and blank
            strings are ignored; ``None`` itself is treated as "no inputs".
        output_path: Destination path of the merged PDF.

    Returns:
        ``output_path`` when a merged file was written, otherwise ``""``.
    """
    # Drop unusable entries up front so neither backend trips over them.
    usable_paths = []
    for candidate in paths or []:
        if candidate is None:
            continue
        candidate = os.fspath(candidate)
        if candidate.strip():
            usable_paths.append(candidate)
    if not usable_paths:
        return ""

    # First choice: PyPDF2.
    try:
        merged = merge_with_pypdf2(usable_paths, output_path)
        if merged:
            return merged
        # The PyPDF2 backend reports unreadable files by skipping them and
        # returning "" instead of raising, so an empty result must also
        # trigger the fallback.
        print("PyPDF2 未能合并出任何页面,尝试使用 fitz。")
    except Exception as e:
        print(f"使用 PyPDF2 合并失败,尝试使用 fitz。错误: {e}")

    # Second choice: fitz.
    try:
        return merge_with_fitz(usable_paths, output_path)
    except Exception as ex:
        print(f"使用 fitz 合并也失败了。错误: {ex}")
        return ""
def merge_with_pypdf2(paths, output_path):
    """Concatenate PDFs with PyPDF2, skipping a duplicated boundary page.

    Every non-blank path is opened in turn and its pages are appended to one
    writer. If the text of a file's first page equals the text of the last
    page of the previously processed file, that first page is left out.
    A file that raises while being processed is reported and skipped.

    Args:
        paths: Iterable of input PDF path strings.
        output_path: Destination path of the merged PDF.

    Returns:
        ``output_path`` after writing, or ``""`` when no page was collected.
    """
    writer = PdfWriter()
    prev_tail_text = None  # last-page text of the most recently processed PDF

    for pdf_path in paths:
        if not pdf_path.strip():
            continue
        try:
            with open(pdf_path, "rb") as stream:
                reader = PdfReader(stream)
                doc_pages = reader.pages
                text_of = create_get_text_function('pypdf2', reader)

                # Start at page 1 when page 0 repeats the previous tail page.
                first_kept = 0
                if (
                    prev_tail_text is not None
                    and doc_pages
                    and text_of(0) == prev_tail_text
                ):
                    first_kept = 1

                for idx in range(first_kept, len(doc_pages)):
                    writer.add_page(doc_pages[idx])

                # Remember this file's tail page for the next comparison.
                if doc_pages:
                    prev_tail_text = text_of(len(doc_pages) - 1)
        except Exception as e:
            print(f"文件 '{pdf_path}' 无法使用 PyPDF2 处理,错误: {e}")

    collected = len(writer.pages)
    if collected == 0:
        return ""

    with open(output_path, 'wb') as out:
        writer.write(out)
    return output_path
def merge_with_fitz(paths, output_path):
    """Concatenate PDFs with fitz (PyMuPDF), skipping a duplicated boundary page.

    Every non-blank path is opened in turn and its pages are appended to a new
    in-memory document. If the text of a file's first page equals the text of
    the last page of the previously processed file, that first page is left
    out. A file that raises while being processed is reported and skipped.

    Args:
        paths: Iterable of input PDF paths; ``None`` and blank entries are
            ignored.
        output_path: Destination path of the merged PDF.

    Returns:
        ``output_path`` after a successful save, or ``""`` when nothing was
        merged or the save failed.
    """
    last_page_text = None

    # Both the merged document and each source are context-managed so their
    # resources are released even when a file fails.
    with fitz.open() as merged_pdf:
        for path in paths:
            if path is None or not str(path).strip():
                continue
            try:
                with fitz.open(path) as pdf_doc:
                    get_text = create_get_text_function('fitz', pdf_doc)
                    page_count = pdf_doc.page_count
                    if page_count == 0:
                        continue

                    start_index = 0
                    if last_page_text is not None and get_text(0) == last_page_text:
                        start_index = 1

                    # One ranged insert instead of one call per page: repeated
                    # single-page inserts from the same source are slower and
                    # inflate the output. The guard keeps from_page <= to_page,
                    # since a reversed range would copy pages backwards.
                    if start_index < page_count:
                        merged_pdf.insert_pdf(
                            pdf_doc,
                            from_page=start_index,
                            to_page=page_count - 1,
                        )

                    last_page_text = get_text(page_count - 1)
            except Exception as e:
                print(f"文件 '{path}' 无法使用 fitz 处理,错误: {e}")

        # Nothing collected: report failure with an empty string.
        if merged_pdf.page_count == 0:
            return ""

        try:
            merged_pdf.save(output_path)
            return output_path
        except Exception as e:
            print(f"无法写入输出文件,错误: {e}")
            return ""
2024-10-17 19:07:57 +08:00
def judge_file_exist(original_path, new_suffix):
    """Find the sibling file whose name carries ``new_suffix``.

    The file name of ``original_path`` is rewritten by replacing every
    ``_qualification1`` / ``_qualification2`` marker with ``_{new_suffix}``;
    the resulting path (same directory) is returned if it is an existing file.

    Args:
        original_path: Path whose file name contains a qualification marker.
        new_suffix: Marker text to substitute (without the leading ``_``).

    Returns:
        The path of the sibling file, or ``None`` when it does not exist or
        when the substitution leaves the name unchanged (so the original file
        is never reported as its own sibling).
    """
    import re

    directory, original_filename = os.path.split(original_path)

    # Single pass: chained str.replace calls would re-substitute text that the
    # first replacement produced (e.g. new_suffix="qualification2_extra").
    # A callable replacement keeps backslashes in new_suffix literal.
    new_filename = re.sub(
        r"_qualification[12]",
        lambda _match: f"_{new_suffix}",
        original_filename,
    )
    if new_filename == original_filename:
        return None

    new_file_path = os.path.join(directory, new_filename)
    return new_file_path if os.path.isfile(new_file_path) else None
def merge_and_cleanup(output_pdf_path, suffix_to_merge):
    """Append the sibling ``suffix_to_merge`` PDF to ``output_pdf_path`` and delete it.

    The sibling is located with ``judge_file_exist``. It is merged after
    ``output_pdf_path`` (the result overwrites ``output_pdf_path``) and is
    removed only once the merge has succeeded.

    Args:
        output_pdf_path: Primary PDF; also the destination of the merge.
        suffix_to_merge: Marker identifying the sibling file to append.
    """
    another_file_path = judge_file_exist(output_pdf_path, suffix_to_merge)
    if not another_file_path:
        return

    # Never merge a file with itself: the cleanup below would then delete the
    # freshly written result.
    same_file = os.path.normcase(os.path.abspath(another_file_path)) == \
        os.path.normcase(os.path.abspath(output_pdf_path))
    if same_file:
        return

    merged = merge_pdfs([output_pdf_path, another_file_path], output_pdf_path)
    if not merged:
        # Keep the sibling so its content is not lost when merging failed.
        print(f"合并失败,保留文件 {another_file_path}。")
        return

    os.remove(another_file_path)
    print(f"文件 {another_file_path} 已删除。")
def find_and_merge(target_path, output_suffix):
    """Merge ``target_path`` with a file from its directory ending in ``output_suffix``.

    The directory of ``target_path`` is scanned for file names ending with
    ``output_suffix``. If several match, the last one in sorted order is used;
    if none match, ``target_path`` is merged on its own. The result overwrites
    ``target_path``.

    Args:
        target_path: Primary PDF; also the destination of the merge.
        output_suffix: File-name ending that identifies the companion file.

    Returns:
        Whatever ``merge_pdfs`` returns (the output path, or ``""`` on failure).
    """
    directory = os.path.dirname(target_path)
    target_key = os.path.normcase(os.path.abspath(target_path))

    full_path = ""
    # A bare file name has an empty dirname; list the current directory then.
    # Sorting makes the choice deterministic when several files match.
    for filename in sorted(os.listdir(directory or ".")):
        if not filename.endswith(output_suffix):
            continue
        candidate = os.path.join(directory, filename)
        # Skip the target itself, otherwise it would be merged with itself.
        if os.path.normcase(os.path.abspath(candidate)) == target_key:
            continue
        full_path = candidate

    paths = [target_path, full_path] if full_path else [target_path]
    return merge_pdfs(paths, target_path)
2025-02-16 18:09:45 +08:00
#TODO: before应该是可选的
#如果报红 貌似会额外占内存
2024-12-17 18:44:58 +08:00
def merge_selected_pdfs(output_folder, truncate_files, output_path, base_file_name, mode='engineering'):
    """Merge the selected section PDFs of one document into a single file.

    Files are picked by file-name ending, in this order:

    * ``{base_file_name}_before.pdf`` (optional) - looked up in
      ``output_folder``;
    * ``{base_file_name}_notice.pdf`` (required) - looked up in
      ``truncate_files``;
    * ``{base_file_name}_tobidders_notice_part1.pdf`` (required) - looked up
      in ``truncate_files``;
    * ``{base_file_name}_tobidders_notice_part2.pdf`` (optional, ``'goods'``
      mode only) - looked up in ``truncate_files``.

    After a successful merge, ``{output_folder}/{base_file_name}_before.pdf``
    is deleted if it exists.

    Args:
        output_folder: Folder that may contain the ``_before.pdf`` file.
        truncate_files: List of PDF paths; ``None`` and non-string entries
            are ignored.
        output_path: Destination path of the merged PDF.
        base_file_name: Base name used to build the expected file-name endings.
        mode: ``'engineering'`` or ``'goods'``.

    Returns:
        ``output_path`` on success, otherwise ``""``.
    """
    try:
        all_output_files = os.listdir(output_folder)
    except FileNotFoundError:
        print(f"输出文件夹 '{output_folder}' 未找到。")
        return ""
    except PermissionError:
        print(f"没有权限访问输出文件夹 '{output_folder}'")
        return ""

    # (file-name ending, required?) in merge order; both modes share the
    # first three entries.
    before_suffix = f'{base_file_name}_before.pdf'
    suffixes = [
        (before_suffix, False),
        (f'{base_file_name}_notice.pdf', True),
        (f'{base_file_name}_tobidders_notice_part1.pdf', True),
    ]
    if mode == 'goods':
        suffixes.append((f'{base_file_name}_tobidders_notice_part2.pdf', False))
    elif mode != 'engineering':
        print(f"未知的合并模式: {mode}")
        return ""

    # Tolerate None placeholders in the caller's list.
    candidate_files = [f for f in (truncate_files or []) if isinstance(f, str)]

    all_pdfs_to_merge = []
    missing_files = []
    for suffix, required in suffixes:
        # The "before" file lives in output_folder; all others come from
        # truncate_files.
        if suffix == before_suffix:
            matching_files = [
                os.path.join(output_folder, f)
                for f in all_output_files
                if f.endswith(suffix)
            ]
        else:
            matching_files = [f for f in candidate_files if f.endswith(suffix)]

        if matching_files:
            all_pdfs_to_merge.extend(sorted(matching_files))
        elif required:
            print(f"没有找到以 '{suffix}' 结尾的文件。")
            missing_files.append(suffix)
        else:
            print(f"可选文件 '{suffix}' 未找到,继续合并。")

    if missing_files:
        print("缺少以下必要的 PDF 文件,无法进行合并:")
        for missing in missing_files:
            print(f" - {missing}")
        return ""

    print(f"总共将要合并的 PDF 文件数量: {len(all_pdfs_to_merge)}")

    # Drop paths that do not exist on disk or point at empty files.
    all_pdfs_to_merge = [
        f for f in all_pdfs_to_merge
        if os.path.isfile(f) and os.path.getsize(f) > 0
    ]
    if not all_pdfs_to_merge:
        print("没有找到要合并的有效 PDF 文件。")
        return ""

    try:
        merged = merge_pdfs(all_pdfs_to_merge, output_path)
    except Exception as e:
        print(f"合并 PDF 文件时出错: {e}")
        return ""

    # merge_pdfs signals failure with "". Checking it prevents a stale
    # output file from an earlier run being mistaken for a fresh result.
    output_ok = os.path.exists(output_path) and os.path.getsize(output_path) > 0
    if not merged or not output_ok:
        print(f"合并失败,没有生成 '{output_path}'")
        return ""
    print(f"已成功合并 PDF 文件到 '{output_path}'")

    # The "before" file is now part of the merged output; remove it.
    before_pdf_path = os.path.join(output_folder, f"{base_file_name}_before.pdf")
    if os.path.exists(before_pdf_path):
        try:
            os.remove(before_pdf_path)
            print(f"已删除文件: {before_pdf_path}")
        except Exception as e:
            print(f"删除文件 {before_pdf_path} 时出错: {e}")
    else:
        print(f"未找到要删除的文件: {before_pdf_path}")
    return output_path
if __name__ == "__main__":
    # Manual smoke test: merge two sample PDFs from a local output folder and
    # print the path returned by merge_pdfs ("" on failure).
    output_folder = r'D:\flask_project\flask_app\static\output\output1\2779a32c-1d74-44d7-8381-87fb2a88d80a\tmp'
    path1 = r'D:\flask_project\flask_app\static\output\output1\2779a32c-1d74-44d7-8381-87fb2a88d80a\tmp\gcHBDL-2024-0017-001-招标文件_evaluation_method.pdf'
    path2 = r'D:\flask_project\flask_app\static\output\output1\2779a32c-1d74-44d7-8381-87fb2a88d80a\tmp\gcHBDL-2024-0017-001-招标文件_qualification.pdf'
    output = os.path.join(output_folder, "merged_qualification.pdf")
    path = [path1, path2]
    output_path = merge_pdfs(path, output)
    print(output_path)