import re import threading import fitz from PyPDF2 import PdfReader from docx import Document import queue from queue import Queue, Empty def extract_common_header(pdf_path): """ 提取 PDF 文件的公共页眉。 参数: - pdf_path: PDF 文件路径。 返回: - common_header: 公共页眉内容,字符串。如果未找到,则返回空字符串。 """ def get_headers(pdf_document, start_page, pages_to_read, is_pypdf2): headers = [] for i in range(start_page, min(start_page + pages_to_read, len(pdf_document.pages) if is_pypdf2 else pdf_document.page_count)): try: if is_pypdf2: page = pdf_document.pages[i] text = page.extract_text() or "" else: page = pdf_document.load_page(i) text = page.get_text() or "" if text: # 只取每页的前三行,去除前后的空白字符 first_lines = [line.strip() for line in text.strip().split('\n')[:3]] headers.append(first_lines) except Exception as e: print(f"提取第 {i} 页文本时出错: {e}") continue return headers def find_common_headers(headers): if not headers: return [] # 转置,使得每一行对应所有页的同一行 transposed_headers = list(zip(*headers)) common_headers = [] for lines in transposed_headers: # 将每行按空格分割成部分 split_lines = [line.split() for line in lines] # 找出所有行中最短的部分数 min_parts = min(len(parts) for parts in split_lines) if min_parts == 0: continue common_parts = [] for part_idx in range(min_parts): # 获取当前部分在所有行中的值 current_parts = [parts[part_idx] for parts in split_lines] # 检查所有部分是否相同 if all(part == current_parts[0] for part in current_parts[1:]): common_parts.append(current_parts[0]) else: break # 如果某部分不相同,停止进一步比较 if common_parts: # 将共同的部分重新组合成字符串 common_header_line = ' '.join(common_parts) if len(common_header_line) >= 5: # 可以根据实际情况调整最小长度 common_headers.append(common_header_line) return common_headers try: # 尝试使用 PyPDF2 读取 PDF try: pdf_document = PdfReader(pdf_path) total_pages = len(pdf_document.pages) is_pypdf2 = True # print("使用 PyPDF2 成功读取 PDF 文件。") except Exception as e_pypdf2: print(f"extract_common_header:使用 PyPDF2 读取 PDF 失败: {e_pypdf2}") try: # 如果 PyPDF2 失败,尝试使用 PyMuPDF 读取 PDF pdf_document = fitz.open(pdf_path) total_pages = pdf_document.page_count is_pypdf2 = False # print("使用 PyMuPDF 成功读取 PDF 文件。") except Exception as e_fitz: print(f"extract_common_header:使用 PyMuPDF 读取 PDF 也失败: {e_fitz}") return "" # 或者根据需求抛出异常 # 定义两个提取策略 strategies = [] if total_pages >= 3: # 策略1:中间的3页 middle_page = total_pages // 2 start_page = max(0, middle_page - 1) strategies.append((start_page, 3)) elif total_pages == 2: # 策略1:2页 strategies.append((0, 2)) else: # 策略1:1页 strategies.append((0, 1)) # 策略2:前三页 if total_pages >= 3: strategies.append((0, 3)) elif total_pages == 2: strategies.append((0, 2)) elif total_pages == 1: strategies.append((0, 1)) common_headers = [] for idx, (start, count) in enumerate(strategies): headers = get_headers(pdf_document, start, count, is_pypdf2) if len(headers) < 2: continue # 需要至少2页来比较 current_common = find_common_headers(headers) if current_common: common_headers = current_common # print(f"使用策略{idx + 1}找到共同的页眉: {common_headers}") break # 找到共同部分后退出 # 如果没有找到,继续下一个策略 return '\n'.join(common_headers) # 执行策略1和策略2:先执行策略1(中间3页或根据页数调整),然后执行策略2(前三页或根据页数调整)。 # 获取两个策略的公共前缀:分别获取两个策略提取的公共页眉。 # 选择最长的公共前缀:如果两个策略的前缀之间存在包含关系,选择最长的那个。如果没有包含关系,则返回策略1提取的前缀。 # headers_results = [] # # for idx, (start, count) in enumerate(strategies): # headers = get_headers(pdf_document, start, count, is_pypdf2) # if len(headers) < 2: # continue # 需要至少2页来比较 # # current_common = find_common_headers(headers) # if current_common: # headers_results.append(current_common) # # 不再中断,继续执行下一个策略 # # 如果没有找到,继续下一个策略 # # if not headers_results: # return "" # 没有找到任何公共页眉 # # 现在有两个(或一个)策略的公共页眉,选择最长的公共前缀 # # 首先,取出所有策略的公共页眉,并将其转换为单个字符串 # headers_strategies = ['\n'.join(headers) for headers in headers_results] # # 找到最长的公共前缀 # longest_common = max(headers_strategies, key=lambda s: len(s)) # # # 检查其他策略的前缀是否是最长前缀的子集 # is_contained = all(longest_common.startswith(other) for other in headers_strategies) # # if is_contained: # # 如果所有策略的前缀都是最长前缀的子集,返回最长前缀 # return longest_common # else: # # 如果没有包含关系,返回策略1的前缀(即第一个策略的结果) # return headers_strategies[0] except Exception as e: print(f"Error in extract_common_header: {e}") return "" # 根据需求调整返回值 def clean_page_content(text, common_header): # 首先删除抬头公共部分 if common_header: # 确保有公共抬头才进行替换 for header_line in common_header.split('\n'): if header_line.strip(): # 只处理非空行 # 替换首次出现的完整行 text = re.sub(r'^' + re.escape(header_line.strip()) + r'\n?', '', text, count=1) # 预处理:删除文本开头的所有空白字符(包括空格、制表符等) text = text.lstrip() # 删除文本开头的“第 x 页”格式的页码 text = re.sub(r'^第\s*\d+\s*页\s*', '', text) # 删除页码 eg:89/129 这个代码分三步走可以把89/129完全删除 text = re.sub(r'^\s*\d+\s*(?=\D)', '', text) # 删除开头的页码,仅当紧跟非数字字符时 投标人须知这块, 页码和顶部序号混在一起的时候也会把序号给去除了。'2018.' 20为页码 18.为序号 text = re.sub(r'^\s*\/?\s*(共\s*)?\d+\s*(页)?\s*', '', text) #删除/123 /共123 /共123页 /123页 text = re.sub(r'^\s*[—-]\s*(第\s*)?\d{1,3}\s*(页)?\s*[—-]\s*', '', text) # 删除形如 '—2—', '-2-', 或 '-第2页-' 的页码 return text def read_page_text_with_timeout(page, timeout=3): """ 线程安全的带超时文本提取(改进版) 特性: 1. 使用Queue保证线程间通信安全 2. 添加双重超时控制 3. 完善的资源清理机制 """ result_queue = Queue(maxsize=1) done_flag = threading.Event() def extract_worker(): try: # 强制设置文本提取上限(防止底层库卡死) txt = page.extract_text() or "" result_queue.put(txt) except Exception as e: print(f"文本提取异常: {str(e)}") result_queue.put("") finally: done_flag.set() # 确保事件触发 # 启动工作线程 worker = threading.Thread( target=extract_worker, daemon=True, name=f"PDFTextExtract-{id(page)}" ) worker.start() # 双重等待机制 try: # 第一阶段:等待事件标志 if done_flag.wait(timeout): # 第二阶段:立即获取结果 return result_queue.get_nowait() # 超时处理流程 print(f"文本提取超时({timeout}s),终止操作") return None except Empty: # 罕见情况:事件触发但队列无数据 print("队列数据异常,返回空文本") return "" finally: # 资源清理 if worker.is_alive(): try: worker._stop() # 强制终止(慎用) except Exception: pass def is_scanned_pdf(file_path, max_pages=15, page_timeout=3, overall_timeout=30): """ 检查 PDF 是否为扫描件。逻辑: 1. 逐页读取,若某页提取文本超时(>page_timeout秒),直接判定为 True(假设超时为扫描件)。 2. 如果正常提取到文本且文本不为空,则判定为 False(非扫描件),立即返回。 3. 如果前 max_pages 页都检测完成,均无可见文本,则返回 True(认为是扫描件)。 4. 如果需要整体超时(overall_timeout),则在最外层加一个封装线程进行控制。 """ result_queue = queue.Queue() done_event = threading.Event() def core_check(): try: with open(file_path, 'rb') as f: reader = PdfReader(f) for i, page in enumerate(reader.pages): if i >= max_pages or done_event.is_set(): break text = read_page_text_with_timeout(page, page_timeout) if text is None: print(f"Page {i + 1} timeout") result_queue.put(True) return if text.strip(): print(f"Text found on page {i + 1}") result_queue.put(False) return result_queue.put(True) except Exception as e: print(f"Processing error: {str(e)}") result_queue.put(True) finally: done_event.set() thread = threading.Thread(target=core_check, daemon=True) thread.start() try: result = result_queue.get(timeout=overall_timeout) except queue.Empty: print(f"Overall timeout after {overall_timeout}s") return True # 或定义特殊超时状态码 return result def is_pure_image(docx_path, percentage=0.3): """ 判断 docx 文件是否为纯图片。 先计算文档中段落数量,然后取前 percentage(默认30%)的段落进行判断。 如果这些段落中没有文本,则视为纯图片。 """ document = Document(docx_path) paragraphs = document.paragraphs total_paragraphs = len(paragraphs) # 计算需要判断的段落数,至少判断1个段落 check_count = max(1, int(total_paragraphs * percentage)) # 判断这部分段落是否含有文本 for paragraph in paragraphs[:check_count]: if paragraph.text.strip(): return False return True if __name__ == '__main__': pdf_path=r"C:\Users\Administrator\Desktop\ztbfile.pdf" res=is_scanned_pdf(pdf_path) if res: print("扫描型") else: print("普通型")