# flask_app/general/截取pdf通用函数.py import concurrent.futures import os import fitz import regex from PyPDF2 import PdfReader, PdfWriter from flask_app.general.clean_pdf import extract_common_header, clean_page_content from flask_app.general.format_change import docx2pdf import concurrent.futures # 定义一个函数用于提取文本 def get_text_pypdf2(pdf, page_num): return pdf.pages[page_num].extract_text() or "" def get_text_fitz(pdf, page_num): return pdf.load_page(page_num).get_text() or "" # 定义一个内部函数来绑定 pdf_document 对象 def create_get_text_function(pdf_lib, pdf_doc): if pdf_lib == 'pypdf2': return lambda page_num: get_text_pypdf2(pdf_doc, page_num) elif pdf_lib == 'fitz': return lambda page_num: get_text_fitz(pdf_doc, page_num) else: raise ValueError("不支持的 PDF 库。") def get_start_and_common_header(input_path, end_page): """ 提取 PDF 文件的公共页眉和起始页索引。 参数: - input_path: PDF 文件路径。 - end_page: 遍历的结束页码。 返回: - common_header: 公共页眉内容。 - last_begin_index: 起始页的页码索引。 """ try: # 提取公共页眉 common_header = extract_common_header(input_path) # 假设该函数已定义 last_begin_index = 0 begin_pattern = regex.compile( r'.*(? end_page: return common_header, 0 # 如果页码大于 end_page,直接返回 last_begin_index=0 text = get_text(i) if text: cleaned_text = clean_page_content(text, common_header) # 检查是否存在"目录" if catalog_pattern.search(cleaned_text): # print(f"页面 {i} 包含目录,跳过。") continue # 如果存在目录,跳过当前页面 if begin_pattern.search(cleaned_text): last_begin_index = i # 更新第一个匹配的索引,页码从0开始 print(f"匹配到起始模式,设置 last_begin_index 为: {last_begin_index}") return common_header, last_begin_index return common_header, last_begin_index except Exception as e: print(f"Error in get_start_and_common_header: {e}") return "", 0 # 根据需求调整返回值 def check_pdf_pages(pdf_path, mode, logger): try: try: pdf_document = PdfReader(pdf_path) num_pages = len(pdf_document.pages) # logger.info(f"使用 PyPDF2 成功读取 PDF '{pdf_path}' 的总页数为: {num_pages}") except Exception as e_pypdf2: print(f"check_pdf_pages:使用 PyPDF2 读取 PDF 失败: {e_pypdf2}") pdf_document = fitz.open(pdf_path) num_pages = pdf_document.page_count if num_pages <= 30: logger.info("PDF(invalid_path)页数小于或等于30页,跳过切分逻辑。") if mode == 'goods': return True, ['', '', '', '', '', '', pdf_path, ''] else: return True, ['', '', '', '', '', pdf_path, ''] # 若页数大于30页,返回None表示继续处理 return False, [] except Exception as e: logger.error(f"无法读取 PDF 页数: {e}") # 返回空列表意味着无法执行后续处理逻辑 if mode == 'goods': return True, ['', '', '', '', '', '', pdf_path, ''] else: return True, ['', '', '', '', '', pdf_path, ''] def save_extracted_pages(pdf_path, output_folder, start_page, end_page, output_suffix, common_header): try: if start_page is None or end_page is None: print("错误: start_page 或 end_page 为 None") return "" # 尝试使用 PyPDF2 读取 PDF try: pdf_document = PdfReader(pdf_path) total_pages = len(pdf_document.pages) get_text = lambda p: pdf_document.pages[p].extract_text() using_pypdf2 = True # print("使用 PyPDF2 成功读取 PDF 文件。") except Exception as e_pypdf2: print(f"save_extracted_pages:使用 PyPDF2 读取 PDF 失败: {e_pypdf2}") # 尝试使用 PyMuPDF 读取 PDF try: pdf_document = fitz.open(pdf_path) total_pages = pdf_document.page_count get_text = lambda p: pdf_document.load_page(p).get_text() using_pypdf2 = False # print("使用 PyMuPDF 成功读取 PDF 文件。") except Exception as e_fitz: print(f"使用 PyMuPDF 读取 PDF 也失败: {e_fitz}") return "" # 检查页面范围的有效性 if start_page < 0 or end_page >= total_pages or start_page > end_page: print(f"无效的页面范围: {start_page} 到 {end_page}") return "" base_file_name = os.path.splitext(os.path.basename(pdf_path))[0] output_pdf_path = os.path.join(output_folder, f"{base_file_name}_{output_suffix}.pdf") if output_suffix == 'notice' and start_page > 0: before_pdf_path = os.path.join(output_folder, f"{base_file_name}_before.pdf") toc_page = -1 for page_num in range(min(start_page, total_pages)): try: text = get_text(page_num) except Exception as e_extract: print(f"提取第 {page_num} 页文本时出错: {e_extract}") continue cleaned_text = clean_page_content(text, common_header) if regex.search(r'目\s*录', cleaned_text, regex.MULTILINE): toc_page = page_num break pages_to_extract = toc_page + 1 if toc_page != -1 else start_page if using_pypdf2: before_doc = PdfWriter() for page_num in range(pages_to_extract): try: before_doc.add_page(pdf_document.pages[page_num]) except Exception as e_page: print(f"添加第 {page_num} 页到 before_doc 时出错: {e_page}") continue try: with open(before_pdf_path, 'wb') as f_before: before_doc.write(f_before) print(f"目录前的页面已保存到 {before_pdf_path}") except Exception as e_write_before: print(f"保存目录前的页面时出错: {e_write_before}") else: before_doc = fitz.open() for page_num in range(pages_to_extract): try: before_doc.insert_pdf(pdf_document, from_page=page_num, to_page=page_num) except Exception as e_page: print(f"添加第 {page_num} 页到 before_doc 时出错: {e_page}") continue try: before_doc.save(before_pdf_path) before_doc.close() print(f"目录前的页面已保存到 {before_pdf_path}") except Exception as e_write_before: print(f"保存目录前的页面时出错: {e_write_before}") # 提取并保存目标页面 if using_pypdf2: output_doc = PdfWriter() for page_num in range(start_page, end_page + 1): try: output_doc.add_page(pdf_document.pages[page_num]) except Exception as e_page: print(f"添加第 {page_num} 页到 output_doc 时出错: {e_page}") continue try: with open(output_pdf_path, 'wb') as f: output_doc.write(f) print(f"{output_suffix} 已截取并保存页面从 {start_page} 到 {end_page} 为 {output_pdf_path}") return output_pdf_path except Exception as e_write: print(f"保存截取的页面时出错: {e_write}") return "" else: output_doc = fitz.open() for page_num in range(start_page, end_page + 1): try: output_doc.insert_pdf(pdf_document, from_page=page_num, to_page=page_num) except Exception as e_page: print(f"添加第 {page_num} 页到 output_doc 时出错: {e_page}") continue try: output_doc.save(output_pdf_path) output_doc.close() print(f"{output_suffix} 已截取并保存页面从 {start_page} 到 {end_page} 为 {output_pdf_path}") return output_pdf_path except Exception as e_write: print(f"保存截取的页面时出错: {e_write}") return "" except Exception as e: print(f"发生未知错误: {e}") return "" def is_pdf_or_doc(filename): # 判断文件是否为PDF或Word文档 return filename.lower().endswith(('.pdf', '.doc', '.docx')) def convert_to_pdf(file_path): # 假设 docx2pdf 函数已经被定义,这里仅根据文件扩展名来决定是否需要转换 if file_path.lower().endswith(('.doc', '.docx')): return docx2pdf(file_path) return file_path def get_invalid_file(file_path, output_folder, common_header, begin_page): """ 从指定的PDF文件中提取无效部分并保存到输出文件夹中。 begin_pattern匹配开头 end_pattern匹配结尾 页数小于100不进行begin_pattern,默认start_page=0 页数大于200时end_pattern从前往后匹配 Args: file_path (str): 输入的PDF文件路径。 output_folder (str): 提取后文件的输出文件夹路径。 common_header (str): 公共头部文本,用于清理每页的内容。 Returns: list: 包含保存的文件路径的列表,如果提取失败则返回包含空字符串的列表。 """ # 定义开始模式 begin_patterns = [ regex.compile( r'.*(? 200: # print("总页数大于200页,结束页从前往后查找,跳过前30页。") #且先匹配合同 for pattern in end_patterns: for i in range(30, total_pages): text = page_texts[i] exclusion_pattern = regex.compile( r'(文件的构成|文件的组成|文件组成|文件构成|文件的编制|文件编制|评审要素|评审标准)\s*$', regex.MULTILINE) if regex.search(exclusion_pattern,text): continue if regex.search(pattern, text): # print(f"结束模式在第 {i + 1} 页匹配: {pattern.pattern}") return i else: # print("结束页从后往前查找,确保只剩前30页时停止。") for pattern in end_patterns: for i in range(total_pages - 1, 29, -1): text = page_texts[i] if regex.search(pattern, text): # print(f"结束模式在第 {i + 1} 页匹配: {pattern.pattern}") return i # 如果没有匹配到,设置end_page逻辑 if total_pages > 200: print(f"未找到结束模式,总页数大于200,平滑调整结束页为 {end_page}。" + file_path) return max(100, int(total_pages * 1 / 2)) elif 100 < total_pages <= 200: print("未找到结束模式,总页数在100到200之间,设置结束页为总页数的三分之二。" + file_path) return max(100, int(total_pages * 2 / 3)) else: print("未找到结束模式,设置结束页为最后一页。" + file_path) return total_pages - 1 # 根据总页数决定是否查找起始页 if total_pages < 100: # print("总页数少于100页,默认起始页为第一页。") start_page = 0 with concurrent.futures.ThreadPoolExecutor() as executor: future_end = executor.submit(find_end_page) end_page = future_end.result() else: with concurrent.futures.ThreadPoolExecutor() as executor: future_start = executor.submit(find_start_page, begin_page) future_end = executor.submit(find_end_page) start_page = future_start.result() end_page = future_end.result() # print(f"起始页: {start_page + 1}, 结束页: {end_page + 1}") # 验证页码范围 if start_page > end_page: print(f"无效的页码范围: 起始页 ({start_page + 1}) > 结束页 ({end_page + 1})") return "", 0 # 调用已实现的保存函数 output_path = save_extracted_pages( pdf_path=file_path, output_folder=output_folder, start_page=start_page, end_page=end_page, output_suffix="invalid", common_header=common_header ) # print(f"提取的页面 {start_page} 到 {end_page} 已保存至 {output_path}") return output_path, end_page except Exception as e: print(f"处理文件 {file_path} 时发生错误: {e}") return "", 0 def extract_pages_generic(pdf_path, begin_pattern, end_pattern, begin_page, common_header, exclusion_pattern=None, output_suffix="normal", invalid_endpage=-1): start_page = None end_page = None flag = True try: # 尝试使用 PyPDF2 读取 PDF pdf_document = PdfReader(pdf_path) total_pages = len(pdf_document.pages) get_text = create_get_text_function('pypdf2', pdf_document) # print("使用 PyPDF2 成功读取 PDF 文件。") except Exception as e_pypdf2: print(f"extract_pages_generic:使用 PyPDF2 读取 PDF 失败: {e_pypdf2}") try: # 如果 PyPDF2 失败,尝试使用 PyMuPDF 读取 PDF pdf_document = fitz.open(pdf_path) total_pages = pdf_document.page_count get_text = create_get_text_function('fitz', pdf_document) # print("使用 PyMuPDF 成功读取 PDF 文件。") except Exception as e_fitz: print(f"使用 PyMuPDF 读取 PDF 也失败: {e_fitz}") return None, None # 或者根据需求抛出异常 end_limit = total_pages if invalid_endpage == -1 else min(invalid_endpage, total_pages) for i in range(begin_page, end_limit): text = get_text(i) if text: cleaned_text = clean_page_content(text, common_header) if output_suffix == "tobidders_notice2": #extract_pages_twice_tobidders_notice会调用该代码 catalog_pattern = regex.compile(r'\s*目\s*录\s*$', regex.MULTILINE) if exclusion_pattern and flag and (start_page is not None) and regex.search(exclusion_pattern,cleaned_text): flag = False continue if begin_page == 0 and catalog_pattern.search(cleaned_text): continue # 如果存在目录,跳过当前页面 else: # 一般投标文件的编制、组成在'投标人须知前附表/正文'中,需要防止begin_pattern匹配到这部分内容,所以在start_page is None的时候使用该exclusion_pattern if exclusion_pattern and flag and (start_page is None) and regex.search(exclusion_pattern, cleaned_text): flag = False continue if start_page is None and regex.search(begin_pattern, cleaned_text): if (output_suffix == "notice" and i >= begin_page) or (output_suffix != "notice" and i > begin_page): start_page = i continue if start_page is not None: if output_suffix in ["tobidders_notice", "notice", "tobidders_notice2"]: # 使用列表判断多个匹配项 # 判断 end_pattern 是否匹配且当前页大于起始页 if regex.search(end_pattern, cleaned_text) and i > start_page: end_page = i break else: # 如果 end_pattern 匹配且 begin_pattern 不匹配 if regex.search(end_pattern, cleaned_text) and not regex.search(begin_pattern, cleaned_text): end_page = i break return start_page, end_page def generate_mid_pattern(chapter_type=None): """ 根据章节类型动态生成 mid_pattern。 参数: chapter_type (str, optional): 章节类型,如 '章' 或 '部分'。 返回: regex.Pattern: 编译后的正则表达式模式。 """ # 定义基础的 mid_pattern base_mid_pattern = ( r'^\s*(?:[((]\s*[一二12]?\s*[))]\s*[、..]*|' r'[一二12][、..]+|[、..]+)\s*(说\s*明|总\s*则|名\s*词\s*解\s*释|定\s*义)|' r'(? begin_page: start_page = i matched_text = match.group(0) # 获取整个匹配的文本 # 根据匹配到的内容识别是“章”还是“部分”,用于后续组合 mid_pattern if '章' in matched_text: chapter_type = '章' elif '部分' in matched_text: chapter_type = '部分' else: chapter_type = None # 未匹配到“章”或“部分” # ===== 动态生成 end_pattern_dynamic ===== end_pattern_dynamic = generate_end_pattern(extraction_stage, chapter_type) # ========== 动态生成 mid_pattern ========== combined_mid_pattern = generate_mid_pattern(chapter_type) # 找到 start_page 后,继续循环,进入下一页 continue # ========== 识别中间页 ========== if start_page is not None and mid_page is None and combined_mid_pattern: if regex.search(combined_mid_pattern, cleaned_text): mid_page = i # ========== 识别结束页 ========== if start_page is not None and mid_page is not None: # 当前使用的 end_pattern(上面根据 extraction_stage 和 chapter_type 选择好了) if regex.search(end_pattern_dynamic, cleaned_text): if i > mid_page: end_page = i break return start_page, mid_page, end_page def perform_extraction_and_save(start_page, mid_page, end_page): path1 = save_extracted_pages(pdf_path, output_folder, start_page, mid_page, "tobidders_notice_part1", common_header) if mid_page != end_page: path2 = save_extracted_pages(pdf_path, output_folder, mid_page, end_page, "tobidders_notice_part2", common_header) else: path2 = path1 return path1, path2 try: pdf_document = PdfReader(pdf_path) total_pages = len(pdf_document.pages) except Exception as e_pypdf2: print(f"extract_pages_tobidders_notice:使用 PyPDF2 读取 PDF 失败: {e_pypdf2}") try: pdf_document = fitz.open(pdf_path) total_pages = pdf_document.page_count # print("使用 PyMuPDF 成功读取 PDF 文件。") except Exception as e_fitz: print(f"使用 PyMuPDF 读取 PDF 也失败: {e_fitz}") return "", "" # 或者根据需求抛出异常 # 第一次提取尝试,使用初始的 begin_pattern begin_pattern = regex.compile( r'^第[一二三四五六七八九十百千]+(?:章|部分).*(?:投标人?|磋商|谈判|供应商|应答人|比选申请人).*须知+|' r'(?start_page>begin_page start_page2, end_page2 = extract_pages_generic(pdf_path, second_begin_pattern, end_pattern, begin_page, common_header, exclusion_pattern, output_suffix) if start_page2 is None or end_page2 is None: return start_page1, end_page1, end_page1,end_page1 return start_page1, end_page1, start_page2,end_page2 #return start_page1, end_page1, end_page1 def get_notice(pdf_path, output_folder, begin_page, common_header,invalid_endpage): begin_pattern = regex.compile( r'.*(?