zbparse/flask_app/general/merge_pdfs.py

299 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import fitz
import tempfile
import logging
from PyPDF2 import PdfReader, PdfWriter, PdfMerger
from flask_app.general.读取文件.clean_pdf import create_get_text_function
def merge_pdf_to(original_pdf, to_pdf, logger) -> str:
    """Append the pages of ``original_pdf`` to the existing ``to_pdf``.

    The merged document is written to a temporary ``.pdf`` file created in
    the directory of ``to_pdf`` and then moved over ``to_pdf`` with
    ``os.replace``, so ``to_pdf`` is only overwritten after the merged
    output has been written in full.

    Args:
        original_pdf: Path of the PDF whose pages are appended.
        to_pdf: Path of the existing PDF that receives the pages; it is
            replaced in place.
        logger: Object exposing ``error(msg)``; used to report failures.

    Returns:
        ``to_pdf`` in every case. Failures are logged and swallowed, so the
        return value does not tell the caller whether the merge happened.
    """
    merger = None
    temp_filename = None
    try:
        merger = PdfMerger()
        # Target first, then the original, so the new pages end up last.
        with open(to_pdf, 'rb') as f:
            merger.append(f)
        with open(original_pdf, 'rb') as f:
            merger.append(f)
        # delete=False: the file must survive the ``with`` so it can be moved.
        with tempfile.NamedTemporaryFile(
            dir=os.path.dirname(to_pdf), suffix='.pdf', delete=False
        ) as temp_file:
            temp_filename = temp_file.name
            merger.write(temp_file)
        os.replace(temp_filename, to_pdf)
        return to_pdf
    except Exception as e:
        logger.error(f"merge_pdf_to 遇到错误: {str(e)}")
        # Remove the half-written temp file; cleanup stays best effort, but
        # only filesystem errors are tolerated (no bare ``except``).
        if temp_filename and os.path.exists(temp_filename):
            try:
                os.remove(temp_filename)
            except OSError as cleanup_error:
                logger.error(f"merge_pdf_to 无法删除临时文件 {temp_filename}: {cleanup_error}")
        return to_pdf
    finally:
        if merger is not None:
            merger.close()
# Merge PDFs.
# Main merge entry point: try PyPDF2 first and fall back to fitz if that fails.
def merge_pdfs(paths, output_path):
    """Merge the PDFs listed in ``paths`` into ``output_path``.

    PyPDF2 is tried first. fitz is used as a fallback both when the PyPDF2
    merge raises and when it produces nothing: ``merge_with_pypdf2`` skips
    unreadable files itself and returns "" when no page was collected, so
    an empty result is treated as a failure as well.

    Args:
        paths: Collection of PDF path strings. Blank or whitespace-only
            entries are skipped by the backends.
        output_path: Path of the merged PDF to write.

    Returns:
        ``output_path`` on success, or "" when ``paths`` holds no usable
        entry or both backends fail.
    """
    # Nothing to do when every entry is empty or whitespace only.
    if not any(path.strip() for path in paths):
        return ""
    try:
        merged = merge_with_pypdf2(paths, output_path)
        if merged:
            return merged
        print("PyPDF2 未能合并任何页面,尝试使用 fitz。")
    except Exception as e:
        print(f"使用 PyPDF2 合并失败,尝试使用 fitz。错误: {e}")
    try:
        return merge_with_fitz(paths, output_path)
    except Exception as ex:
        print(f"使用 fitz 合并也失败了。错误: {ex}")
        return ""
def merge_with_pypdf2(paths, output_path):
    """Concatenate the PDFs in ``paths`` with PyPDF2 and write ``output_path``.

    Blank entries in ``paths`` are skipped. A file that raises while being
    read is reported on stdout and skipped. When the text of a file's first
    page equals the text of the last page of the previously processed file,
    that first page is left out.

    Args:
        paths: Iterable of PDF path strings.
        output_path: Destination path of the merged PDF.

    Returns:
        ``output_path`` once written, or "" when no page was collected.
    """
    writer = PdfWriter()
    previous_tail_text = None  # text of the last page of the previous PDF
    for pdf_path in paths:
        if not pdf_path.strip():
            continue
        try:
            with open(pdf_path, "rb") as stream:
                reader = PdfReader(stream)
                page_list = reader.pages
                # presumably returns a callable mapping a page index to that
                # page's text; verify against clean_pdf
                extract_text = create_get_text_function('pypdf2', reader)
                page_total = len(page_list)
                # Skip the leading page when it repeats the previous tail page.
                repeats_previous_tail = (
                    previous_tail_text is not None
                    and page_total > 0
                    and extract_text(0) == previous_tail_text
                )
                first_kept = 1 if repeats_previous_tail else 0
                for index in range(first_kept, page_total):
                    writer.add_page(page_list[index])
                if page_total > 0:
                    previous_tail_text = extract_text(page_total - 1)
        except Exception as e:
            print(f"文件 '{pdf_path}' 无法使用 PyPDF2 处理,错误: {e}")
            continue
    if len(writer.pages) == 0:
        return ""
    # NOTE(review): the input files are already closed here; this relies on
    # add_page having copied the page data into the writer - confirm for the
    # PyPDF2 version in use.
    with open(output_path, 'wb') as out:
        writer.write(out)
    return output_path
def merge_with_fitz(paths, output_path):
    """Concatenate the PDFs in ``paths`` with fitz (PyMuPDF) and save the result.

    Blank entries in ``paths`` are skipped. A file that raises while being
    processed is reported on stdout and skipped. When the text of a file's
    first page equals the text of the last page of the previously processed
    file, that first page is left out.

    Args:
        paths: Iterable of PDF path strings.
        output_path: Destination path of the merged PDF.

    Returns:
        ``output_path`` once saved, or "" when no page was collected or the
        save failed.
    """
    last_page_text = None
    # Both documents are opened as context managers so they are closed on exit.
    with fitz.open() as merged_pdf:
        for path in paths:
            if not path.strip():
                continue
            try:
                with fitz.open(path) as pdf_doc:
                    # presumably returns a callable mapping a page index to
                    # that page's text; verify against clean_pdf
                    get_text = create_get_text_function('fitz', pdf_doc)
                    page_count = pdf_doc.page_count
                    start_index = 0
                    if last_page_text is not None and page_count > 0:
                        if get_text(0) == last_page_text:
                            start_index = 1
                    # Copy the kept pages with ONE insert_pdf call instead of
                    # one call per page, so objects shared between pages are
                    # copied once. The guard matters: insert_pdf copies in
                    # reverse order when from_page > to_page.
                    if start_index < page_count:
                        merged_pdf.insert_pdf(
                            pdf_doc, from_page=start_index, to_page=page_count - 1
                        )
                    if page_count > 0:
                        last_page_text = get_text(page_count - 1)
            except Exception as e:
                print(f"文件 '{path}' 无法使用 fitz 处理,错误: {e}")
                continue
        # Nothing was merged: report that with an empty string.
        if merged_pdf.page_count == 0:
            return ""
        try:
            merged_pdf.save(output_path)
            return output_path
        except Exception as e:
            print(f"无法写入输出文件,错误: {e}")
            return ""
def judge_file_exist(original_path, new_suffix):
    """Return the sibling file whose name carries ``new_suffix``, if it exists.

    In the base name of ``original_path`` every "_qualification1" and then
    every "_qualification2" is replaced by "_" + ``new_suffix``; the
    directory part is kept. A base name containing neither marker is left
    unchanged, so the candidate is then ``original_path`` itself.

    Args:
        original_path: Path whose base name is rewritten.
        new_suffix: Text placed after the underscore in the new name.

    Returns:
        The candidate path when it is an existing regular file, else None.
    """
    folder, base_name = os.path.split(original_path)
    replacement = f"_{new_suffix}"
    candidate_name = base_name
    # Applied in this order, one marker after the other.
    for marker in ("_qualification1", "_qualification2"):
        candidate_name = candidate_name.replace(marker, replacement)
    candidate = os.path.join(folder, candidate_name)
    return candidate if os.path.isfile(candidate) else None
def merge_and_cleanup(output_pdf_path, suffix_to_merge):
    """Merge the companion PDF into ``output_pdf_path`` and delete the companion.

    The companion is located with ``judge_file_exist``. It is appended after
    the current content of ``output_pdf_path`` and the result is written
    back to ``output_pdf_path``. The companion is deleted only after a
    successful merge.

    Args:
        output_pdf_path: PDF that receives the merged content.
        suffix_to_merge: Suffix passed to ``judge_file_exist`` to derive the
            companion file name.

    Returns:
        None.
    """
    another_file_path = judge_file_exist(output_pdf_path, suffix_to_merge)
    if not another_file_path:
        return
    # judge_file_exist hands back the input path itself when the file name
    # contains no marker; merging a file with itself and then deleting it
    # would destroy the output, so that case is ignored.
    same_file = (
        os.path.normcase(os.path.abspath(another_file_path))
        == os.path.normcase(os.path.abspath(output_pdf_path))
    )
    if same_file:
        return
    merged_path = merge_pdfs([output_pdf_path, another_file_path], output_pdf_path)
    if not merged_path:
        # merge_pdfs reports failure with "": keep the companion so no data is lost.
        print(f"合并失败,保留文件 {another_file_path}。")
        return
    os.remove(another_file_path)
    print(f"文件 {another_file_path} 已删除。")
def find_and_merge(target_path, output_suffix):
    """Merge a sibling file ending with ``output_suffix`` into ``target_path``.

    The directory of ``target_path`` is scanned for file names ending with
    ``output_suffix``. When several match, the last one in sorted name order
    is used. The merge result is written back to ``target_path``; when no
    sibling matches, ``target_path`` alone is passed to ``merge_pdfs``.

    Args:
        target_path: PDF that receives the merged content.
        output_suffix: File-name ending that identifies the sibling to append.

    Returns:
        None.
    """
    directory = os.path.dirname(target_path)
    target_name = os.path.basename(target_path)
    full_path = ""
    # os.listdir("") raises, so a bare file name is looked up in the current
    # directory. Sorting makes the choice among several matches reproducible.
    for filename in sorted(os.listdir(directory or ".")):
        # Never pick the target itself, or it would be merged with itself.
        if filename == target_name:
            continue
        if filename.endswith(output_suffix):
            full_path = os.path.join(directory, filename)
    paths = [target_path, full_path] if full_path else [target_path]
    merge_pdfs(paths, target_path)
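# TODO: the "before" file should be optional.
# NOTE: if this is flagged in red (presumably by the IDE), it seems to use extra memory.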
def merge_selected_pdfs(output_folder, truncate_files, output_path, base_file_name, mode='engineering'):
    """Merge the PDF parts selected by ``mode`` into ``output_path``.

    Parts are merged in this order:
      1. ``{base_file_name}_before.pdf`` (optional, looked up in ``output_folder``)
      2. ``{base_file_name}_notice.pdf`` (required, from ``truncate_files``)
      3. ``{base_file_name}_tobidders_notice_part1.pdf`` (required, from ``truncate_files``)
      4. ``{base_file_name}_tobidders_notice_part2.pdf`` (optional, 'goods' mode only,
         from ``truncate_files``)
    A part matches when a file name/path ends with the suffix above; several
    matches for one part are merged in sorted order. After a successful
    merge, ``{base_file_name}_before.pdf`` in ``output_folder`` is deleted.

    Args:
        output_folder (str): Folder searched for the ``_before.pdf`` part.
        truncate_files (list): Candidate PDF paths for the other parts.
        output_path (str): Path of the merged PDF to write.
        base_file_name (str): Base name used to build the suffixes.
        mode (str): 'engineering' or 'goods'.

    Returns:
        str: ``output_path`` on success, otherwise "" (folder missing or not
        accessible, unknown mode, required part missing, no valid input
        file, or merge failure).
    """
    try:
        all_output_files = os.listdir(output_folder)
    except FileNotFoundError:
        print(f"输出文件夹 '{output_folder}' 未找到。")
        return ""
    except PermissionError:
        print(f"没有权限访问输出文件夹 '{output_folder}'")
        return ""

    if mode not in ('engineering', 'goods'):
        print(f"未知的合并模式: {mode}")
        return ""
    # (suffix, required, looked up in output_folder instead of truncate_files)
    part_specs = [
        (f'{base_file_name}_before.pdf', False, True),
        (f'{base_file_name}_notice.pdf', True, False),
        (f'{base_file_name}_tobidders_notice_part1.pdf', True, False),
    ]
    if mode == 'goods':
        part_specs.append((f'{base_file_name}_tobidders_notice_part2.pdf', False, False))

    all_pdfs_to_merge = []
    missing_files = []
    for suffix, required, from_output_folder in part_specs:
        if from_output_folder:
            matching_files = [
                os.path.join(output_folder, f)
                for f in all_output_files
                if f.endswith(suffix)
            ]
        else:
            matching_files = [f for f in truncate_files if f.endswith(suffix)]
        if matching_files:
            all_pdfs_to_merge.extend(sorted(matching_files))
        elif required:
            print(f"没有找到以 '{suffix}' 结尾的文件。")
            missing_files.append(suffix)
        else:
            print(f"可选文件 '{suffix}' 未找到,继续合并。")

    if missing_files:
        print("缺少以下必要的 PDF 文件,无法进行合并:")
        for missing in missing_files:
            print(f" - {missing}")
        return ""

    print(f"总共将要合并的 PDF 文件数量: {len(all_pdfs_to_merge)}")
    # Drop paths that do not exist or point to empty files.
    all_pdfs_to_merge = [f for f in all_pdfs_to_merge if os.path.isfile(f) and os.path.getsize(f) > 0]
    if not all_pdfs_to_merge:
        print("没有找到要合并的有效 PDF 文件。")
        return ""

    try:
        merged_path = merge_pdfs(all_pdfs_to_merge, output_path)
    except Exception as e:
        print(f"合并 PDF 文件时出错: {e}")
        return ""

    # merge_pdfs signals failure with "" rather than raising. Its result is
    # checked first so a stale output_path left by an earlier run is not
    # reported as a fresh, successful merge.
    merged_ok = (
        bool(merged_path)
        and os.path.exists(output_path)
        and os.path.getsize(output_path) > 0
    )
    if not merged_ok:
        print(f"合并失败,没有生成 '{output_path}'")
        return ""
    print(f"已成功合并 PDF 文件到 '{output_path}'")

    # The "before" part is now contained in the merged file; remove it.
    before_pdf_path = os.path.join(output_folder, f"{base_file_name}_before.pdf")
    if os.path.exists(before_pdf_path):
        try:
            os.remove(before_pdf_path)
            print(f"已删除文件: {before_pdf_path}")
        except OSError as e:
            print(f"删除文件 {before_pdf_path} 时出错: {e}")
    else:
        print(f"未找到要删除的文件: {before_pdf_path}")
    return output_path
if __name__ == "__main__":
    # Manual run: merge two extracted sections into one PDF and print the
    # value returned by merge_pdfs. The paths are machine-specific.
    output_folder = r'D:\flask_project\flask_app\static\output\output1\2779a32c-1d74-44d7-8381-87fb2a88d80a\tmp'
    evaluation_pdf = r'D:\flask_project\flask_app\static\output\output1\2779a32c-1d74-44d7-8381-87fb2a88d80a\tmp\gcHBDL-2024-0017-001-招标文件_evaluation_method.pdf'
    qualification_pdf = r'D:\flask_project\flask_app\static\output\output1\2779a32c-1d74-44d7-8381-87fb2a88d80a\tmp\gcHBDL-2024-0017-001-招标文件_qualification.pdf'
    merged_target = os.path.join(output_folder, "merged_qualification.pdf")
    print(merge_pdfs([evaluation_pdf, qualification_pdf], merged_target))