324 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import threading
import fitz
from PyPDF2 import PdfReader
from docx import Document
import queue
from queue import Queue, Empty
# 定义一个函数用于提取文本
def get_text_pypdf2(pdf_doc, page_num):
return pdf_doc.pages[page_num].extract_text() or ""
def get_text_fitz(pdf_doc, page_num):
return pdf_doc.load_page(page_num).get_text() or ""
# 定义一个内部函数来绑定 pdf_document 对象
def create_get_text_function(pdf_lib, pdf_doc):
if pdf_lib == 'pypdf2':
return lambda page_num: get_text_pypdf2(pdf_doc, page_num)
elif pdf_lib == 'fitz':
return lambda page_num: get_text_fitz(pdf_doc, page_num)
else:
raise ValueError("不支持的 PDF 库。")
def extract_common_header(pdf_path):
"""
提取 PDF 文件的公共页眉。
参数:
- pdf_path: PDF 文件路径。
返回:
- common_header: 公共页眉内容,字符串。如果未找到,则返回空字符串。
"""
def get_headers(get_text, start_page, pages_to_read,total_pages):
headers = []
for i in range(start_page, min(start_page + pages_to_read,total_pages)):
try:
text=get_text(i)
if text:
# 只取每页的前三行,去除前后的空白字符
first_lines = [line.strip() for line in text.strip().split('\n')[:3]]
headers.append(first_lines)
except Exception as e:
print(f"提取第 {i} 页文本时出错: {e}")
continue
return headers
def find_common_headers(headers):
if not headers:
return []
# 转置,使得每一行对应所有页的同一行
transposed_headers = list(zip(*headers))
common_headers = []
for lines in transposed_headers:
# 将每行按空格分割成部分
split_lines = [line.split() for line in lines]
# 找出所有行中最短的部分数
min_parts = min(len(parts) for parts in split_lines)
if min_parts == 0:
continue
common_parts = []
for part_idx in range(min_parts):
# 获取当前部分在所有行中的值
current_parts = [parts[part_idx] for parts in split_lines]
# 检查所有部分是否相同
if all(part == current_parts[0] for part in current_parts[1:]):
common_parts.append(current_parts[0])
else:
break # 如果某部分不相同,停止进一步比较
if common_parts:
# 将共同的部分重新组合成字符串
common_header_line = ' '.join(common_parts)
if len(common_header_line) >= 5: # 可以根据实际情况调整最小长度
common_headers.append(common_header_line)
return common_headers
def extract_common_header_main(get_text,total_pages):
# 定义两个提取策略
strategies = []
if total_pages >= 3:
# 策略1中间的3页
middle_page = total_pages // 2
start_page = max(0, middle_page - 1)
strategies.append((start_page, 3))
elif total_pages == 2:
# 策略12页
strategies.append((0, 2))
else:
# 策略11页
strategies.append((0, 1))
# 策略2前三页
if total_pages >= 3:
strategies.append((0, 3))
elif total_pages == 2:
strategies.append((0, 2))
elif total_pages == 1:
strategies.append((0, 1))
common_headers = []
for idx, (start, pages_to_read) in enumerate(strategies):
headers = get_headers(get_text, start, pages_to_read,total_pages)
if len(headers) < 2:
continue # 需要至少2页来比较
current_common = find_common_headers(headers)
if current_common:
common_headers = current_common
# print(f"使用策略{idx + 1}找到共同的页眉: {common_headers}")
break # 找到共同部分后退出
# 如果没有找到,继续下一个策略
return '\n'.join(common_headers)
try:
# 尝试使用 PyPDF2 读取 PDF
try:
with open(pdf_path, "rb") as pdf_file:
pdf_document = PdfReader(pdf_file)
total_pages = len(pdf_document.pages)
get_text = create_get_text_function('pypdf2', pdf_document)
return extract_common_header_main(get_text,total_pages)
except Exception as e_pypdf2:
print(f"extract_common_header:使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
try:
with fitz.open(pdf_path) as pdf_document:
total_pages = pdf_document.page_count
get_text = create_get_text_function('fitz', pdf_document)
return extract_common_header_main(get_text,total_pages)
# print("使用 PyMuPDF 成功读取 PDF 文件。")
except Exception as e_fitz:
print(f"extract_common_header:使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
return "" # 或者根据需求抛出异常
# 执行策略1和策略2先执行策略1中间3页或根据页数调整然后执行策略2前三页或根据页数调整
# 获取两个策略的公共前缀:分别获取两个策略提取的公共页眉。
# 选择最长的公共前缀如果两个策略的前缀之间存在包含关系选择最长的那个。如果没有包含关系则返回策略1提取的前缀。
# headers_results = []
#
# for idx, (start, count) in enumerate(strategies):
# headers = get_headers(pdf_document, start, count, is_pypdf2)
# if len(headers) < 2:
# continue # 需要至少2页来比较
#
# current_common = find_common_headers(headers)
# if current_common:
# headers_results.append(current_common)
# # 不再中断,继续执行下一个策略
# # 如果没有找到,继续下一个策略
#
# if not headers_results:
# return "" # 没有找到任何公共页眉
# # 现在有两个(或一个)策略的公共页眉,选择最长的公共前缀
# # 首先,取出所有策略的公共页眉,并将其转换为单个字符串
# headers_strategies = ['\n'.join(headers) for headers in headers_results]
# # 找到最长的公共前缀
# longest_common = max(headers_strategies, key=lambda s: len(s))
#
# # 检查其他策略的前缀是否是最长前缀的子集
# is_contained = all(longest_common.startswith(other) for other in headers_strategies)
#
# if is_contained:
# # 如果所有策略的前缀都是最长前缀的子集,返回最长前缀
# return longest_common
# else:
# # 如果没有包含关系返回策略1的前缀即第一个策略的结果
# return headers_strategies[0]
except Exception as e:
print(f"Error in extract_common_header: {e}")
return "" # 根据需求调整返回值
def clean_page_content(text, common_header):
# 首先删除抬头公共部分
if common_header: # 确保有公共抬头才进行替换
for header_line in common_header.split('\n'):
if header_line.strip(): # 只处理非空行
# 替换首次出现的完整行
text = re.sub(r'^' + re.escape(header_line.strip()) + r'\n?', '', text, count=1)
# 预处理:删除文本开头的所有空白字符(包括空格、制表符等)
text = text.lstrip()
# 删除文本开头的“第 x 页”格式的页码
text = re.sub(r'^第\s*\d+\s*页\s*', '', text)
# 删除页码 eg:89/129 这个代码分三步走可以把89/129完全删除
text = re.sub(r'^\s*\d+\s*(?=\D)', '', text) # 删除开头的页码,仅当紧跟非数字字符时 投标人须知这块, 页码和顶部序号混在一起的时候也会把序号给去除了。'2018.' 20为页码 18.为序号
text = re.sub(r'^\s*\/?\s*(共\s*)?\d+\s*(页)?\s*', '', text) #删除/123 /共123 /共123页 /123页
text = re.sub(r'^\s*[—-]\s*(第\s*)?\d{1,3}\s*(页)?\s*[—-]\s*', '', text) # 删除形如 '—2—', '-2-', 或 '-第2页-' 的页码
return text
def read_page_text_with_timeout(page, timeout=3):
"""
线程安全的带超时文本提取(改进版)
特性:
1. 使用Queue保证线程间通信安全
2. 添加双重超时控制
3. 完善的资源清理机制
"""
result_queue = Queue(maxsize=1)
done_flag = threading.Event()
def extract_worker():
try:
# 强制设置文本提取上限(防止底层库卡死)
txt = page.extract_text() or ""
result_queue.put(txt)
except Exception as e:
print(f"文本提取异常: {str(e)}")
result_queue.put("")
finally:
done_flag.set() # 确保事件触发
# 启动工作线程
worker = threading.Thread(
target=extract_worker,
daemon=True,
name=f"PDFTextExtract-{id(page)}"
)
worker.start()
# 双重等待机制
try:
# 第一阶段:等待事件标志
if done_flag.wait(timeout):
# 第二阶段:立即获取结果
return result_queue.get_nowait()
# 超时处理流程
print(f"文本提取超时({timeout}s终止操作")
return None
except Empty:
# 罕见情况:事件触发但队列无数据
print("队列数据异常,返回空文本")
return ""
finally:
# 资源清理
if worker.is_alive():
try:
worker._stop() # 强制终止(慎用)
except Exception:
pass
def is_scanned_pdf(file_path, max_pages=15, page_timeout=3, overall_timeout=30):
"""
检查 PDF 是否为扫描件。逻辑:
1. 逐页读取,若某页提取文本超时(>page_timeout秒),直接判定为 True(假设超时为扫描件)。
2. 如果正常提取到文本且文本不为空,则判定为 False非扫描件立即返回。
3. 如果前 max_pages 页都检测完成,均无可见文本,则返回 True认为是扫描件
4. 如果需要整体超时overall_timeout则在最外层加一个封装线程进行控制。
"""
result_queue = queue.Queue()
done_event = threading.Event()
def core_check():
try:
with open(file_path, 'rb') as f:
reader = PdfReader(f)
for i, page in enumerate(reader.pages):
if i >= max_pages or done_event.is_set():
break
text = read_page_text_with_timeout(page, page_timeout)
if text is None:
print(f"Page {i + 1} timeout")
result_queue.put(True)
return
if text.strip():
print(f"Text found on page {i + 1}")
result_queue.put(False)
return
result_queue.put(True)
except Exception as e:
print(f"Processing error: {str(e)}")
result_queue.put(True)
finally:
done_event.set()
thread = threading.Thread(target=core_check, daemon=True)
thread.start()
try:
result = result_queue.get(timeout=overall_timeout)
except queue.Empty:
print(f"Overall timeout after {overall_timeout}s")
return True # 或定义特殊超时状态码
return result
def is_pure_image(docx_path, percentage=0.3):
"""
判断 docx 文件是否为纯图片。
先计算文档中段落数量,然后取前 percentage(默认30%)的段落进行判断。
如果这些段落中没有文本,则视为纯图片。
"""
document = Document(docx_path)
paragraphs = document.paragraphs
total_paragraphs = len(paragraphs)
# 计算需要判断的段落数至少判断1个段落
check_count = max(1, int(total_paragraphs * percentage))
# 判断这部分段落是否含有文本
for paragraph in paragraphs[:check_count]:
if paragraph.text.strip():
return False
return True
if __name__ == '__main__':
pdf_path=r"C:\Users\Administrator\Desktop\fsdownload\971a834e-5cf5-4259-ad85-320aaca051e0\ztbfile.pdf"
res=is_scanned_pdf(pdf_path)
if res:
print("扫描型")
else:
print("普通型")