300 lines
12 KiB
Python
Raw Normal View History

2024-11-01 14:28:10 +08:00
import re
import threading
import fitz
2024-12-05 15:01:37 +08:00
from PyPDF2 import PdfReader
from docx import Document
2025-01-03 17:36:23 +08:00
def extract_common_header(pdf_path):
"""
提取 PDF 文件的公共页眉
参数
- pdf_path: PDF 文件路径
返回
- common_header: 公共页眉内容字符串如果未找到则返回空字符串
"""
def get_headers(pdf_document, start_page, pages_to_read, is_pypdf2):
2024-11-01 14:28:10 +08:00
headers = []
for i in range(start_page, min(start_page + pages_to_read,
len(pdf_document.pages) if is_pypdf2 else pdf_document.page_count)):
try:
if is_pypdf2:
page = pdf_document.pages[i]
text = page.extract_text() or ""
else:
page = pdf_document.load_page(i)
text = page.get_text() or ""
if text:
# 只取每页的前三行,去除前后的空白字符
first_lines = [line.strip() for line in text.strip().split('\n')[:3]]
headers.append(first_lines)
except Exception as e:
print(f"提取第 {i} 页文本时出错: {e}")
continue
2024-11-01 14:28:10 +08:00
return headers
def find_common_headers(headers):
if not headers:
return []
2025-01-03 17:36:23 +08:00
# 转置,使得每一行对应所有页的同一行
transposed_headers = list(zip(*headers))
2024-11-01 14:28:10 +08:00
common_headers = []
2025-01-03 17:36:23 +08:00
for lines in transposed_headers:
# 将每行按空格分割成部分
split_lines = [line.split() for line in lines]
# 找出所有行中最短的部分数
min_parts = min(len(parts) for parts in split_lines)
if min_parts == 0:
continue
common_parts = []
for part_idx in range(min_parts):
# 获取当前部分在所有行中的值
current_parts = [parts[part_idx] for parts in split_lines]
# 检查所有部分是否相同
if all(part == current_parts[0] for part in current_parts[1:]):
common_parts.append(current_parts[0])
else:
break # 如果某部分不相同,停止进一步比较
if common_parts:
# 将共同的部分重新组合成字符串
common_header_line = ' '.join(common_parts)
if len(common_header_line) >= 5: # 可以根据实际情况调整最小长度
common_headers.append(common_header_line)
2024-11-01 14:28:10 +08:00
return common_headers
try:
# 尝试使用 PyPDF2 读取 PDF
try:
pdf_document = PdfReader(pdf_path)
total_pages = len(pdf_document.pages)
is_pypdf2 = True
# print("使用 PyPDF2 成功读取 PDF 文件。")
except Exception as e_pypdf2:
print(f"extract_common_header:使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
try:
# 如果 PyPDF2 失败,尝试使用 PyMuPDF 读取 PDF
pdf_document = fitz.open(pdf_path)
total_pages = pdf_document.page_count
is_pypdf2 = False
# print("使用 PyMuPDF 成功读取 PDF 文件。")
except Exception as e_fitz:
print(f"extract_common_header:使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
return "" # 或者根据需求抛出异常
# 定义两个提取策略
strategies = []
if total_pages >= 3:
# 策略1中间的3页
middle_page = total_pages // 2
start_page = max(0, middle_page - 1)
strategies.append((start_page, 3))
elif total_pages == 2:
# 策略12页
strategies.append((0, 2))
else:
# 策略11页
strategies.append((0, 1))
# 策略2前三页
if total_pages >= 3:
strategies.append((0, 3))
elif total_pages == 2:
strategies.append((0, 2))
elif total_pages == 1:
strategies.append((0, 1))
common_headers = []
for idx, (start, count) in enumerate(strategies):
headers = get_headers(pdf_document, start, count, is_pypdf2)
if len(headers) < 2:
continue # 需要至少2页来比较
current_common = find_common_headers(headers)
if current_common:
common_headers = current_common
# print(f"使用策略{idx + 1}找到共同的页眉: {common_headers}")
break # 找到共同部分后退出
# 如果没有找到,继续下一个策略
return '\n'.join(common_headers)
# 执行策略1和策略2先执行策略1中间3页或根据页数调整然后执行策略2前三页或根据页数调整
# 获取两个策略的公共前缀:分别获取两个策略提取的公共页眉。
# 选择最长的公共前缀如果两个策略的前缀之间存在包含关系选择最长的那个。如果没有包含关系则返回策略1提取的前缀。
# headers_results = []
#
# for idx, (start, count) in enumerate(strategies):
# headers = get_headers(pdf_document, start, count, is_pypdf2)
# if len(headers) < 2:
# continue # 需要至少2页来比较
#
# current_common = find_common_headers(headers)
# if current_common:
# headers_results.append(current_common)
# # 不再中断,继续执行下一个策略
# # 如果没有找到,继续下一个策略
#
# if not headers_results:
# return "" # 没有找到任何公共页眉
# # 现在有两个(或一个)策略的公共页眉,选择最长的公共前缀
# # 首先,取出所有策略的公共页眉,并将其转换为单个字符串
# headers_strategies = ['\n'.join(headers) for headers in headers_results]
# # 找到最长的公共前缀
# longest_common = max(headers_strategies, key=lambda s: len(s))
#
# # 检查其他策略的前缀是否是最长前缀的子集
# is_contained = all(longest_common.startswith(other) for other in headers_strategies)
#
# if is_contained:
# # 如果所有策略的前缀都是最长前缀的子集,返回最长前缀
# return longest_common
# else:
# # 如果没有包含关系返回策略1的前缀即第一个策略的结果
# return headers_strategies[0]
except Exception as e:
print(f"Error in extract_common_header: {e}")
return "" # 根据需求调整返回值
2024-11-01 14:28:10 +08:00
def clean_page_content(text, common_header):
# 首先删除抬头公共部分
if common_header: # 确保有公共抬头才进行替换
for header_line in common_header.split('\n'):
if header_line.strip(): # 只处理非空行
# 替换首次出现的完整行
text = re.sub(r'^' + re.escape(header_line.strip()) + r'\n?', '', text, count=1)
2025-01-03 17:36:23 +08:00
# 预处理:删除文本开头的所有空白字符(包括空格、制表符等)
text = text.lstrip()
# 删除文本开头的“第 x 页”格式的页码
text = re.sub(r'^第\s*\d+\s*页\s*', '', text)
2024-11-01 14:28:10 +08:00
# 删除页码 eg:89/129 这个代码分三步走可以把89/129完全删除
2024-12-06 14:40:22 +08:00
text = re.sub(r'^\s*\d+\s*(?=\D)', '', text) # 删除开头的页码,仅当紧跟非数字字符时 投标人须知这块, 页码和顶部序号混在一起的时候也会把序号给去除了。'2018.' 20为页码 18.为序号
2024-12-09 17:38:01 +08:00
text = re.sub(r'^\s*\/?\s*(共\s*)?\d+\s*(页)?\s*', '', text) #删除/123 /共123 /共123页 /123页
text = re.sub(r'^\s*[—-]\s*(第\s*)?\d{1,3}\s*(页)?\s*[—-]\s*', '', text) # 删除形如 '—2—', '-2-', 或 '-第2页-' 的页码
2024-12-05 15:01:37 +08:00
return text
def read_page_text_with_timeout(page, timeout=3):
"""
尝试在子线程里执行 page.extract_text()
如果超过 `timeout` 秒没有完成就返回 None表示超时
如果正常完成就返回提取到的文本
2024-12-06 09:25:24 +08:00
"""
done = threading.Event()
result = []
def wrapper():
try:
txt = page.extract_text() # 可能会卡住的操作
result.append(txt)
except Exception as e:
print("提取文本时出错:", e)
result.append("")
finally:
done.set()
# 启动后台线程
thread = threading.Thread(target=wrapper, daemon=True)
thread.start()
2024-12-06 09:25:24 +08:00
# 等待提取结果,超过 timeout 秒则视为超时
finished_in_time = done.wait(timeout)
if not finished_in_time:
print(f"单页文本提取超时(超过 {timeout} 秒)!")
return None
2024-12-06 09:25:24 +08:00
return result[0] if result else ""
def is_scanned_pdf(file_path, max_pages=15, page_timeout=3, overall_timeout=30):
"""
检查 PDF 是否为扫描件逻辑:
1. 逐页读取若某页提取文本超时(>page_timeout秒)直接判定为 True(假设超时为扫描件)
2. 如果正常提取到文本且文本不为空则判定为 False非扫描件立即返回
3. 如果前 max_pages 页都检测完成均无可见文本则返回 True认为是扫描件
4. 如果需要整体超时overall_timeout则在最外层加一个封装线程进行控制
2024-12-06 09:25:24 +08:00
"""
def core_check(result_container, done_event):
"""
真正的核心逻辑执行完后把结果塞进 result_container[0]
然后调用 done_event.set() 告知整个检查流程结束
"""
try:
with open(file_path, 'rb') as f:
reader = PdfReader(f)
for i, page in enumerate(reader.pages):
if i >= max_pages:
break
# 尝试在 page_timeout 内获取文本
text = read_page_text_with_timeout(page, page_timeout)
if text is None:
print(f"{i+1} 页文本提取超时,直接判定为扫描件。")
result_container[0] = True
done_event.set()
return
if text.strip():
print(f"{i+1} 页检测到文本,判定为非扫描件。")
result_container[0] = False
done_event.set()
return
except Exception as e:
print("处理 PDF 文件时发生异常:", e)
result_container[0] = True
done_event.set()
return
print(f"{max_pages} 页均未检测到文本,判定为扫描件。")
result_container[0] = True
done_event.set()
result_container = [None] # 用于在子线程中传递结果
done = threading.Event()
thread = threading.Thread(target=core_check, args=(result_container, done), daemon=True)
thread.start()
# 等待整体流程结束,最多等待 overall_timeout 秒
finished_in_time = done.wait(overall_timeout)
if not finished_in_time:
print(f"整体检查超时(超过 {overall_timeout} 秒),返回默认结果。")
return True # 或者根据需要返回其它默认值
# 打印最终结果调试信息
if result_container[0]:
print("最终结果:认为是扫描件(未检测到有效文本或发生单页超时)")
else:
print("最终结果:认为非扫描件(检测到有效文本)")
return result_container[0]
def is_pure_image(docx_path, percentage=0.3):
"""
判断 docx 文件是否为纯图片
先计算文档中段落数量然后取前 percentage(默认30%)的段落进行判断
如果这些段落中没有文本则视为纯图片
"""
document = Document(docx_path)
paragraphs = document.paragraphs
total_paragraphs = len(paragraphs)
# 计算需要判断的段落数至少判断1个段落
check_count = max(1, int(total_paragraphs * percentage))
# 判断这部分段落是否含有文本
for paragraph in paragraphs[:check_count]:
if paragraph.text.strip():
return False
return True
2024-12-06 09:25:24 +08:00
2024-12-05 15:01:37 +08:00
if __name__ == '__main__':
pdf_path=r"C:\Users\Administrator\Desktop\ztbfile.pdf"
res=is_scanned_pdf(pdf_path)
2024-12-05 15:01:37 +08:00
if res:
print("扫描型")
else:
print("普通型")