import os import fitz from PyPDF2 import PdfReader, PdfWriter from flask_app.general.截取pdf通用函数 import create_get_text_function #合并PDF # 主合并函数,尝试使用 PyPDF2,若失败则使用 fitz def merge_pdfs(paths, output_path): # 检查所有路径是否为空或仅包含空白字符 if not any(path.strip() for path in paths): return "" # 优先尝试 PyPDF2 合并 try: return merge_with_pypdf2(paths, output_path) except Exception as e: print(f"使用 PyPDF2 合并失败,尝试使用 fitz。错误: {e}") # 若失败则尝试 fitz try: return merge_with_fitz(paths, output_path) except Exception as ex: print(f"使用 fitz 合并也失败了。错误: {ex}") return "" def merge_with_pypdf2(paths, output_path): pdf_writer = PdfWriter() last_page_text = None # 存储上一个 PDF 的最后一页文本 for path in paths: if not path.strip(): continue try: pdf_reader = PdfReader(path) pages = pdf_reader.pages get_text = create_get_text_function('pypdf2', pdf_reader) start_index = 0 if last_page_text is not None and len(pages) > 0: current_first_page_text = get_text(0) if current_first_page_text == last_page_text: start_index = 1 for page_num in range(start_index, len(pages)): pdf_writer.add_page(pages[page_num]) if len(pages) > 0: last_page_text = get_text(len(pages) - 1) except Exception as e: print(f"文件 '{path}' 无法使用 PyPDF2 处理,错误: {e}") continue if len(pdf_writer.pages) == 0: return "" with open(output_path, 'wb') as out: pdf_writer.write(out) return output_path def merge_with_fitz(paths, output_path): # 使用 fitz 创建新文档 merged_pdf = fitz.open() last_page_text = None for path in paths: if not path.strip(): continue pdf_doc = None try: pdf_doc = fitz.open(path) get_text = create_get_text_function('fitz', pdf_doc) page_count = pdf_doc.page_count start_index = 0 if last_page_text is not None and page_count > 0: current_first_page_text = get_text(0) if current_first_page_text == last_page_text: start_index = 1 # 插入页面到新文档 for page_num in range(start_index, page_count): merged_pdf.insert_pdf(pdf_doc, from_page=page_num, to_page=page_num) if page_count > 0: last_page_text = get_text(page_count - 1) pdf_doc.close() except Exception as e: print(f"文件 '{path}' 无法使用 fitz 处理,错误: {e}") finally: if pdf_doc is not None: pdf_doc.close() if merged_pdf.page_count == 0: merged_pdf.close() return "" try: merged_pdf.save(output_path) merged_pdf.close() return output_path except Exception as e: print(f"无法写入输出文件,错误: {e}") merged_pdf.close() return "" def judge_file_exist(original_path, new_suffix): # 提取目录路径和原始文件名 directory = os.path.dirname(original_path) original_filename = os.path.basename(original_path) # 替换文件名中的旧后缀为新后缀 # 假设原始文件名格式为 '2-招标文件_qualification.pdf' # 需要替换 '_qualification' 部分为 '_qualification2' new_filename = original_filename.replace("_qualification1", f"_{new_suffix}") new_filename = new_filename.replace("_qualification2", f"_{new_suffix}") # 生成新的文件路径 new_file_path = os.path.join(directory, new_filename) # 检查新文件是否存在 if os.path.isfile(new_file_path): return new_file_path else: return None def merge_and_cleanup(output_pdf_path, suffix_to_merge): another_file_path = judge_file_exist(output_pdf_path, suffix_to_merge) if another_file_path: paths = [output_pdf_path, another_file_path] # 需要合并的 PDF 文件路径 merge_pdfs(paths, output_pdf_path) os.remove(another_file_path) print(f"文件 {another_file_path} 已删除。") def find_and_merge(target_path, output_suffix): # 获取 target_path 所在的目录 directory = os.path.dirname(target_path) full_path="" # 遍历目录中的所有文件,寻找以 output_suffix 结尾的文件 for filename in os.listdir(directory): if filename.endswith(output_suffix): # 拼接目录路径和文件名,生成完整路径 full_path = os.path.join(directory, filename) if not full_path: paths=[target_path] else: paths=[target_path,full_path] merge_pdfs(paths,target_path) def merge_selected_pdfs(output_folder, truncate_files, output_path, base_file_name, mode='engineering'): """ 通用的 PDF 合并函数,根据不同的模式合并指定的文件。 参数: - output_folder (str): 包含以 {base_file_name}_before.pdf 结尾的 PDF 文件的文件夹路径。 - truncate_files (list): 包含 PDF 文件路径的列表。 - output_path (str): 合并后的 PDF 文件保存路径。 - base_file_name (str): 用于匹配文件名的基础名称。 - mode (str): 合并模式,支持 'engineering' 和 'goods'。 返回: - str: 如果合并成功,返回 output_path;否则,返回空字符串 ""。 """ try: all_output_files = os.listdir(output_folder) except FileNotFoundError: print(f"输出文件夹 '{output_folder}' 未找到。") return "" except PermissionError: print(f"没有权限访问输出文件夹 '{output_folder}'。") return "" if mode == 'engineering': required_suffixes = [ f'{base_file_name}_before.pdf', f'{base_file_name}_notice.pdf', f'{base_file_name}_tobidders_notice_part1.pdf' ] optional_suffixes = [] elif mode == 'goods': required_suffixes = [ f'{base_file_name}_before.pdf', f'{base_file_name}_notice.pdf', f'{base_file_name}_tobidders_notice_part1.pdf' ] optional_suffixes = [ f'{base_file_name}_tobidders_notice_part2.pdf' ] else: print(f"未知的合并模式: {mode}") return "" all_pdfs_to_merge = [] missing_files = [] # 处理必需的文件 for suffix in required_suffixes: if suffix == f'{base_file_name}_before.pdf': matching_files = [ os.path.join(output_folder, f) for f in all_output_files if f.endswith(suffix) ] else: matching_files = [f for f in truncate_files if f.endswith(suffix)] if matching_files: matching_files_sorted = sorted(matching_files) all_pdfs_to_merge.extend(matching_files_sorted) for f in matching_files_sorted: print(f"选中文件: {f}") else: print(f"没有找到以 '{suffix}' 结尾的文件。") missing_files.append(suffix) # 处理可选的文件 for suffix in optional_suffixes: if suffix == f'{base_file_name}_before.pdf': matching_files = [ os.path.join(output_folder, f) for f in all_output_files if f.endswith(suffix) ] else: matching_files = [f for f in truncate_files if f.endswith(suffix)] if matching_files: matching_files_sorted = sorted(matching_files) all_pdfs_to_merge.extend(matching_files_sorted) for f in matching_files_sorted: print(f"选中文件: {f}") else: print(f"可选文件 '{suffix}' 未找到,继续合并。") # 检查是否所有必需的文件都已找到 if missing_files: print("缺少以下必要的 PDF 文件,无法进行合并:") for missing in missing_files: print(f" - {missing}") return "" print(f"总共将要合并的 PDF 文件数量: {len(all_pdfs_to_merge)}") # 过滤掉不存在或为空的文件路径 all_pdfs_to_merge = [f for f in all_pdfs_to_merge if os.path.isfile(f) and os.path.getsize(f) > 0] if not all_pdfs_to_merge: print("没有找到要合并的有效 PDF 文件。") return "" # 调用 merge_pdfs 函数进行合并 try: merge_pdfs(all_pdfs_to_merge, output_path) print(f"已成功合并 PDF 文件到 '{output_path}'。") except Exception as e: print(f"合并 PDF 文件时出错: {e}") return "" # 检查合并后的文件是否存在且不为空 if os.path.exists(output_path) and os.path.getsize(output_path) > 0: before_pdf_path = os.path.join(output_folder, f"{base_file_name}_before.pdf") if os.path.exists(before_pdf_path): try: os.remove(before_pdf_path) print(f"已删除文件: {before_pdf_path}") except Exception as e: print(f"删除文件 {before_pdf_path} 时出错: {e}") else: print(f"未找到要删除的文件: {before_pdf_path}") return output_path else: print(f"合并失败,没有生成 '{output_path}'。") return "" if __name__ == "__main__": path1=r'D:\flask_project\flask_app\static\output\output1\2779a32c-1d74-44d7-8381-87fb2a88d80a\tmp\gcHBDL-2024-0017-001-招标文件_evaluation_method.pdf' path2=r'D:\flask_project\flask_app\static\output\output1\2779a32c-1d74-44d7-8381-87fb2a88d80a\tmp\gcHBDL-2024-0017-001-招标文件_qualification.pdf' output_folder=r'D:\flask_project\flask_app\static\output\output1\2779a32c-1d74-44d7-8381-87fb2a88d80a\tmp' output=os.path.join(output_folder,"merged_qualification.pdf") path=[path1,path2] output_path=merge_pdfs(path,output) print(output_path)