2.16 尝试解决内存泄漏

This commit is contained in:
zy123 2025-02-16 18:09:45 +08:00
parent 71f6f9baf7
commit 90e79d312c
7 changed files with 552 additions and 602 deletions

View File

@ -1,8 +1,7 @@
import os
import fitz
from PyPDF2 import PdfReader, PdfWriter
from flask_app.general.截取pdf通用函数 import create_get_text_function
from flask_app.general.读取文件.clean_pdf import create_get_text_function
#合并PDF
# 主合并函数,尝试使用 PyPDF2若失败则使用 fitz
@ -31,21 +30,24 @@ def merge_with_pypdf2(paths, output_path):
if not path.strip():
continue
try:
pdf_reader = PdfReader(path)
pages = pdf_reader.pages
get_text = create_get_text_function('pypdf2', pdf_reader)
with open(path, "rb") as f:
pdf_reader = PdfReader(f)
pages = pdf_reader.pages
get_text = create_get_text_function('pypdf2', pdf_reader)
start_index = 0
if last_page_text is not None and len(pages) > 0:
current_first_page_text = get_text(0)
if current_first_page_text == last_page_text:
start_index = 1
start_index = 0
# 如果上一份 PDF 的最后一页文本与当前 PDF 的第一页文本相同,则跳过当前 PDF 的第一页
if last_page_text is not None and pages:
current_first_page_text = get_text(0)
if current_first_page_text == last_page_text:
start_index = 1
for page_num in range(start_index, len(pages)):
pdf_writer.add_page(pages[page_num])
for page_num in range(start_index, len(pages)):
pdf_writer.add_page(pages[page_num])
if len(pages) > 0:
last_page_text = get_text(len(pages) - 1)
# 更新 last_page_text 为当前 PDF 的最后一页文本
if pages:
last_page_text = get_text(len(pages) - 1)
except Exception as e:
print(f"文件 '{path}' 无法使用 PyPDF2 处理,错误: {e}")
continue
@ -58,51 +60,44 @@ def merge_with_pypdf2(paths, output_path):
return output_path
def merge_with_fitz(paths, output_path):
# 使用 fitz 创建新文档
merged_pdf = fitz.open()
# 使用 fitz 创建新文档,作为上下文管理器自动释放资源
last_page_text = None
with fitz.open() as merged_pdf:
for path in paths:
if not path.strip():
continue
try:
# 使用 with 确保 pdf_doc 在处理后自动关闭
with fitz.open(path) as pdf_doc:
get_text = create_get_text_function('fitz', pdf_doc)
page_count = pdf_doc.page_count
start_index = 0
if last_page_text is not None and page_count > 0:
current_first_page_text = get_text(0)
if current_first_page_text == last_page_text:
start_index = 1
# 插入页面到新文档
for page_num in range(start_index, page_count):
merged_pdf.insert_pdf(pdf_doc, from_page=page_num, to_page=page_num)
if page_count > 0:
last_page_text = get_text(page_count - 1)
except Exception as e:
print(f"文件 '{path}' 无法使用 fitz 处理,错误: {e}")
continue
# 若合并后的文档为空,则直接返回空字符串
if merged_pdf.page_count == 0:
return ""
for path in paths:
if not path.strip():
continue
pdf_doc = None
try:
pdf_doc = fitz.open(path)
get_text = create_get_text_function('fitz', pdf_doc)
page_count = pdf_doc.page_count
start_index = 0
if last_page_text is not None and page_count > 0:
current_first_page_text = get_text(0)
if current_first_page_text == last_page_text:
start_index = 1
# 插入页面到新文档
for page_num in range(start_index, page_count):
merged_pdf.insert_pdf(pdf_doc, from_page=page_num, to_page=page_num)
if page_count > 0:
last_page_text = get_text(page_count - 1)
pdf_doc.close()
merged_pdf.save(output_path)
return output_path
except Exception as e:
print(f"文件 '{path}' 无法使用 fitz 处理,错误: {e}")
finally:
if pdf_doc is not None:
pdf_doc.close()
if merged_pdf.page_count == 0:
merged_pdf.close()
return ""
try:
merged_pdf.save(output_path)
merged_pdf.close()
return output_path
except Exception as e:
print(f"无法写入输出文件,错误: {e}")
merged_pdf.close()
return ""
print(f"无法写入输出文件,错误: {e}")
return ""
def judge_file_exist(original_path, new_suffix):
# 提取目录路径和原始文件名
@ -145,6 +140,9 @@ def find_and_merge(target_path, output_suffix):
paths=[target_path,full_path]
merge_pdfs(paths,target_path)
#TODO: before应该是可选的
#如果报红 貌似会额外占内存
def merge_selected_pdfs(output_folder, truncate_files, output_path, base_file_name, mode='engineering'):
"""
通用的 PDF 合并函数根据不同的模式合并指定的文件
@ -169,20 +167,17 @@ def merge_selected_pdfs(output_folder, truncate_files, output_path, base_file_na
return ""
if mode == 'engineering':
required_suffixes = [
f'{base_file_name}_before.pdf',
f'{base_file_name}_notice.pdf',
f'{base_file_name}_tobidders_notice_part1.pdf'
suffixes = [
(f'{base_file_name}_before.pdf', False), # 可选
(f'{base_file_name}_notice.pdf', True), # 必需
(f'{base_file_name}_tobidders_notice_part1.pdf', True) # 必需
]
optional_suffixes = []
elif mode == 'goods':
required_suffixes = [
f'{base_file_name}_before.pdf',
f'{base_file_name}_notice.pdf',
f'{base_file_name}_tobidders_notice_part1.pdf'
]
optional_suffixes = [
f'{base_file_name}_tobidders_notice_part2.pdf'
suffixes = [
(f'{base_file_name}_before.pdf', False), # 可选
(f'{base_file_name}_notice.pdf', True), # 必需
(f'{base_file_name}_tobidders_notice_part1.pdf', True), # 必需
(f'{base_file_name}_tobidders_notice_part2.pdf', False) # 可选
]
else:
print(f"未知的合并模式: {mode}")
@ -191,8 +186,8 @@ def merge_selected_pdfs(output_folder, truncate_files, output_path, base_file_na
all_pdfs_to_merge = []
missing_files = []
# 处理必需的文件
for suffix in required_suffixes:
for suffix, required in suffixes:
# 如果是 before 文件,则从 output_folder 中查找,否则从 truncate_files 中查找
if suffix == f'{base_file_name}_before.pdf':
matching_files = [
os.path.join(output_folder, f)
@ -208,35 +203,19 @@ def merge_selected_pdfs(output_folder, truncate_files, output_path, base_file_na
for f in matching_files_sorted:
print(f"选中文件: {f}")
else:
print(f"没有找到以 '{suffix}' 结尾的文件。")
missing_files.append(suffix)
if required:
print(f"没有找到以 '{suffix}' 结尾的文件。")
missing_files.append(suffix)
else:
print(f"可选文件 '{suffix}' 未找到,继续合并。")
# 处理可选的文件
for suffix in optional_suffixes:
if suffix == f'{base_file_name}_before.pdf':
matching_files = [
os.path.join(output_folder, f)
for f in all_output_files
if f.endswith(suffix)
]
else:
matching_files = [f for f in truncate_files if f.endswith(suffix)]
if matching_files:
matching_files_sorted = sorted(matching_files)
all_pdfs_to_merge.extend(matching_files_sorted)
for f in matching_files_sorted:
print(f"选中文件: {f}")
else:
print(f"可选文件 '{suffix}' 未找到,继续合并。")
# 检查是否所有必需的文件都已找到
if missing_files:
print("缺少以下必要的 PDF 文件,无法进行合并:")
for missing in missing_files:
print(f" - {missing}")
return ""
print(f"总共将要合并的 PDF 文件数量: {len(all_pdfs_to_merge)}")
# 过滤掉不存在或为空的文件路径

View File

@ -96,17 +96,13 @@ def truncate_pdf_multiple(pdf_path, output_folder, logger,mode='goods',selection
)
if invalid_path:
truncate_files.append(invalid_path)
logger.info(f"已添加 invalid_path: {invalid_path}")
if merged_path:
# 合并成功,添加合并后的文件路径
truncate_files.append(merged_path)
logger.info(f"已成功合并 PDF 文件到 '{merged_output_path}'")
else:
# 合并失败,添加空字符串
truncate_files.append("")
logger.warning(f"合并失败,没有生成合并文件 for {pdf_path}")
logger.info("已截取文件路径: " + str(truncate_files))
return truncate_files

View File

@ -1,31 +1,15 @@
# flask_app/general/截取pdf通用函数.py
import concurrent.futures
import io
import os
import fitz
import regex
from PyPDF2 import PdfReader, PdfWriter
from flask_app.general.读取文件.clean_pdf import extract_common_header, clean_page_content
from flask_app.general.读取文件.clean_pdf import extract_common_header, clean_page_content, create_get_text_function, \
get_text_pypdf2
from flask_app.general.format_change import docx2pdf
import concurrent.futures
# 定义一个函数用于提取文本
def get_text_pypdf2(pdf, page_num):
    """Return the text extracted from one page of a PyPDF2-style reader.

    Args:
        pdf: Object exposing a ``pages`` sequence whose items provide
            ``extract_text()``.
        page_num: Index into ``pdf.pages``.

    Returns:
        The extracted text, or ``""`` when extraction yields a falsy value
        (for example ``None`` or an empty string).
    """
    page = pdf.pages[page_num]
    extracted = page.extract_text()
    return extracted if extracted else ""
def get_text_fitz(pdf, page_num):
    """Return the text of one page loaded through a PyMuPDF-style document.

    Args:
        pdf: Object providing ``load_page(page_num)``; the loaded page must
            provide ``get_text()``.
        page_num: Page index handed to ``load_page``.

    Returns:
        The page text, or ``""`` when ``get_text()`` yields a falsy value.
    """
    page = pdf.load_page(page_num)
    content = page.get_text()
    return content if content else ""
# 定义一个内部函数来绑定 pdf_document 对象
def create_get_text_function(pdf_lib, pdf_doc):
    """Build a one-argument page-text reader bound to ``pdf_doc``.

    Args:
        pdf_lib: ``'pypdf2'`` or ``'fitz'``; selects which extraction helper
            the returned callable delegates to.
        pdf_doc: The opened document object that the callable closes over.

    Returns:
        A callable taking ``page_num`` and returning that page's text.

    Raises:
        ValueError: If ``pdf_lib`` is neither ``'pypdf2'`` nor ``'fitz'``.
    """
    if pdf_lib == 'pypdf2':
        def read_page(page_num):
            return get_text_pypdf2(pdf_doc, page_num)
    elif pdf_lib == 'fitz':
        def read_page(page_num):
            return get_text_fitz(pdf_doc, page_num)
    else:
        raise ValueError("不支持的 PDF 库。")
    return read_page
def get_start_and_common_header(input_path, end_page):
"""
提取 PDF 文件的公共页眉和起始页索引
@ -38,237 +22,198 @@ def get_start_and_common_header(input_path, end_page):
- common_header: 公共页眉内容
- last_begin_index: 起始页的页码索引
"""
pdf_document = None
try:
# 提取公共页眉
# 提取公共页眉(假设该函数已定义)
common_header = extract_common_header(input_path)
except Exception as e:
print(f"提取公共页眉时出错: {e}")
return "", -1
common_header = extract_common_header(input_path) # 假设该函数已定义
# 定义匹配模式
begin_pattern = regex.compile(
r'.*(?<!对应\s*)(?<!根据\s*)(?<!按照\s*)(?<!见\s*)(?<!与\s*)(?<!同\s*)(?<!"\s*)(?<!“\s*)(?<!”\s*)(?:招标公告|磋商公告|谈判公告|邀请书|邀请函|投标邀请|磋商邀请|谈判邀请|采购公告)[\)]?\s*$',
regex.MULTILINE
)
catalog_pattern = regex.compile(r'\s*目\s*录\s*$', regex.MULTILINE)
last_begin_index = -1
begin_pattern = regex.compile(
r'.*(?<!对应\s*)(?<!根据\s*)(?<!按照\s*)(?<!见\s*)(?<!与\s*)(?<!同\s*)(?<!"\s*)(?<!“\s*)(?<!”\s*)(?:招标公告|磋商公告|谈判公告|邀请书|邀请函|投标邀请|磋商邀请|谈判邀请|采购公告)[\)]?\s*$',
regex.MULTILINE
)
# 新增目录匹配模式
catalog_pattern = regex.compile(r'\s*目\s*录\s*$', regex.MULTILINE)
try:
# 尝试使用 PyPDF2 读取 PDF
pdf_document = PdfReader(input_path)
total_pages = len(pdf_document.pages)
get_text = create_get_text_function('pypdf2', pdf_document)
# print("使用 PyPDF2 成功读取 PDF 文件。")
except Exception as e_pypdf2:
print(f"get_start_and_common_header:使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
try:
# 如果 PyPDF2 失败,尝试使用 PyMuPDF 读取 PDF
pdf_document = fitz.open(input_path)
total_pages = pdf_document.page_count
get_text = create_get_text_function('fitz', pdf_document)
# print("使用 PyMuPDF 成功读取 PDF 文件。")
except Exception as e_fitz:
print(f"使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
return common_header, -1 # 或者根据需求抛出异常
# 遍历页面
# 内部函数:对 PDF 每一页进行遍历处理
def process_pages(get_text, total_pages):
for i in range(total_pages):
if i > end_page:
return common_header, -1 # 如果页码大于 end_page直接返回 last_begin_index=0
return -1 # 超出 end_page 范围则返回 -1
text = get_text(i)
if text:
cleaned_text = clean_page_content(text, common_header)
# 检查是否存在"目录"
# 如果页面中包含“目录”,则跳过
if catalog_pattern.search(cleaned_text):
# print(f"页面 {i} 包含目录,跳过。")
continue # 如果存在目录,跳过当前页面
continue
if begin_pattern.search(cleaned_text):
last_begin_index = i # 更新第一个匹配的索引页码从0开始
# print(f"匹配到起始模式,设置 last_begin_index 为: {last_begin_index}")
return common_header, last_begin_index
return common_header, last_begin_index
except Exception as e:
print(f"Error in get_start_and_common_header: {e}")
return "", -1 # 根据需求调整返回值
finally:
# 如果使用的是 PyMuPDF记得关闭文档
if pdf_document and hasattr(pdf_document, "close"):
pdf_document.close()
return i # 找到匹配的起始页返回页码从0开始
return -1 # 未找到匹配页
# 先尝试使用 PyPDF2 读取 PDF
try:
with open(input_path, "rb") as f:
pdf_document = PdfReader(f)
total_pages = len(pdf_document.pages)
get_text = create_get_text_function('pypdf2', pdf_document)
last_begin_index = process_pages(get_text, total_pages)
return common_header, last_begin_index
except Exception as e_pypdf2:
print(f"get_start_and_common_header: 使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
# 如果 PyPDF2 读取失败,尝试使用 PyMuPDFfitz
try:
with fitz.open(input_path) as pdf_document:
total_pages = pdf_document.page_count
get_text = create_get_text_function('fitz', pdf_document)
last_begin_index = process_pages(get_text, total_pages)
return common_header, last_begin_index
except Exception as e_fitz:
print(f"使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
return common_header, -1
def check_pdf_pages(pdf_path, mode, logger):
pdf_document = None
def generate_invalid_return():
# 根据 mode 返回不同数量的空字符串及 pdf_path
return (True, ['', '', '', '', '', '', pdf_path, '']) if mode == 'goods' else (
True, ['', '', '', '', '', pdf_path, ''])
try:
try:
# 尝试使用 PyPDF2 读取 PDF新版可直接传入文件路径
pdf_document = PdfReader(pdf_path)
num_pages = len(pdf_document.pages)
# logger.info(f"使用 PyPDF2 成功读取 PDF '{pdf_path}' 的总页数为: {num_pages}")
# 尝试使用 PyPDF2 读取 PDF新版可直接传入文件对象
with open(pdf_path, "rb") as f:
pdf_document = PdfReader(f)
num_pages = len(pdf_document.pages)
except Exception as e_pypdf2:
logger.warning(f"check_pdf_pages: 使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
# 如果 PyPDF2 读取失败,尝试使用 PyMuPDF 读取 PDF
pdf_document = fitz.open(pdf_path)
num_pages = pdf_document.page_count
with fitz.open(pdf_path) as pdf_document:
num_pages = pdf_document.page_count
if num_pages <= 30:
logger.info("PDFinvalid_path页数小于或等于30页跳过切分逻辑。")
if mode == 'goods':
return True, ['', '', '', '', '', '', pdf_path, '']
else:
return True, ['', '', '', '', '', pdf_path, '']
return generate_invalid_return()
# 若页数大于30页返回 False 表示继续处理
return False, []
except Exception as e:
logger.error(f"无法读取 PDF 页数: {e}")
if mode == 'goods':
return True, ['', '', '', '', '', '', pdf_path, '']
else:
return True, ['', '', '', '', '', pdf_path, '']
finally:
# 如果使用的是 PyMuPDF 或其他需要关闭的对象,确保关闭资源
if pdf_document and hasattr(pdf_document, "close"):
pdf_document.close()
return generate_invalid_return()
def save_extracted_pages(pdf_path, output_folder, start_page, end_page, output_suffix, common_header):
pdf_document = None
using_pypdf2 = False
try:
if start_page is None or end_page is None:
print("错误: start_page 或 end_page 为 None")
return ""
# 尝试使用 PyPDF2 读取 PDF新版可以直接传入路径
try:
pdf_document = PdfReader(pdf_path)
total_pages = len(pdf_document.pages)
# 定义提取文本函数
get_text = create_get_text_function('pypdf2', pdf_document)
using_pypdf2 = True
# print("使用 PyPDF2 成功读取 PDF 文件。")
except Exception as e_pypdf2:
print(f"save_extracted_pages: 使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
# 如果 PyPDF2 失败,尝试使用 PyMuPDF 读取 PDF
def extract_and_save_pages(pdf_document, from_page, to_page, output_path, is_using_pypdf2):
"""
抽取 PDF 某个范围 [from_page, to_page] 的页面并保存到 output_path
根据 is_using_pypdf2 判断使用 PyPDF2 还是 PyMuPDF
"""
if is_using_pypdf2:
# 使用 PyPDF2
writer = PdfWriter()
for page_num in range(from_page, to_page + 1):
try:
pdf_document = fitz.open(pdf_path)
total_pages = pdf_document.page_count
get_text = create_get_text_function('fitz', pdf_document)
using_pypdf2 = False
# print("使用 PyMuPDF 成功读取 PDF 文件。")
except Exception as e_fitz:
print(f"save_extracted_pages: 使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
return ""
writer.add_page(pdf_document.pages[page_num])
except Exception as e_page:
print(f"添加第 {page_num} 页到 writer 时出错: {e_page}")
continue
# 检查页面范围的有效性
try:
with open(output_path, 'wb') as f_out:
writer.write(f_out)
print(f"已截取并保存页面 {from_page} - {to_page}{output_path}")
except Exception as e_write:
print(f"保存时出错: {e_write}")
return ""
return output_path
else:
# 使用 PyMuPDF
output_doc = fitz.open()
try:
for page_num in range(from_page, to_page + 1):
try:
output_doc.insert_pdf(pdf_document, from_page=page_num, to_page=page_num)
except Exception as e_page:
print(f"添加第 {page_num} 页到 output_doc 时出错: {e_page}")
continue
output_doc.save(output_path)
print(f"已截取并保存页面 {from_page} - {to_page}{output_path}")
return output_path
except Exception as e_write:
print(f"保存时出错: {e_write}")
return ""
finally:
output_doc.close()
def save_extracted_pages(pdf_path, output_folder, start_page, end_page, output_suffix, common_header):
# 若 start_page 或 end_page 无效,直接退出
if start_page is None or end_page is None:
print("错误: start_page 或 end_page 为 None")
return ""
def process_document(pdf_document, total_pages, using_pypdf2, get_text):
# 检查页面范围
if start_page < 0 or end_page >= total_pages or start_page > end_page:
print(f"无效的页面范围: {start_page}{end_page}")
return ""
base_file_name = os.path.splitext(os.path.basename(pdf_path))[0]
output_pdf_path = os.path.join(output_folder, f"{base_file_name}_{output_suffix}.pdf")
# 如果 output_suffix 为 'notice' 并且 start_page > 0则另存“目录前”的页面
# 如果 output_suffix 为 'notice' 并且 start_page > 0则额外保存“目录前”的页面
if output_suffix == 'notice' and start_page > 0:
before_pdf_path = os.path.join(output_folder, f"{base_file_name}_before.pdf")
toc_page = -1
# 先遍历从 0 到 start_page - 1尝试查找“目录”文字
for page_num in range(min(start_page, total_pages)):
try:
text = get_text(page_num)
cleaned_text = clean_page_content(text, common_header)
if regex.search(r'\s*录', cleaned_text, regex.MULTILINE):
toc_page = page_num
break
except Exception as e_extract:
print(f"提取第 {page_num} 页文本时出错: {e_extract}")
continue
cleaned_text = clean_page_content(text, common_header)
if regex.search(r'\s*录', cleaned_text, regex.MULTILINE):
toc_page = page_num
break
# toc_page 找到则提取到 toc_page否则提取到 start_page - 1
pages_to_extract = toc_page + 1 if toc_page != -1 else start_page
if using_pypdf2:
before_doc = PdfWriter()
for page_num in range(pages_to_extract):
try:
before_doc.add_page(pdf_document.pages[page_num])
except Exception as e_page:
print(f"添加第 {page_num} 页到 before_doc 时出错: {e_page}")
continue
try:
with open(before_pdf_path, 'wb') as f_before:
before_doc.write(f_before)
print(f"目录前的页面已保存到 {before_pdf_path}")
except Exception as e_write_before:
print(f"保存目录前的页面时出错: {e_write_before}")
else:
before_doc = fitz.open()
for page_num in range(pages_to_extract):
try:
before_doc.insert_pdf(pdf_document, from_page=page_num, to_page=page_num)
except Exception as e_page:
print(f"添加第 {page_num} 页到 before_doc 时出错: {e_page}")
continue
try:
before_doc.save(before_pdf_path)
before_doc.close()
print(f"目录前的页面已保存到 {before_pdf_path}")
except Exception as e_write_before:
print(f"保存目录前的页面时出错: {e_write_before}")
# 如果找到了页面可提取,且 pages_to_extract > 0
if pages_to_extract > 0:
extract_and_save_pages(pdf_document, 0, pages_to_extract - 1, before_pdf_path, using_pypdf2)
# 提取并保存目标页面
if using_pypdf2:
output_doc = PdfWriter()
for page_num in range(start_page, end_page + 1):
try:
output_doc.add_page(pdf_document.pages[page_num])
except Exception as e_page:
print(f"添加第 {page_num} 页到 output_doc 时出错: {e_page}")
continue
try:
with open(output_pdf_path, 'wb') as f_out:
output_doc.write(f_out)
print(f"{output_suffix} 已截取并保存页面从 {start_page}{end_page}{output_pdf_path}")
return output_pdf_path
except Exception as e_write:
print(f"保存截取的页面时出错: {e_write}")
return ""
else:
output_doc = fitz.open()
try:
for page_num in range(start_page, end_page + 1):
try:
output_doc.insert_pdf(pdf_document, from_page=page_num, to_page=page_num)
except Exception as e_page:
print(f"添加第 {page_num} 页到 output_doc 时出错: {e_page}")
continue
output_doc.save(output_pdf_path)
print(f"{output_suffix} 已截取并保存页面从 {start_page}{end_page}{output_pdf_path}")
return output_pdf_path
except Exception as e_write:
print(f"保存截取的页面时出错: {e_write}")
return ""
finally:
output_doc.close()
except Exception as e:
print(f"发生未知错误: {e}")
# 最后提取并保存目标页面
return extract_and_save_pages(pdf_document, start_page, end_page, output_pdf_path, using_pypdf2)
# 第一阶段:尝试使用 PyPDF2
try:
with open(pdf_path, "rb") as pdf_file:
# 这里构造 PdfReader确保上下文结束后 pdf_file 会被关闭
pdf_document = PdfReader(pdf_file)
total_pages = len(pdf_document.pages)
get_text = create_get_text_function('pypdf2', pdf_document)
using_pypdf2 = True
# 在该 with 块内调用处理函数
return process_document(pdf_document, total_pages, using_pypdf2, get_text)
except Exception as e_pypdf2:
print(f"使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
# 第二阶段PyPDF2 失败后再尝试 PyMuPDF
try:
with fitz.open(pdf_path) as pdf_document:
total_pages = pdf_document.page_count
get_text = create_get_text_function('fitz', pdf_document)
using_pypdf2 = False
# 在该 with 块内调用处理函数
return process_document(pdf_document, total_pages, using_pypdf2, get_text)
except Exception as e_fitz:
print(f"使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
return ""
finally:
# 如果使用的是 PyMuPDF 或其他需要关闭的对象,确保关闭资源
if pdf_document and not using_pypdf2 and hasattr(pdf_document, "close"):
pdf_document.close()
def is_pdf_or_doc(filename):
    """Tell whether ``filename`` ends with a PDF or Word extension.

    The comparison is case-insensitive and accepts ``.pdf``, ``.doc`` and
    ``.docx``.

    Returns:
        ``True`` when the lower-cased name ends with one of those suffixes,
        otherwise ``False``.
    """
    lowered = filename.lower()
    return any(lowered.endswith(ext) for ext in ('.pdf', '.doc', '.docx'))
def convert_to_pdf(file_path):
    """Return a PDF path for ``file_path``, converting Word files when needed.

    Paths ending in ``.doc`` or ``.docx`` (case-insensitive) are handed to
    ``docx2pdf`` and its result is returned. Any other path is returned
    unchanged.
    """
    is_word_document = file_path.lower().endswith(('.doc', '.docx'))
    if not is_word_document:
        return file_path
    return docx2pdf(file_path)
def get_invalid_file(pdf_path, output_folder, common_header, begin_page):
"""
从指定的PDF文件中提取无效部分并保存到输出文件夹中
@ -313,26 +258,37 @@ def get_invalid_file(pdf_path, output_folder, common_header, begin_page):
pdf_document = None
try:
# 尝试使用 PyPDF2 读取 PDF
# 尝试使用 PyPDF2 读取 PDF,使用 with open 打开文件
try:
pdf_document = PdfReader(pdf_path)
get_text = create_get_text_function('pypdf2', pdf_document)
total_pages = len(pdf_document.pages)
with open(pdf_path, "rb") as f:
pdf_document = PdfReader(f)
get_text = create_get_text_function('pypdf2', pdf_document)
total_pages = len(pdf_document.pages)
if total_pages <= 50:
print(f"PDF页数({total_pages}) <= 50直接返回文件路径: {pdf_path}")
return pdf_path, 0
# 提取并清理每页的文本内容
page_texts = []
for i in range(total_pages):
text = get_text(i)
cleaned_text = clean_page_content(text, common_header) if text else ""
page_texts.append(cleaned_text)
except Exception as e:
pdf_document = fitz.open(pdf_path)
get_text = create_get_text_function('fitz', pdf_document)
total_pages = pdf_document.page_count
if total_pages <= 50:
print(f"PDF页数({total_pages}) <= 50直接返回文件路径: {pdf_path}")
return pdf_path, 0
# 提取并清理每页的文本内容
page_texts = []
for i in range(total_pages):
text = get_text(i)
cleaned_text = clean_page_content(text, common_header) if text else ""
page_texts.append(cleaned_text)
# 如果 PyPDF2 读取失败,则使用 PyMuPDF 读取
with fitz.open(pdf_path) as pdf_document:
total_pages = pdf_document.page_count
get_text = create_get_text_function('fitz', pdf_document)
if total_pages <= 50:
print(f"PDF页数({total_pages}) <= 50直接返回文件路径: {pdf_path}")
return pdf_path, 0
# 提取并清理每页的文本内容
page_texts = []
for i in range(total_pages):
text = get_text(i)
cleaned_text = clean_page_content(text, common_header) if text else ""
page_texts.append(cleaned_text)
# 内部函数:查找起始页
def find_start_page(begin_page):
@ -420,66 +376,70 @@ def get_invalid_file(pdf_path, output_folder, common_header, begin_page):
finally:
# 确保释放资源(对 fitz 的 pdf_document 需要关闭)
if pdf_document and hasattr(pdf_document, "close"):
if pdf_document and isinstance(pdf_document, fitz.Document):
pdf_document.close()
def extract_pages_generic(pdf_path, begin_pattern, end_pattern, begin_page, common_header, exclusion_pattern=None,
output_suffix="normal", invalid_endpage=-1):
start_page = None
end_page = None
flag = True
# 先尝试使用 PyPDF2
def extract_pages_generic(pdf_path, begin_pattern, end_pattern, begin_page, common_header,
exclusion_pattern=None, output_suffix="normal", invalid_endpage=-1):
def process_pages(get_text, total_pages):
start_page = None
end_page = None
flag = True
end_limit = total_pages if invalid_endpage == -1 else min(invalid_endpage, total_pages)
for i in range(begin_page, end_limit):
text = get_text(i)
if text:
cleaned_text = clean_page_content(text, common_header)
if output_suffix == "tobidders_notice2":
catalog_pattern = regex.compile(r'\s*目\s*录\s*$', regex.MULTILINE)
if exclusion_pattern and flag and (start_page is not None) and regex.search(exclusion_pattern, cleaned_text):
flag = False
continue
if begin_page == 0 and catalog_pattern.search(cleaned_text):
continue # 如果存在目录,跳过当前页面
else:
# 防止匹配到正文前附表/正文部分
if exclusion_pattern and flag and (start_page is None) and regex.search(exclusion_pattern, cleaned_text):
flag = False
continue
if start_page is None and regex.search(begin_pattern, cleaned_text):
if (output_suffix == "notice" and i >= begin_page) or (output_suffix != "notice" and i > begin_page):
start_page = i
continue
if start_page is not None:
if output_suffix in ["tobidders_notice", "notice", "tobidders_notice2"]:
if regex.search(end_pattern, cleaned_text) and i > start_page:
end_page = i
break
else:
if regex.search(end_pattern, cleaned_text) and not regex.search(begin_pattern, cleaned_text):
end_page = i
break
return start_page, end_page
# 尝试使用 PyPDF2并使用 with open 打开文件
try:
pdf_document = PdfReader(pdf_path)
total_pages = len(pdf_document.pages)
get_text = create_get_text_function('pypdf2', pdf_document)
with open(pdf_path, "rb") as f:
pdf_document = PdfReader(f)
total_pages = len(pdf_document.pages)
get_text = create_get_text_function('pypdf2', pdf_document)
start_page, end_page = process_pages(get_text, total_pages)
return start_page, end_page
except Exception as e_pypdf2:
print(f"extract_pages_generic:使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
# 如果 PyPDF2 失败,就再试 PyMuPDF
try:
pdf_document = fitz.open(pdf_path)
print(f"extract_pages_generic: 使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
# 如果 PyPDF2 失败,尝试使用 PyMuPDF
try:
with fitz.open(pdf_path) as pdf_document:
total_pages = pdf_document.page_count
get_text = create_get_text_function('fitz', pdf_document)
except Exception as e_fitz:
print(f"使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
return None, None
end_limit = total_pages if invalid_endpage == -1 else min(invalid_endpage, total_pages)
for i in range(begin_page, end_limit):
text = get_text(i)
if text:
cleaned_text = clean_page_content(text, common_header)
if output_suffix == "tobidders_notice2": #extract_pages_twice_tobidders_notice会调用该代码
catalog_pattern = regex.compile(r'\s*目\s*录\s*$', regex.MULTILINE)
if exclusion_pattern and flag and (start_page is not None) and regex.search(exclusion_pattern,cleaned_text):
flag = False
continue
if begin_page == 0 and catalog_pattern.search(cleaned_text):
continue # 如果存在目录,跳过当前页面
else:
# 一般投标文件的编制、组成在'投标人须知前附表/正文'中需要防止begin_pattern匹配到这部分内容所以在start_page is None的时候使用该exclusion_pattern
if exclusion_pattern and flag and (start_page is None) and regex.search(exclusion_pattern, cleaned_text):
flag = False
continue
if start_page is None and regex.search(begin_pattern, cleaned_text):
if (output_suffix == "notice" and i >= begin_page) or (output_suffix != "notice" and i > begin_page):
start_page = i
continue
if start_page is not None:
if output_suffix in ["tobidders_notice", "notice", "tobidders_notice2"]: # 使用列表判断多个匹配项
# 判断 end_pattern 是否匹配且当前页大于起始页
if regex.search(end_pattern, cleaned_text) and i > start_page:
end_page = i
break
else:
# 如果 end_pattern 匹配且 begin_pattern 不匹配
if regex.search(end_pattern, cleaned_text) and not regex.search(begin_pattern, cleaned_text):
end_page = i
break
# 如果使用的是 PyMuPDF需要手动关闭文档
if pdf_document and isinstance(pdf_document, fitz.Document):
pdf_document.close()
return start_page, end_page
start_page, end_page = process_pages(get_text, total_pages)
return start_page, end_page
except Exception as e_fitz:
print(f"使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
return None, None
def generate_mid_pattern(chapter_type=None):
@ -565,7 +525,7 @@ def extract_pages_tobidders_notice(pdf_path, output_folder, begin_page, common_h
"""
exclusion_pattern = regex.compile(
r'(文件的构成|文件的组成|文件组成|文件构成|文件的编制|文件编制)\s*$', regex.MULTILINE)
def run_extraction(begin_pattern, extraction_stage='first'):
def run_extraction(begin_pattern,get_text,total_pages,extraction_stage='first'):
"""
使用提供的 begin_pattern 运行提取过程
根据 extraction_stage 判断是第一次提取还是第三次提取
@ -649,23 +609,7 @@ def extract_pages_tobidders_notice(pdf_path, output_folder, begin_page, common_h
path2 = path1
return path1, path2
pdf_document = None
try:
try:
pdf_document = PdfReader(pdf_path)
get_text = create_get_text_function('pypdf2', pdf_document)
total_pages = len(pdf_document.pages)
except Exception as e_pypdf2:
print(f"extract_pages_tobidders_notice:使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
try:
pdf_document = fitz.open(pdf_path)
get_text = create_get_text_function('fitz', pdf_document)
total_pages = pdf_document.page_count
# print("使用 PyMuPDF 成功读取 PDF 文件。")
except Exception as e_fitz:
print(f"使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
return "", "" # 或者根据需求抛出异常
def process_extraction(get_text, total_pages):
# 第一次提取尝试,使用初始的 begin_pattern
begin_pattern = regex.compile(
r'^第[一二三四五六七八九十百千]+(?:章|部分).*(?:投标人?|磋商|谈判|供应商|应答人|比选申请人).*须知+|'
@ -673,22 +617,29 @@ def extract_pages_tobidders_notice(pdf_path, output_folder, begin_page, common_h
r'(?:投标人?|磋商|谈判|供应商|应答人|比选申请人).*须知+\s*$',
regex.MULTILINE
)
start_page, mid_page, end_page = run_extraction(begin_pattern, extraction_stage='first')
# 如果第一次提取成功,保存并返回路径
start_page, mid_page, end_page = run_extraction(begin_pattern,get_text,total_pages, extraction_stage='first')
if start_page is not None and mid_page is not None and end_page is not None:
return perform_extraction_and_save(start_page, mid_page, end_page)
print(f"第一次提取tobidders_notice失败尝试第二次提取: {pdf_path}")
start_page1, end_page1, start_page2, end_page2 = extract_pages_twice_tobidders_notice(pdf_path, common_header, begin_page)
if start_page1 is not None and end_page1 is not None and start_page2 is not None and end_page2 is not None:
if end_page1==start_page2:
mid_page=end_page1
start_page1, end_page1, start_page2, end_page2 = extract_pages_twice_tobidders_notice(
pdf_path, common_header, begin_page
)
if (start_page1 is not None and end_page1 is not None and
start_page2 is not None and end_page2 is not None):
if end_page1 == start_page2:
mid_page = end_page1
return perform_extraction_and_save(start_page1, mid_page, end_page2)
else:
path1=save_extracted_pages(pdf_path,output_folder,start_page1,end_page1,"tobidders_notice_part1",common_header)
path2=save_extracted_pages(pdf_path,output_folder,start_page2,end_page2,"tobidders_notice_part2",common_header)
return path1,path2
path1 = save_extracted_pages(
pdf_path, output_folder, start_page1, end_page1,
"tobidders_notice_part1", common_header
)
path2 = save_extracted_pages(
pdf_path, output_folder, start_page2, end_page2,
"tobidders_notice_part2", common_header
)
return path1, path2
print("第二次提取tobidders_notice失败尝试第三次提取!")
new_begin_pattern = regex.compile(
@ -696,73 +647,97 @@ def extract_pages_tobidders_notice(pdf_path, output_folder, begin_page, common_h
r'(?<!对应\s*)(?<!根据\s*)(?<!按照\s*)(?<!见\s*)(?<!与\s*)(?<!同\s*)(?<!"\s*)(?<!“\s*)(?<!”\s*)(?:投标人?|磋商|谈判|供应商|应答人|比选申请人).*须知前附表\s*$',
regex.MULTILINE
)
start_page, mid_page, end_page = run_extraction(new_begin_pattern, extraction_stage='third')
start_page, mid_page, end_page = run_extraction(new_begin_pattern, get_text, total_pages, extraction_stage='third')
if start_page is not None and mid_page is not None and end_page is not None:
return perform_extraction_and_save(start_page, mid_page, end_page)
else:
# 所有提取尝试均失败
print(f"三次提取tobidders_notice均失败!! {pdf_path}")
return "", ""
try:
try:
# 尝试使用 PyPDF2 读取 PDF 文件
with open(pdf_path, "rb") as f:
pdf_document = PdfReader(f)
get_text = create_get_text_function('pypdf2', pdf_document)
total_pages = len(pdf_document.pages)
return process_extraction(get_text, total_pages)
except Exception as e_pypdf2:
print(f"extract_pages_tobidders_notice: 使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
# 如果 PyPDF2 读取失败,则使用 PyMuPDF 读取 PDF 文件
with fitz.open(pdf_path) as pdf_document:
get_text = create_get_text_function('fitz', pdf_document)
total_pages = pdf_document.page_count
return process_extraction(get_text, total_pages)
except Exception as e:
print(f"处理文件 {pdf_path} 时发生错误: {e}")
return "", ""
finally:
# 无论如何都关闭 pdf_document对于 fitz 对象)
if pdf_document and hasattr(pdf_document, "close"):
pdf_document.close()
def extract_pages_twice_tobidders_notice(pdf_path, common_header, begin_page):
# 使用 with open 确保文件流在读取后关闭,避免文件句柄泄露
try:
pdf_document = PdfReader(pdf_path)
with open(pdf_path, "rb") as f:
pdf_document = PdfReader(f)
# 内部函数:设置所有正则匹配模式和排除规则
def setup_patterns():
begin_pattern = regex.compile(
r'^第[一二三四五六七八九十百千]+(?:章|部分).*(?:(?:投标人?|磋商|谈判|供应商|应答人|比选申请人).*须知)+',
regex.MULTILINE
)
second_begin_pattern = regex.compile(
r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*([\u4e00-\u9fff]+)',
regex.MULTILINE
)
# 此处 end_pattern 默认采用 second_begin_pattern
end_pattern = second_begin_pattern
exclusion_pattern = regex.compile(
r'(文件的构成|文件的组成|文件组成|文件构成|文件的编制|文件编制)\s*$',
regex.MULTILINE
)
exclusion_words = ["合同", "评标", "开标", "评审", "采购", "资格"]
return begin_pattern, second_begin_pattern, end_pattern, exclusion_pattern, exclusion_words
# 初始化正则模式和排除规则
begin_pattern, second_begin_pattern, end_pattern, exclusion_pattern, exclusion_words = setup_patterns()
output_suffix = "tobidders_notice"
# 提取第一部分
start_page1, end_page1 = extract_pages_generic(
pdf_path, begin_pattern, end_pattern, begin_page, common_header,
exclusion_pattern, output_suffix
)
if start_page1 is None or end_page1 is None:
return None, None, None, None
# 提取第二部分:先检查 end_page1 页的内容
text = get_text_pypdf2(pdf_document, end_page1)
cleaned_text = clean_page_content(text, common_header)
match = end_pattern.search(cleaned_text)
if match:
# 获取匹配到的中文部分
chapter_title = match.group(1) if match.groups() else ""
# 检查是否包含排除关键词
if any(word in chapter_title for word in exclusion_words):
# 如果包含排除关键词,则将第二部分的起始模式设置为与第一部分相同
second_begin_pattern = begin_pattern
output_suffix = "tobidders_notice2"
# 对于第二部分,传入的 begin_page 改为 end_page1 - 1保证 start_page > begin_page
begin_page2 = end_page1 - 1
start_page2, end_page2 = extract_pages_generic(
pdf_path, second_begin_pattern, end_pattern, begin_page2, common_header,
exclusion_pattern, output_suffix
)
if start_page2 is None or end_page2 is None:
return start_page1, end_page1, end_page1, end_page1
return start_page1, end_page1, start_page2, end_page2
except Exception as e:
print(f"读取 PDF 文件失败: {e}")
return None, None, None, None
# 投标须知前附表占一章、投标须知正文占一章
output_suffix = "tobidders_notice"
begin_pattern = regex.compile(
r'^第[一二三四五六七八九十百千]+(?:章|部分).*(?:(?:投标人?|磋商|谈判|供应商|应答人|比选申请人).*须知)+',
regex.MULTILINE
)
second_begin_pattern = regex.compile(
r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*([\u4e00-\u9fff]+)', regex.MULTILINE # 捕获中文部分
)
end_pattern = second_begin_pattern
exclusion_words = ["合同", "评标", "开标", "评审", "采购", "资格"] # 在这里添加需要排除的关键词
exclusion_pattern = regex.compile(
r'(文件的构成|文件的组成|文件组成|文件构成|文件的编制|文件编制)\s*$', regex.MULTILINE)
# 提取第一部分
start_page1, end_page1 = extract_pages_generic(pdf_path, begin_pattern, end_pattern, begin_page, common_header,
exclusion_pattern, output_suffix)
if start_page1 is None or end_page1 is None:
return None, None, None,None
# 提取第二部分 先检查end_page1页面的内容
text = get_text_pypdf2(pdf_document,end_page1)
cleaned_text = clean_page_content(text, common_header)
match = end_pattern.search(cleaned_text)
if match:
# 获取匹配到的中文部分
chapter_title = match.group(1) if match.groups() else ""
# 检查是否包含排除关键词
if any(word in chapter_title for word in exclusion_words):
# 如果包含排除关键词,直接返回相同的路径
second_begin_pattern=begin_pattern #投标人须知前附表和投标人须知正文不是紧挨着的eg:(发布稿)恒丰理财有限责任公司智能投研终端项目竞争性磋商
# 如果不包含排除关键词,继续提取第二部分
output_suffix='tobidders_notice2'
begin_page=end_page1-1 #传入start_page2-1是因为非notice截取->start_page>begin_page
start_page2, end_page2 = extract_pages_generic(pdf_path, second_begin_pattern, end_pattern, begin_page, common_header,
exclusion_pattern, output_suffix)
if start_page2 is None or end_page2 is None:
return start_page1, end_page1, end_page1,end_page1
return start_page1, end_page1, start_page2,end_page2
def get_notice(pdf_path, output_folder, begin_page, common_header,invalid_endpage):
begin_pattern = regex.compile(

View File

@ -5,6 +5,23 @@ from PyPDF2 import PdfReader
from docx import Document
import queue
from queue import Queue, Empty
# 定义一个函数用于提取文本
def get_text_pypdf2(pdf_doc, page_num):
    """Return the extracted text of one page, or "" when nothing is extracted.

    Args:
        pdf_doc: Object exposing an indexable ``pages`` attribute whose items
            provide ``extract_text()`` (presumably a PyPDF2 ``PdfReader`` --
            confirm against callers).
        page_num: Index into ``pdf_doc.pages``.

    Returns:
        The value returned by ``extract_text()`` when it is truthy,
        otherwise the empty string.
    """
    page = pdf_doc.pages[page_num]
    extracted = page.extract_text()
    # Normalise None / empty results to an empty string.
    return extracted if extracted else ""
def get_text_fitz(pdf_doc, page_num):
    """Return the text of one page, or "" when nothing is extracted.

    Args:
        pdf_doc: Object providing ``load_page(page_num)`` whose result offers
            ``get_text()`` (presumably a PyMuPDF document -- confirm against
            callers).
        page_num: Page index passed to ``load_page``.

    Returns:
        The value returned by ``get_text()`` when it is truthy, otherwise
        the empty string.
    """
    page = pdf_doc.load_page(page_num)
    extracted = page.get_text()
    # Normalise None / empty results to an empty string.
    return extracted if extracted else ""
# 定义一个内部函数来绑定 pdf_document 对象
def create_get_text_function(pdf_lib, pdf_doc):
    """Build a one-argument page-text getter bound to ``pdf_doc``.

    Args:
        pdf_lib: Backend selector; ``'pypdf2'`` or ``'fitz'``.
        pdf_doc: The opened document object handed to the backend-specific
            extractor on every call.

    Returns:
        A callable taking ``page_num`` and returning what
        ``get_text_pypdf2`` / ``get_text_fitz`` returns for that page.

    Raises:
        ValueError: If ``pdf_lib`` is neither ``'pypdf2'`` nor ``'fitz'``.
    """
    if pdf_lib == 'pypdf2':
        def read_page(page_num):
            # The extractor name is resolved at call time, as with the lambda form.
            return get_text_pypdf2(pdf_doc, page_num)
        return read_page

    if pdf_lib == 'fitz':
        def read_page(page_num):
            return get_text_fitz(pdf_doc, page_num)
        return read_page

    raise ValueError("不支持的 PDF 库。")
def extract_common_header(pdf_path):
"""
提取 PDF 文件的公共页眉
@ -16,17 +33,11 @@ def extract_common_header(pdf_path):
- common_header: 公共页眉内容字符串如果未找到则返回空字符串
"""
def get_headers(pdf_document, start_page, pages_to_read, is_pypdf2):
def get_headers(get_text, start_page, pages_to_read,total_pages):
headers = []
for i in range(start_page, min(start_page + pages_to_read,
len(pdf_document.pages) if is_pypdf2 else pdf_document.page_count)):
for i in range(start_page, min(start_page + pages_to_read,total_pages)):
try:
if is_pypdf2:
page = pdf_document.pages[i]
text = page.extract_text() or ""
else:
page = pdf_document.load_page(i)
text = page.get_text() or ""
text=get_text(i)
if text:
# 只取每页的前三行,去除前后的空白字符
first_lines = [line.strip() for line in text.strip().split('\n')[:3]]
@ -71,25 +82,7 @@ def extract_common_header(pdf_path):
return common_headers
try:
# 尝试使用 PyPDF2 读取 PDF
try:
pdf_document = PdfReader(pdf_path)
total_pages = len(pdf_document.pages)
is_pypdf2 = True
# print("使用 PyPDF2 成功读取 PDF 文件。")
except Exception as e_pypdf2:
print(f"extract_common_header:使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
try:
# 如果 PyPDF2 失败,尝试使用 PyMuPDF 读取 PDF
pdf_document = fitz.open(pdf_path)
total_pages = pdf_document.page_count
is_pypdf2 = False
# print("使用 PyMuPDF 成功读取 PDF 文件。")
except Exception as e_fitz:
print(f"extract_common_header:使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
return "" # 或者根据需求抛出异常
def extract_common_header_main(get_text,total_pages):
# 定义两个提取策略
strategies = []
if total_pages >= 3:
@ -114,8 +107,8 @@ def extract_common_header(pdf_path):
common_headers = []
for idx, (start, count) in enumerate(strategies):
headers = get_headers(pdf_document, start, count, is_pypdf2)
for idx, (start, pages_to_read) in enumerate(strategies):
headers = get_headers(get_text, start, pages_to_read,total_pages)
if len(headers) < 2:
continue # 需要至少2页来比较
@ -125,9 +118,29 @@ def extract_common_header(pdf_path):
# print(f"使用策略{idx + 1}找到共同的页眉: {common_headers}")
break # 找到共同部分后退出
# 如果没有找到,继续下一个策略
return '\n'.join(common_headers)
try:
# 尝试使用 PyPDF2 读取 PDF
try:
with open(pdf_path, "rb") as pdf_file:
pdf_document = PdfReader(pdf_file)
total_pages = len(pdf_document.pages)
get_text = create_get_text_function('pypdf2', pdf_document)
return extract_common_header_main(get_text,total_pages)
except Exception as e_pypdf2:
print(f"extract_common_header:使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
try:
with fitz.open(pdf_path) as pdf_document:
total_pages = pdf_document.page_count
get_text = create_get_text_function('fitz', pdf_document)
return extract_common_header_main(get_text,total_pages)
# print("使用 PyMuPDF 成功读取 PDF 文件。")
except Exception as e_fitz:
print(f"extract_common_header:使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
return "" # 或者根据需求抛出异常
# 执行策略1和策略2先执行策略1中间3页或根据页数调整然后执行策略2前三页或根据页数调整
# 获取两个策略的公共前缀:分别获取两个策略提取的公共页眉。
# 选择最长的公共前缀如果两个策略的前缀之间存在包含关系选择最长的那个。如果没有包含关系则返回策略1提取的前缀。
@ -303,7 +316,7 @@ def is_pure_image(docx_path, percentage=0.3):
return True
if __name__ == '__main__':
pdf_path=r"C:\Users\Administrator\Desktop\fsdownload\bf0346dc-8711-43f5-839e-e4076acea84a\ztbfile.pdf"
pdf_path=r"C:\Users\Administrator\Desktop\fsdownload\971a834e-5cf5-4259-ad85-320aaca051e0\ztbfile.pdf"
res=is_scanned_pdf(pdf_path)
if res:
print("扫描型")

View File

@ -141,7 +141,7 @@ def save_extracted_text_to_txt(pdf_path, txt_path):
if __name__ == '__main__':
# file_path='D:\\flask_project\\flask_app\\static\\output\\output1\\648e094b-e677-47ce-9073-09e0c82af210\\ztbfile_tobidders_notice_part2.pdf'
pdf_path=r"C:\Users\Administrator\Desktop\新建文件夹 (3)\ztbfile(1).pdf"
pdf_path=r'C:\Users\Administrator\Downloads\完全没有解析进度1_南网超高压公司2024年第四批服务公开招标项目2024-FW-4-S-ZB2 (1).pdf'
# pdf_path=r"C:\Users\Administrator\Desktop\货物标\output2\广水市妇幼招标文件最新W改_evaluation_method.pdf"
# file_path = 'C:\\Users\\Administrator\\Desktop\\货物标\\截取test\\交警支队机动车查验监管系统项目采购_tobidders_notice_part1.pdf'
# file_path = "C:\\Users\\Administrator\\Desktop\\招标文件\\招标test文件夹\\zbtest8.pdf"

View File

@ -4,7 +4,7 @@ import regex
import os
import time
from PyPDF2 import PdfReader
from flask_app.general.读取文件.clean_pdf import clean_page_content
from flask_app.general.读取文件.clean_pdf import clean_page_content,create_get_text_function
from flask_app.general.截取pdf通用函数 import get_start_and_common_header, save_extracted_pages, get_invalid_file, \
extract_pages_tobidders_notice, get_notice, extract_pages_generic
from flask_app.general.通用功能函数 import get_global_logger
@ -24,95 +24,92 @@ def extract_pages(pdf_path, output_folder, begin_pattern, begin_page, end_patter
print(f"Error processing {pdf_path}: {e}")
return ""
def extract_pages_twice(pdf_path, output_folder, output_suffix, common_header, last_begin_index):
"""
处理 PDF 文件作为 truncate_pdf_main 的后备方法
此函数将在所有模式对都失败后调用
当所有模式均失败后调用此方法
"""
pdf_document=None
try:
try:
pdf_document = PdfReader(pdf_path)
is_pypdf2 = True
total_pages = len(pdf_document.pages)
# print("使用 PyPDF2 成功读取 PDF 文件。")
except Exception as e_pypdf2:
print(f"extract_pages_twice:使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
try:
pdf_document = fitz.open(pdf_path)
is_pypdf2 = False
total_pages = pdf_document.page_count
# print("使用 PyMuPDF 成功读取 PDF 文件。")
except Exception as e_fitz:
print(f"使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
return ""
if output_suffix == "qualification":
print("最后一次尝试获取qualification!")
# 动态设置 include_keys
include_keys = ["资格审查", "资质审查", "符合性审查", "资格性检查", "符合性检查", "资格检查", "能力","信誉"]
# 定义起始匹配模式,仅匹配“附录”、“附件”或“附表”
begin_pattern = r'^(?:附录(?:[一1])?[:]|附件(?:[一1])?[:]|附表(?:[一1])?[:])'
# 定义结束匹配模式 - 附录、附件、附表等(移除负向前瞻)
end_pattern_attachment = regex.compile(
r'^(?:附录[一二三四五六七八九1-9]*[:]|附件[一二三四五六七八九1-9]*[:]|附表[一二三四五六七八九1-9]*[:]).*$',
regex.MULTILINE
)
# 定义结束匹配模式 - 章节标题、评标附表、投标人须知等
end_pattern_chapter = regex.compile(
r'第[一二三四五六七八九]+(?:章|部分)\s*[\u4e00-\u9fff、()]+\s*$',
regex.MULTILINE
)
start_page = None
end_page = None
def get_text(page_num):
try:
if is_pypdf2:
return pdf_document.pages[page_num].extract_text() or ""
if output_suffix != "qualification":
return ""
print("最后一次尝试获取qualification!")
# 动态设置关键字
include_keys = ["资格审查", "资质审查", "符合性审查", "资格性检查", "符合性检查", "资格检查", "能力", "信誉"]
# 定义起始匹配模式,仅匹配“附录”、“附件”或“附表”
begin_pattern = r'^(?:附录(?:[一1])?[:]|附件(?:[一1])?[:]|附表(?:[一1])?[:])'
# 定义结束匹配模式 - 附录、附件、附表等(移除负向前瞻)
end_pattern_attachment = regex.compile(
r'^(?:附录[一二三四五六七八九1-9]*[:]|附件[一二三四五六七八九1-9]*[:]|附表[一二三四五六七八九1-9]*[:]).*$',
regex.MULTILINE
)
# 定义结束匹配模式 - 章节标题、评标附表、投标人须知等
end_pattern_chapter = regex.compile(
r'第[一二三四五六七八九]+(?:章|部分)\s*[\u4e00-\u9fff、()]+\s*$',
regex.MULTILINE
)
def process_pdf(total_pages, get_text, include_keys, begin_pattern, end_pattern_attachment, end_pattern_chapter,
common_header, last_begin_index):
"""
遍历 PDF 的各页依据匹配规则确定起始页和结束页
"""
start_page = None
end_page = None
for i in range(last_begin_index, total_pages):
text = get_text(i)
if not text:
continue
cleaned_text = clean_page_content(text, common_header)
# 确定起始页:页面包含 include_keys 且匹配 begin_pattern
if start_page is None and any(key in cleaned_text for key in include_keys):
if regex.search(begin_pattern, cleaned_text, regex.MULTILINE):
start_page = i
continue
# 一旦起始页确定,则查找结束页
if start_page is not None:
# 结束模式:附件类
if end_pattern_attachment.search(cleaned_text):
# 如果页面不包含 include_keys 且页码大于起始页,则认为找到了结束页
if not any(key in cleaned_text for key in include_keys) and i > start_page:
end_page = i
break
else:
return pdf_document.load_page(page_num).get_text() or ""
except Exception as e:
print(f"提取第 {page_num} 页文本时出错: {e}")
return ""
# 从指定的开始页开始遍历
for i in range(last_begin_index, total_pages):
text = get_text(i)
if text:
cleaned_text = clean_page_content(text, common_header)
# 确定起始页需在last_begin_index之后
if any(key in cleaned_text for key in include_keys):
if regex.search(begin_pattern, cleaned_text, regex.MULTILINE):
if start_page is None:
start_page = i # 确保起始页不小于章节的开始页码
continue
# print(f"匹配到附录/附件/附表,设置起始页为: {start_page}")
# 确定结束页 - 附录、附件、附表等
if start_page is not None and end_pattern_attachment.search(cleaned_text):
# 额外检查当前页面是否不包含任何 include_keys
if not any(key in cleaned_text for key in include_keys):
if i > start_page:
end_page = i
# print(f"找到结束页 (附件类): {end_page}")
break # 找到结束页后退出循环
else:
print(f"当前页面匹配附件结束模式但包含 include_keys继续查找。页面: {i}")
# 确定结束页 - 章节标题、评标附表、投标人须知等
elif start_page is not None and end_pattern_chapter.search(cleaned_text):
if i > start_page:
end_page = i
# print(f"找到结束页 (章节标题等): {end_page}")
break # 找到结束页后退出循环
if start_page is None or end_page is None:
print(f"{output_suffix} twice: 未找到起始或结束页在文件 {pdf_path} 中!")
return ""
else:
return save_extracted_pages(pdf_path, output_folder, start_page, end_page, output_suffix, common_header)
print(f"当前页面匹配附件结束模式但包含 include_keys继续查找。页面: {i}")
# 结束模式:章节标题等
elif end_pattern_chapter.search(cleaned_text) and i > start_page:
end_page = i
break
return start_page, end_page
# 尝试使用 PyPDF2 方式读取 PDF
try:
with open(pdf_path, "rb") as f:
pdf_document = PdfReader(f)
get_text = create_get_text_function('pypdf2', pdf_document)
total_pages = len(pdf_document.pages)
start_page, end_page = process_pdf(total_pages, get_text, include_keys, begin_pattern,
end_pattern_attachment, end_pattern_chapter,
common_header, last_begin_index)
except Exception as e_pypdf2:
print(f"extract_pages_twice: 使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
# 使用 PyMuPDF 作为备用方案
try:
with fitz.open(pdf_path) as pdf_document:
get_text = create_get_text_function('fitz', pdf_document)
total_pages = pdf_document.page_count
start_page, end_page = process_pdf(total_pages, get_text, include_keys, begin_pattern,
end_pattern_attachment, end_pattern_chapter,
common_header, last_begin_index)
except Exception as e_fitz:
print(f"使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
return ""
if start_page is None or end_page is None:
print(f"{output_suffix} twice: 未找到起始或结束页在文件 {pdf_path} 中!")
return ""
except Exception as e:
print(f"Error in extract_pages_twice: {e}")
return ""
finally:
if pdf_document and hasattr(pdf_document, 'close'):
pdf_document.close()
else:
return save_extracted_pages(pdf_path, output_folder, start_page, end_page, output_suffix, common_header)
def truncate_pdf_main_engineering(input_path, output_folder, selection, logger, output_suffix="default",invalid_endpage=-1):

View File

@ -2,10 +2,10 @@ import fitz
from PyPDF2 import PdfReader
import regex # 导入正则表达式库
import os # 用于文件和文件夹操作
from flask_app.general.读取文件.clean_pdf import clean_page_content
from flask_app.general.读取文件.clean_pdf import clean_page_content,create_get_text_function
from flask_app.general.merge_pdfs import merge_and_cleanup
from flask_app.general.截取pdf通用函数 import get_start_and_common_header, save_extracted_pages, is_pdf_or_doc, \
get_invalid_file, extract_pages_tobidders_notice, extract_pages_generic, get_notice, create_get_text_function
get_invalid_file, extract_pages_tobidders_notice, extract_pages_generic, get_notice
from flask_app.general.通用功能函数 import get_global_logger
def extract_pages(pdf_path, output_folder, begin_pattern, begin_page, end_pattern, common_header,output_suffix,logger,invalid_endpage=-1):
@ -37,10 +37,6 @@ def get_patterns_for_procurement():
end_pattern = regex.compile(r'(?<!对应\s*)(?<!根据\s*)(?<!按照\s*)(?<!见\s*)(?<!与\s*)(?<!同\s*)(?<!"\s*)(?<!“\s*)(?<!”\s*)第[一二三四五六七八九1-9]+(?:章|部分)\s*[\u4e00-\u9fff、()]+\s*$', regex.MULTILINE)
return begin_pattern, end_pattern
# """
# 第四章
# 评标
# """ 也能匹配上
def get_patterns_for_evaluation_method():
begin_pattern = regex.compile(
r'(?:(?<!见\s*)(?<!与\s*)(?<!同\s*)(?<!对应\s*)(?<!根据\s*)(?<!按照\s*)(?<!"\s*)(?<!“\s*)(?<!“\s*)第[一二三四五六七八九十1-9]+(?:章|部分)\s*' # 第一种模式
@ -79,73 +75,67 @@ def extract_pages_qualification(pdf_path, begin_page, common_header):
)
print("第二次尝试 qualification:匹配附件")
start_page = None
end_page = None
include_keywords = ["资格审查", "资质审查", "符合性审查","资格性审查", "符合性检查","资格性检查", "资格检查"]
exclude_keywords = ["声明函", "承诺函"]
pdf_document = None
try:
try:
# 尝试使用 PyPDF2 读取 PDF
pdf_document = PdfReader(pdf_path)
total_pages = len(pdf_document.pages)
get_text = create_get_text_function('pypdf2', pdf_document)
# print("使用 PyPDF2 成功读取 PDF 文件。")
except Exception as e_pypdf2:
print(f"extract_pages_qualification:使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
try:
# 如果 PyPDF2 失败,尝试使用 PyMuPDF 读取 PDF
pdf_document = fitz.open(pdf_path)
total_pages = pdf_document.page_count
get_text = create_get_text_function('fitz', pdf_document)
# print("使用 PyMuPDF 成功读取 PDF 文件。")
except Exception as e_fitz:
print(f"使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
return None, None # 或者根据需求抛出异常
# 从指定的开始页开始遍历
def _extract_pages_loop(get_text, total_pages, begin_page, common_header):
start_page = None
end_page = None
for i in range(begin_page, total_pages):
text = get_text(i)
if text:
cleaned_text = clean_page_content(text, common_header)
# 优先检查是否匹配优先模式
# 1. 优先判断是否匹配优先模式
priority_match = priority_pattern.search(cleaned_text)
if priority_match and start_page is None:
start_page = i
print(f"匹配到priority_pattern设置起始页为: {start_page}")
print(f"Matched priority_pattern, setting start_page to: {start_page}")
continue
else:
# 如果未匹配优先模式,则按照一般模式进行判断
if (
any(keyword in cleaned_text for keyword in include_keywords) and
# 2. 若未匹配优先模式,则判断是否同时满足包含和排除关键词条件
if (any(keyword in cleaned_text for keyword in include_keywords) and
all(keyword not in cleaned_text for keyword in exclude_keywords) and
start_page is None
):
start_page is None):
if begin_pattern.search(cleaned_text):
start_page = i
print(f"匹配到附录等模式begin_pattern设置起始页为: {start_page}")
print(f"Matched begin_pattern, setting start_page to: {start_page}")
continue
# 确定结束页 - 附录、附件、附表等
# 3. 判断结束页:附件/附表结束模式
if start_page is not None and end_pattern_attachment.search(cleaned_text):
# 额外检查当前页面是否不包含任何 include_keywords
# 若当前页面不包含任何 include_keywords则认为找到了结束页
if not any(keyword in cleaned_text for keyword in include_keywords):
if i > start_page:
end_page = i
print(f"qualification找到结束页 (附件类): {end_page}")
break # 找到结束页后退出循环
print(f"Found end_page (attachment): {end_page}")
break
else:
print(f"当前页面匹配附件结束模式但包含 include_keywords继续查找。页面: {i}")
# 确定结束页 - 章节标题
print(f"Page {i} matches attachment pattern but contains include_keywords, continue.")
# 4. 判断结束页:章节标题结束模式
elif start_page is not None and end_pattern_chapter.search(cleaned_text):
if i > start_page:
end_page = i
print(f"qualification找到结束页 (章节标题): {end_page}")
break # 找到结束页后退出循环
print(f"Found end_page (chapter title): {end_page}")
break
return start_page, end_page
finally:
if pdf_document and hasattr(pdf_document, 'close'):
pdf_document.close()
try:
with open(pdf_path, "rb") as f:
pdf_document = PdfReader(f)
total_pages = len(pdf_document.pages)
get_text = create_get_text_function('pypdf2', pdf_document)
return _extract_pages_loop(get_text, total_pages, begin_page, common_header)
except Exception as e_pypdf2:
print(f"extract_pages_qualification: Using PyPDF2 failed: {e_pypdf2}")
# 如果 PyPDF2 读取失败,尝试使用 PyMuPDF
try:
with fitz.open(pdf_path) as pdf_document:
total_pages = pdf_document.page_count
get_text = create_get_text_function('fitz', pdf_document)
return _extract_pages_loop(get_text, total_pages, begin_page, common_header)
except Exception as e_fitz:
print(f"Using PyMuPDF to read PDF also failed: {e_fitz}")
return None, None
def extract_pages_twice(pdf_path, output_folder, output_suffix, common_header, begin_page, logger):