Merge branch 'develop-test' into develop

This commit is contained in:
zy123 2025-02-16 18:10:26 +08:00
commit 494dde94cf
7 changed files with 552 additions and 603 deletions

View File

@ -1,8 +1,7 @@
import os import os
import fitz import fitz
from PyPDF2 import PdfReader, PdfWriter from PyPDF2 import PdfReader, PdfWriter
from flask_app.general.截取pdf通用函数 import create_get_text_function from flask_app.general.读取文件.clean_pdf import create_get_text_function
#合并PDF #合并PDF
# 主合并函数,尝试使用 PyPDF2若失败则使用 fitz # 主合并函数,尝试使用 PyPDF2若失败则使用 fitz
@ -31,12 +30,14 @@ def merge_with_pypdf2(paths, output_path):
if not path.strip(): if not path.strip():
continue continue
try: try:
pdf_reader = PdfReader(path) with open(path, "rb") as f:
pdf_reader = PdfReader(f)
pages = pdf_reader.pages pages = pdf_reader.pages
get_text = create_get_text_function('pypdf2', pdf_reader) get_text = create_get_text_function('pypdf2', pdf_reader)
start_index = 0 start_index = 0
if last_page_text is not None and len(pages) > 0: # 如果上一份 PDF 的最后一页文本与当前 PDF 的第一页文本相同,则跳过当前 PDF 的第一页
if last_page_text is not None and pages:
current_first_page_text = get_text(0) current_first_page_text = get_text(0)
if current_first_page_text == last_page_text: if current_first_page_text == last_page_text:
start_index = 1 start_index = 1
@ -44,7 +45,8 @@ def merge_with_pypdf2(paths, output_path):
for page_num in range(start_index, len(pages)): for page_num in range(start_index, len(pages)):
pdf_writer.add_page(pages[page_num]) pdf_writer.add_page(pages[page_num])
if len(pages) > 0: # 更新 last_page_text 为当前 PDF 的最后一页文本
if pages:
last_page_text = get_text(len(pages) - 1) last_page_text = get_text(len(pages) - 1)
except Exception as e: except Exception as e:
print(f"文件 '{path}' 无法使用 PyPDF2 处理,错误: {e}") print(f"文件 '{path}' 无法使用 PyPDF2 处理,错误: {e}")
@ -58,16 +60,15 @@ def merge_with_pypdf2(paths, output_path):
return output_path return output_path
def merge_with_fitz(paths, output_path): def merge_with_fitz(paths, output_path):
# 使用 fitz 创建新文档 # 使用 fitz 创建新文档,作为上下文管理器自动释放资源
merged_pdf = fitz.open()
last_page_text = None last_page_text = None
with fitz.open() as merged_pdf:
for path in paths: for path in paths:
if not path.strip(): if not path.strip():
continue continue
pdf_doc = None
try: try:
pdf_doc = fitz.open(path) # 使用 with 确保 pdf_doc 在处理后自动关闭
with fitz.open(path) as pdf_doc:
get_text = create_get_text_function('fitz', pdf_doc) get_text = create_get_text_function('fitz', pdf_doc)
page_count = pdf_doc.page_count page_count = pdf_doc.page_count
@ -83,25 +84,19 @@ def merge_with_fitz(paths, output_path):
if page_count > 0: if page_count > 0:
last_page_text = get_text(page_count - 1) last_page_text = get_text(page_count - 1)
pdf_doc.close()
except Exception as e: except Exception as e:
print(f"文件 '{path}' 无法使用 fitz 处理,错误: {e}") print(f"文件 '{path}' 无法使用 fitz 处理,错误: {e}")
finally: continue
if pdf_doc is not None:
pdf_doc.close()
# 若合并后的文档为空,则直接返回空字符串
if merged_pdf.page_count == 0: if merged_pdf.page_count == 0:
merged_pdf.close()
return "" return ""
try: try:
merged_pdf.save(output_path) merged_pdf.save(output_path)
merged_pdf.close()
return output_path return output_path
except Exception as e: except Exception as e:
print(f"无法写入输出文件,错误: {e}") print(f"无法写入输出文件,错误: {e}")
merged_pdf.close()
return "" return ""
def judge_file_exist(original_path, new_suffix): def judge_file_exist(original_path, new_suffix):
@ -145,6 +140,9 @@ def find_and_merge(target_path, output_suffix):
paths=[target_path,full_path] paths=[target_path,full_path]
merge_pdfs(paths,target_path) merge_pdfs(paths,target_path)
#TODO: before应该是可选的
#如果报红 貌似会额外占内存
def merge_selected_pdfs(output_folder, truncate_files, output_path, base_file_name, mode='engineering'): def merge_selected_pdfs(output_folder, truncate_files, output_path, base_file_name, mode='engineering'):
""" """
通用的 PDF 合并函数根据不同的模式合并指定的文件 通用的 PDF 合并函数根据不同的模式合并指定的文件
@ -169,20 +167,17 @@ def merge_selected_pdfs(output_folder, truncate_files, output_path, base_file_na
return "" return ""
if mode == 'engineering': if mode == 'engineering':
required_suffixes = [ suffixes = [
f'{base_file_name}_before.pdf', (f'{base_file_name}_before.pdf', False), # 可选
f'{base_file_name}_notice.pdf', (f'{base_file_name}_notice.pdf', True), # 必需
f'{base_file_name}_tobidders_notice_part1.pdf' (f'{base_file_name}_tobidders_notice_part1.pdf', True) # 必需
] ]
optional_suffixes = []
elif mode == 'goods': elif mode == 'goods':
required_suffixes = [ suffixes = [
f'{base_file_name}_before.pdf', (f'{base_file_name}_before.pdf', False), # 可选
f'{base_file_name}_notice.pdf', (f'{base_file_name}_notice.pdf', True), # 必需
f'{base_file_name}_tobidders_notice_part1.pdf' (f'{base_file_name}_tobidders_notice_part1.pdf', True), # 必需
] (f'{base_file_name}_tobidders_notice_part2.pdf', False) # 可选
optional_suffixes = [
f'{base_file_name}_tobidders_notice_part2.pdf'
] ]
else: else:
print(f"未知的合并模式: {mode}") print(f"未知的合并模式: {mode}")
@ -191,8 +186,8 @@ def merge_selected_pdfs(output_folder, truncate_files, output_path, base_file_na
all_pdfs_to_merge = [] all_pdfs_to_merge = []
missing_files = [] missing_files = []
# 处理必需的文件 for suffix, required in suffixes:
for suffix in required_suffixes: # 如果是 before 文件,则从 output_folder 中查找,否则从 truncate_files 中查找
if suffix == f'{base_file_name}_before.pdf': if suffix == f'{base_file_name}_before.pdf':
matching_files = [ matching_files = [
os.path.join(output_folder, f) os.path.join(output_folder, f)
@ -208,35 +203,19 @@ def merge_selected_pdfs(output_folder, truncate_files, output_path, base_file_na
for f in matching_files_sorted: for f in matching_files_sorted:
print(f"选中文件: {f}") print(f"选中文件: {f}")
else: else:
if required:
print(f"没有找到以 '{suffix}' 结尾的文件。") print(f"没有找到以 '{suffix}' 结尾的文件。")
missing_files.append(suffix) missing_files.append(suffix)
# 处理可选的文件
for suffix in optional_suffixes:
if suffix == f'{base_file_name}_before.pdf':
matching_files = [
os.path.join(output_folder, f)
for f in all_output_files
if f.endswith(suffix)
]
else:
matching_files = [f for f in truncate_files if f.endswith(suffix)]
if matching_files:
matching_files_sorted = sorted(matching_files)
all_pdfs_to_merge.extend(matching_files_sorted)
for f in matching_files_sorted:
print(f"选中文件: {f}")
else: else:
print(f"可选文件 '{suffix}' 未找到,继续合并。") print(f"可选文件 '{suffix}' 未找到,继续合并。")
# 检查是否所有必需的文件都已找到
if missing_files: if missing_files:
print("缺少以下必要的 PDF 文件,无法进行合并:") print("缺少以下必要的 PDF 文件,无法进行合并:")
for missing in missing_files: for missing in missing_files:
print(f" - {missing}") print(f" - {missing}")
return "" return ""
print(f"总共将要合并的 PDF 文件数量: {len(all_pdfs_to_merge)}") print(f"总共将要合并的 PDF 文件数量: {len(all_pdfs_to_merge)}")
# 过滤掉不存在或为空的文件路径 # 过滤掉不存在或为空的文件路径

View File

@ -1,7 +1,6 @@
import concurrent.futures import concurrent.futures
import os import os
import time import time
from memory_profiler import profile
from flask_app.general.merge_pdfs import merge_selected_pdfs from flask_app.general.merge_pdfs import merge_selected_pdfs
from flask_app.general.截取pdf通用函数 import check_pdf_pages from flask_app.general.截取pdf通用函数 import check_pdf_pages
from flask_app.general.通用功能函数 import get_global_logger from flask_app.general.通用功能函数 import get_global_logger
@ -97,17 +96,13 @@ def truncate_pdf_multiple(pdf_path, output_folder, logger,mode='goods',selection
) )
if invalid_path: if invalid_path:
truncate_files.append(invalid_path) truncate_files.append(invalid_path)
logger.info(f"已添加 invalid_path: {invalid_path}")
if merged_path: if merged_path:
# 合并成功,添加合并后的文件路径 # 合并成功,添加合并后的文件路径
truncate_files.append(merged_path) truncate_files.append(merged_path)
logger.info(f"已成功合并 PDF 文件到 '{merged_output_path}'")
else: else:
# 合并失败,添加空字符串 # 合并失败,添加空字符串
truncate_files.append("") truncate_files.append("")
logger.warning(f"合并失败,没有生成合并文件 for {pdf_path}") logger.warning(f"合并失败,没有生成合并文件 for {pdf_path}")
logger.info("已截取文件路径: " + str(truncate_files)) logger.info("已截取文件路径: " + str(truncate_files))
return truncate_files return truncate_files

View File

@ -1,31 +1,15 @@
# flask_app/general/截取pdf通用函数.py # flask_app/general/截取pdf通用函数.py
import concurrent.futures import concurrent.futures
import io
import os import os
import fitz import fitz
import regex import regex
from PyPDF2 import PdfReader, PdfWriter from PyPDF2 import PdfReader, PdfWriter
from flask_app.general.读取文件.clean_pdf import extract_common_header, clean_page_content from flask_app.general.读取文件.clean_pdf import extract_common_header, clean_page_content, create_get_text_function, \
get_text_pypdf2
from flask_app.general.format_change import docx2pdf from flask_app.general.format_change import docx2pdf
import concurrent.futures import concurrent.futures
# 定义一个函数用于提取文本
def get_text_pypdf2(pdf, page_num):
return pdf.pages[page_num].extract_text() or ""
def get_text_fitz(pdf, page_num):
return pdf.load_page(page_num).get_text() or ""
# 定义一个内部函数来绑定 pdf_document 对象
def create_get_text_function(pdf_lib, pdf_doc):
if pdf_lib == 'pypdf2':
return lambda page_num: get_text_pypdf2(pdf_doc, page_num)
elif pdf_lib == 'fitz':
return lambda page_num: get_text_fitz(pdf_doc, page_num)
else:
raise ValueError("不支持的 PDF 库。")
def get_start_and_common_header(input_path, end_page):
    """
    Extract the common page header of a PDF and locate the announcement start page.

    The file is read with PyPDF2 first; if anything in that attempt raises,
    PyMuPDF (fitz) is tried as a fallback. Both readers are opened through
    context managers so the underlying file is released on every path.

    Args:
        input_path: Path of the PDF file to inspect.
        end_page: Last page index (0-based, inclusive) that is scanned.

    Returns:
        tuple: (common_header, last_begin_index)
            - common_header: value returned by extract_common_header, or ""
              when extracting it raised.
            - last_begin_index: 0-based index of the first scanned page that
              matches the begin pattern, or -1 when no page matches or the
              PDF could not be read by either library.
    """
    try:
        common_header = extract_common_header(input_path)
    except Exception as e:
        print(f"提取公共页眉时出错: {e}")
        return "", -1

    # Heading that marks the start of the announcement / invitation section.
    # The look-behinds reject lines that merely refer to such a section.
    begin_pattern = regex.compile(
        r'.*(?<!对应\s*)(?<!根据\s*)(?<!按照\s*)(?<!见\s*)(?<!与\s*)(?<!同\s*)(?<!"\s*)(?<!“\s*)(?<!”\s*)(?:招标公告|磋商公告|谈判公告|邀请书|邀请函|投标邀请|磋商邀请|谈判邀请|采购公告)[\)]?\s*$',
        regex.MULTILINE
    )
    # A line consisting of the table-of-contents heading.
    catalog_pattern = regex.compile(r'\s*目\s*录\s*$', regex.MULTILINE)

    def process_pages(get_text, total_pages):
        """Return the index of the first matching page within end_page, else -1."""
        for i in range(total_pages):
            if i > end_page:
                return -1  # past the allowed scan range
            text = get_text(i)
            if text:
                cleaned_text = clean_page_content(text, common_header)
                # Table-of-contents pages list the heading too; skip them
                if catalog_pattern.search(cleaned_text):
                    continue
                if begin_pattern.search(cleaned_text):
                    return i
        return -1

    # First attempt: PyPDF2. The reader must be used while the file is open.
    try:
        with open(input_path, "rb") as f:
            pdf_document = PdfReader(f)
            total_pages = len(pdf_document.pages)
            get_text = create_get_text_function('pypdf2', pdf_document)
            last_begin_index = process_pages(get_text, total_pages)
            return common_header, last_begin_index
    except Exception as e_pypdf2:
        print(f"get_start_and_common_header: 使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")

    # Fallback: PyMuPDF (fitz)
    try:
        with fitz.open(input_path) as pdf_document:
            total_pages = pdf_document.page_count
            get_text = create_get_text_function('fitz', pdf_document)
            last_begin_index = process_pages(get_text, total_pages)
            return common_header, last_begin_index
    except Exception as e_fitz:
        print(f"使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
        return common_header, -1
def check_pdf_pages(pdf_path, mode, logger):
    """
    Decide whether a PDF is short enough to skip the splitting logic.

    The page count is read with PyPDF2 first and with PyMuPDF (fitz) if that
    raises. Both readers are opened through context managers.

    Args:
        pdf_path: Path of the PDF file to check.
        mode: 'goods' selects the 8-slot placeholder list; any other value
            selects the 7-slot list.
        logger: Object providing info(), warning() and error().

    Returns:
        tuple: (skip, paths)
            - (True, placeholders) when the PDF has 30 pages or fewer, or when
              its page count could not be read. placeholders is a list of
              empty strings with pdf_path in the second-to-last slot.
            - (False, []) when the PDF has more than 30 pages.
    """
    def generate_invalid_return():
        # 'goods' mode carries one more leading slot than the other modes
        empty_slots = 6 if mode == 'goods' else 5
        return True, [''] * empty_slots + [pdf_path, '']

    try:
        try:
            # PyPDF2 first; the page count is taken while the file is open
            with open(pdf_path, "rb") as f:
                pdf_document = PdfReader(f)
                num_pages = len(pdf_document.pages)
        except Exception as e_pypdf2:
            logger.warning(f"check_pdf_pages: 使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
            # Fallback: PyMuPDF
            with fitz.open(pdf_path) as pdf_document:
                num_pages = pdf_document.page_count

        if num_pages <= 30:
            logger.info("PDFinvalid_path页数小于或等于30页跳过切分逻辑。")
            return generate_invalid_return()

        # More than 30 pages: the caller continues with the splitting logic
        return False, []
    except Exception as e:
        logger.error(f"无法读取 PDF 页数: {e}")
        return generate_invalid_return()
def extract_and_save_pages(pdf_document, from_page, to_page, output_path, is_using_pypdf2):
    """
    Copy the inclusive page range [from_page, to_page] into a new PDF file.

    Args:
        pdf_document: An already opened source document: a PyPDF2 PdfReader
            when is_using_pypdf2 is true, otherwise a fitz document.
        from_page: First page index to copy (0-based).
        to_page: Last page index to copy (0-based, inclusive).
        output_path: Destination path of the new PDF.
        is_using_pypdf2: Selects the PyPDF2 writer (True) or PyMuPDF (False).

    Returns:
        output_path on success, "" when the range is empty or negative or when
        writing the file failed. A page that cannot be copied is reported and
        skipped; the remaining pages are still written.
    """
    # Reject ranges that would produce a zero-page file or, with a negative
    # index, silently wrap around to pages at the end of the document
    if from_page < 0 or to_page < from_page:
        print(f"无效的页面范围: {from_page} 到 {to_page}")
        return ""

    if is_using_pypdf2:
        writer = PdfWriter()
        for page_num in range(from_page, to_page + 1):
            try:
                writer.add_page(pdf_document.pages[page_num])
            except Exception as e_page:
                print(f"添加第 {page_num} 页到 writer 时出错: {e_page}")
                continue
        try:
            with open(output_path, 'wb') as f_out:
                writer.write(f_out)
        except Exception as e_write:
            print(f"保存时出错: {e_write}")
            return ""
        print(f"已截取并保存页面 {from_page} - {to_page} 至 {output_path}")
        return output_path

    # PyMuPDF branch: the output document is closed by the context manager
    try:
        with fitz.open() as output_doc:
            for page_num in range(from_page, to_page + 1):
                try:
                    output_doc.insert_pdf(pdf_document, from_page=page_num, to_page=page_num)
                except Exception as e_page:
                    print(f"添加第 {page_num} 页到 output_doc 时出错: {e_page}")
                    continue
            output_doc.save(output_path)
    except Exception as e_write:
        print(f"保存时出错: {e_write}")
        return ""
    print(f"已截取并保存页面 {from_page} - {to_page} 至 {output_path}")
    return output_path


def save_extracted_pages(pdf_path, output_folder, start_page, end_page, output_suffix, common_header):
    """
    Save pages [start_page, end_page] of a PDF as "<base>_<output_suffix>.pdf".

    When output_suffix is 'notice' and start_page > 0, the leading pages are
    additionally saved as "<base>_before.pdf": up to and including the first
    table-of-contents page found before start_page, or all pages before
    start_page when none is found.

    The source is read with PyPDF2 first; if anything in that attempt raises,
    the whole operation is retried with PyMuPDF (fitz).

    Args:
        pdf_path: Path of the source PDF.
        output_folder: Directory that receives the output files.
        start_page: First page index to extract (0-based).
        end_page: Last page index to extract (0-based, inclusive).
        output_suffix: Suffix used in the output file name.
        common_header: Header text passed to clean_page_content.

    Returns:
        Path of the saved file, or "" when the arguments are invalid, the page
        range is out of bounds, or the PDF could not be read or written.
    """
    if start_page is None or end_page is None:
        print("错误: start_page 或 end_page 为 None")
        return ""

    def process_document(pdf_document, total_pages, using_pypdf2, get_text):
        """Run the extraction against one opened document."""
        if start_page < 0 or end_page >= total_pages or start_page > end_page:
            print(f"无效的页面范围: {start_page} 到 {end_page}")
            return ""

        base_file_name = os.path.splitext(os.path.basename(pdf_path))[0]
        output_pdf_path = os.path.join(output_folder, f"{base_file_name}_{output_suffix}.pdf")

        if output_suffix == 'notice' and start_page > 0:
            before_pdf_path = os.path.join(output_folder, f"{base_file_name}_before.pdf")
            toc_page = -1
            # Look for a table-of-contents page among the pages before start_page
            for page_num in range(min(start_page, total_pages)):
                try:
                    text = get_text(page_num)
                    cleaned_text = clean_page_content(text, common_header)
                    if regex.search(r'目\s*录', cleaned_text, regex.MULTILINE):
                        toc_page = page_num
                        break
                except Exception as e_extract:
                    print(f"提取第 {page_num} 页文本时出错: {e_extract}")
                    continue

            # Keep everything through the TOC page, else everything before start_page
            pages_to_extract = toc_page + 1 if toc_page != -1 else start_page
            if pages_to_extract > 0:
                # Best effort: a failure here does not block the main extraction
                extract_and_save_pages(pdf_document, 0, pages_to_extract - 1, before_pdf_path, using_pypdf2)

        return extract_and_save_pages(pdf_document, start_page, end_page, output_pdf_path, using_pypdf2)

    # Stage 1: PyPDF2. All work happens while the source file is still open.
    try:
        with open(pdf_path, "rb") as pdf_file:
            pdf_document = PdfReader(pdf_file)
            total_pages = len(pdf_document.pages)
            get_text = create_get_text_function('pypdf2', pdf_document)
            return process_document(pdf_document, total_pages, True, get_text)
    except Exception as e_pypdf2:
        print(f"使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")

    # Stage 2: PyMuPDF fallback
    try:
        with fitz.open(pdf_path) as pdf_document:
            total_pages = pdf_document.page_count
            get_text = create_get_text_function('fitz', pdf_document)
            return process_document(pdf_document, total_pages, False, get_text)
    except Exception as e_fitz:
        print(f"使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
        return ""
def is_pdf_or_doc(filename):
    """
    Tell whether a file name denotes a PDF or Word document.

    Args:
        filename: File name or path; compared case-insensitively.

    Returns:
        True when the name ends with '.pdf', '.doc' or '.docx', else False.
    """
    return filename.lower().endswith(('.pdf', '.doc', '.docx'))
def convert_to_pdf(file_path):
    """
    Convert a Word document to PDF; pass any other path through unchanged.

    Args:
        file_path: Path of the input file.

    Returns:
        The result of docx2pdf(file_path) when the path ends with '.doc' or
        '.docx' (case-insensitive); otherwise file_path itself.
    """
    # Assumes docx2pdf is defined elsewhere; only the file extension decides whether to convert
    if file_path.lower().endswith(('.doc', '.docx')):
        return docx2pdf(file_path)
    return file_path
def get_invalid_file(pdf_path, output_folder, common_header, begin_page): def get_invalid_file(pdf_path, output_folder, common_header, begin_page):
""" """
从指定的PDF文件中提取无效部分并保存到输出文件夹中 从指定的PDF文件中提取无效部分并保存到输出文件夹中
@ -313,15 +258,12 @@ def get_invalid_file(pdf_path, output_folder, common_header, begin_page):
pdf_document = None pdf_document = None
try: try:
# 尝试使用 PyPDF2 读取 PDF # 尝试使用 PyPDF2 读取 PDF,使用 with open 打开文件
try: try:
pdf_document = PdfReader(pdf_path) with open(pdf_path, "rb") as f:
pdf_document = PdfReader(f)
get_text = create_get_text_function('pypdf2', pdf_document) get_text = create_get_text_function('pypdf2', pdf_document)
total_pages = len(pdf_document.pages) total_pages = len(pdf_document.pages)
except Exception as e:
pdf_document = fitz.open(pdf_path)
get_text = create_get_text_function('fitz', pdf_document)
total_pages = pdf_document.page_count
if total_pages <= 50: if total_pages <= 50:
print(f"PDF页数({total_pages}) <= 50直接返回文件路径: {pdf_path}") print(f"PDF页数({total_pages}) <= 50直接返回文件路径: {pdf_path}")
@ -333,6 +275,20 @@ def get_invalid_file(pdf_path, output_folder, common_header, begin_page):
text = get_text(i) text = get_text(i)
cleaned_text = clean_page_content(text, common_header) if text else "" cleaned_text = clean_page_content(text, common_header) if text else ""
page_texts.append(cleaned_text) page_texts.append(cleaned_text)
except Exception as e:
# 如果 PyPDF2 读取失败,则使用 PyMuPDF 读取
with fitz.open(pdf_path) as pdf_document:
total_pages = pdf_document.page_count
get_text = create_get_text_function('fitz', pdf_document)
if total_pages <= 50:
print(f"PDF页数({total_pages}) <= 50直接返回文件路径: {pdf_path}")
return pdf_path, 0
# 提取并清理每页的文本内容
page_texts = []
for i in range(total_pages):
text = get_text(i)
cleaned_text = clean_page_content(text, common_header) if text else ""
page_texts.append(cleaned_text)
# 内部函数:查找起始页 # 内部函数:查找起始页
def find_start_page(begin_page): def find_start_page(begin_page):
@ -420,36 +376,23 @@ def get_invalid_file(pdf_path, output_folder, common_header, begin_page):
finally: finally:
# 确保释放资源(对 fitz 的 pdf_document 需要关闭) # 确保释放资源(对 fitz 的 pdf_document 需要关闭)
if pdf_document and hasattr(pdf_document, "close"): if pdf_document and isinstance(pdf_document, fitz.Document):
pdf_document.close() pdf_document.close()
def extract_pages_generic(pdf_path, begin_pattern, end_pattern, begin_page, common_header, exclusion_pattern=None, def extract_pages_generic(pdf_path, begin_pattern, end_pattern, begin_page, common_header,
output_suffix="normal", invalid_endpage=-1): exclusion_pattern=None, output_suffix="normal", invalid_endpage=-1):
def process_pages(get_text, total_pages):
start_page = None start_page = None
end_page = None end_page = None
flag = True flag = True
# 先尝试使用 PyPDF2
try:
pdf_document = PdfReader(pdf_path)
total_pages = len(pdf_document.pages)
get_text = create_get_text_function('pypdf2', pdf_document)
except Exception as e_pypdf2:
print(f"extract_pages_generic:使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
# 如果 PyPDF2 失败,就再试 PyMuPDF
try:
pdf_document = fitz.open(pdf_path)
total_pages = pdf_document.page_count
get_text = create_get_text_function('fitz', pdf_document)
except Exception as e_fitz:
print(f"使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
return None, None
end_limit = total_pages if invalid_endpage == -1 else min(invalid_endpage, total_pages) end_limit = total_pages if invalid_endpage == -1 else min(invalid_endpage, total_pages)
for i in range(begin_page, end_limit): for i in range(begin_page, end_limit):
text = get_text(i) text = get_text(i)
if text: if text:
cleaned_text = clean_page_content(text, common_header) cleaned_text = clean_page_content(text, common_header)
if output_suffix == "tobidders_notice2": #extract_pages_twice_tobidders_notice会调用该代码 if output_suffix == "tobidders_notice2":
catalog_pattern = regex.compile(r'\s*目\s*录\s*$', regex.MULTILINE) catalog_pattern = regex.compile(r'\s*目\s*录\s*$', regex.MULTILINE)
if exclusion_pattern and flag and (start_page is not None) and regex.search(exclusion_pattern, cleaned_text): if exclusion_pattern and flag and (start_page is not None) and regex.search(exclusion_pattern, cleaned_text):
flag = False flag = False
@ -457,7 +400,7 @@ def extract_pages_generic(pdf_path, begin_pattern, end_pattern, begin_page, comm
if begin_page == 0 and catalog_pattern.search(cleaned_text): if begin_page == 0 and catalog_pattern.search(cleaned_text):
continue # 如果存在目录,跳过当前页面 continue # 如果存在目录,跳过当前页面
else: else:
# 一般投标文件的编制、组成在'投标人须知前附表/正文'中需要防止begin_pattern匹配到这部分内容所以在start_page is None的时候使用该exclusion_pattern # 防止匹配到正文前附表/正文部分
if exclusion_pattern and flag and (start_page is None) and regex.search(exclusion_pattern, cleaned_text): if exclusion_pattern and flag and (start_page is None) and regex.search(exclusion_pattern, cleaned_text):
flag = False flag = False
continue continue
@ -466,21 +409,38 @@ def extract_pages_generic(pdf_path, begin_pattern, end_pattern, begin_page, comm
start_page = i start_page = i
continue continue
if start_page is not None: if start_page is not None:
if output_suffix in ["tobidders_notice", "notice", "tobidders_notice2"]: # 使用列表判断多个匹配项 if output_suffix in ["tobidders_notice", "notice", "tobidders_notice2"]:
# 判断 end_pattern 是否匹配且当前页大于起始页
if regex.search(end_pattern, cleaned_text) and i > start_page: if regex.search(end_pattern, cleaned_text) and i > start_page:
end_page = i end_page = i
break break
else: else:
# 如果 end_pattern 匹配且 begin_pattern 不匹配
if regex.search(end_pattern, cleaned_text) and not regex.search(begin_pattern, cleaned_text): if regex.search(end_pattern, cleaned_text) and not regex.search(begin_pattern, cleaned_text):
end_page = i end_page = i
break break
# 如果使用的是 PyMuPDF需要手动关闭文档
if pdf_document and isinstance(pdf_document, fitz.Document):
pdf_document.close()
return start_page, end_page return start_page, end_page
# 尝试使用 PyPDF2并使用 with open 打开文件
try:
with open(pdf_path, "rb") as f:
pdf_document = PdfReader(f)
total_pages = len(pdf_document.pages)
get_text = create_get_text_function('pypdf2', pdf_document)
start_page, end_page = process_pages(get_text, total_pages)
return start_page, end_page
except Exception as e_pypdf2:
print(f"extract_pages_generic: 使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
# 如果 PyPDF2 失败,尝试使用 PyMuPDF
try:
with fitz.open(pdf_path) as pdf_document:
total_pages = pdf_document.page_count
get_text = create_get_text_function('fitz', pdf_document)
start_page, end_page = process_pages(get_text, total_pages)
return start_page, end_page
except Exception as e_fitz:
print(f"使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
return None, None
def generate_mid_pattern(chapter_type=None): def generate_mid_pattern(chapter_type=None):
""" """
@ -565,7 +525,7 @@ def extract_pages_tobidders_notice(pdf_path, output_folder, begin_page, common_h
""" """
exclusion_pattern = regex.compile( exclusion_pattern = regex.compile(
r'(文件的构成|文件的组成|文件组成|文件构成|文件的编制|文件编制)\s*$', regex.MULTILINE) r'(文件的构成|文件的组成|文件组成|文件构成|文件的编制|文件编制)\s*$', regex.MULTILINE)
def run_extraction(begin_pattern, extraction_stage='first'): def run_extraction(begin_pattern,get_text,total_pages,extraction_stage='first'):
""" """
使用提供的 begin_pattern 运行提取过程 使用提供的 begin_pattern 运行提取过程
根据 extraction_stage 判断是第一次提取还是第三次提取 根据 extraction_stage 判断是第一次提取还是第三次提取
@ -649,23 +609,7 @@ def extract_pages_tobidders_notice(pdf_path, output_folder, begin_page, common_h
path2 = path1 path2 = path1
return path1, path2 return path1, path2
pdf_document = None def process_extraction(get_text, total_pages):
try:
try:
pdf_document = PdfReader(pdf_path)
get_text = create_get_text_function('pypdf2', pdf_document)
total_pages = len(pdf_document.pages)
except Exception as e_pypdf2:
print(f"extract_pages_tobidders_notice:使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
try:
pdf_document = fitz.open(pdf_path)
get_text = create_get_text_function('fitz', pdf_document)
total_pages = pdf_document.page_count
# print("使用 PyMuPDF 成功读取 PDF 文件。")
except Exception as e_fitz:
print(f"使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
return "", "" # 或者根据需求抛出异常
# 第一次提取尝试,使用初始的 begin_pattern # 第一次提取尝试,使用初始的 begin_pattern
begin_pattern = regex.compile( begin_pattern = regex.compile(
r'^第[一二三四五六七八九十百千]+(?:章|部分).*(?:投标人?|磋商|谈判|供应商|应答人|比选申请人).*须知+|' r'^第[一二三四五六七八九十百千]+(?:章|部分).*(?:投标人?|磋商|谈判|供应商|应答人|比选申请人).*须知+|'
@ -673,21 +617,28 @@ def extract_pages_tobidders_notice(pdf_path, output_folder, begin_page, common_h
r'(?:投标人?|磋商|谈判|供应商|应答人|比选申请人).*须知+\s*$', r'(?:投标人?|磋商|谈判|供应商|应答人|比选申请人).*须知+\s*$',
regex.MULTILINE regex.MULTILINE
) )
start_page, mid_page, end_page = run_extraction(begin_pattern, extraction_stage='first') start_page, mid_page, end_page = run_extraction(begin_pattern,get_text,total_pages, extraction_stage='first')
# 如果第一次提取成功,保存并返回路径
if start_page is not None and mid_page is not None and end_page is not None: if start_page is not None and mid_page is not None and end_page is not None:
return perform_extraction_and_save(start_page, mid_page, end_page) return perform_extraction_and_save(start_page, mid_page, end_page)
print(f"第一次提取tobidders_notice失败尝试第二次提取: {pdf_path}") print(f"第一次提取tobidders_notice失败尝试第二次提取: {pdf_path}")
start_page1, end_page1, start_page2, end_page2 = extract_pages_twice_tobidders_notice(pdf_path, common_header, begin_page) start_page1, end_page1, start_page2, end_page2 = extract_pages_twice_tobidders_notice(
pdf_path, common_header, begin_page
if start_page1 is not None and end_page1 is not None and start_page2 is not None and end_page2 is not None: )
if (start_page1 is not None and end_page1 is not None and
start_page2 is not None and end_page2 is not None):
if end_page1 == start_page2: if end_page1 == start_page2:
mid_page = end_page1 mid_page = end_page1
return perform_extraction_and_save(start_page1, mid_page, end_page2) return perform_extraction_and_save(start_page1, mid_page, end_page2)
else: else:
path1=save_extracted_pages(pdf_path,output_folder,start_page1,end_page1,"tobidders_notice_part1",common_header) path1 = save_extracted_pages(
path2=save_extracted_pages(pdf_path,output_folder,start_page2,end_page2,"tobidders_notice_part2",common_header) pdf_path, output_folder, start_page1, end_page1,
"tobidders_notice_part1", common_header
)
path2 = save_extracted_pages(
pdf_path, output_folder, start_page2, end_page2,
"tobidders_notice_part2", common_header
)
return path1, path2 return path1, path2
print("第二次提取tobidders_notice失败尝试第三次提取!") print("第二次提取tobidders_notice失败尝试第三次提取!")
@ -696,52 +647,72 @@ def extract_pages_tobidders_notice(pdf_path, output_folder, begin_page, common_h
r'(?<!对应\s*)(?<!根据\s*)(?<!按照\s*)(?<!见\s*)(?<!与\s*)(?<!同\s*)(?<!"\s*)(?<!“\s*)(?<!”\s*)(?:投标人?|磋商|谈判|供应商|应答人|比选申请人).*须知前附表\s*$', r'(?<!对应\s*)(?<!根据\s*)(?<!按照\s*)(?<!见\s*)(?<!与\s*)(?<!同\s*)(?<!"\s*)(?<!“\s*)(?<!”\s*)(?:投标人?|磋商|谈判|供应商|应答人|比选申请人).*须知前附表\s*$',
regex.MULTILINE regex.MULTILINE
) )
start_page, mid_page, end_page = run_extraction(new_begin_pattern, extraction_stage='third') start_page, mid_page, end_page = run_extraction(new_begin_pattern, get_text, total_pages, extraction_stage='third')
if start_page is not None and mid_page is not None and end_page is not None: if start_page is not None and mid_page is not None and end_page is not None:
return perform_extraction_and_save(start_page, mid_page, end_page) return perform_extraction_and_save(start_page, mid_page, end_page)
else: else:
# 所有提取尝试均失败
print(f"三次提取tobidders_notice均失败!! {pdf_path}") print(f"三次提取tobidders_notice均失败!! {pdf_path}")
return "", "" return "", ""
try:
try:
# 尝试使用 PyPDF2 读取 PDF 文件
with open(pdf_path, "rb") as f:
pdf_document = PdfReader(f)
get_text = create_get_text_function('pypdf2', pdf_document)
total_pages = len(pdf_document.pages)
return process_extraction(get_text, total_pages)
except Exception as e_pypdf2:
print(f"extract_pages_tobidders_notice: 使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
# 如果 PyPDF2 读取失败,则使用 PyMuPDF 读取 PDF 文件
with fitz.open(pdf_path) as pdf_document:
get_text = create_get_text_function('fitz', pdf_document)
total_pages = pdf_document.page_count
return process_extraction(get_text, total_pages)
except Exception as e: except Exception as e:
print(f"处理文件 {pdf_path} 时发生错误: {e}") print(f"处理文件 {pdf_path} 时发生错误: {e}")
return "", "" return "", ""
finally:
# 无论如何都关闭 pdf_document对于 fitz 对象)
if pdf_document and hasattr(pdf_document, "close"):
pdf_document.close()
def extract_pages_twice_tobidders_notice(pdf_path, common_header, begin_page): def extract_pages_twice_tobidders_notice(pdf_path, common_header, begin_page):
# 使用 with open 确保文件流在读取后关闭,避免文件句柄泄露 # 使用 with open 确保文件流在读取后关闭,避免文件句柄泄露
try: try:
pdf_document = PdfReader(pdf_path) with open(pdf_path, "rb") as f:
except Exception as e: pdf_document = PdfReader(f)
print(f"读取 PDF 文件失败: {e}")
return None, None, None, None # 内部函数:设置所有正则匹配模式和排除规则
# 投标须知前附表占一章、投标须知正文占一章 def setup_patterns():
output_suffix = "tobidders_notice"
begin_pattern = regex.compile( begin_pattern = regex.compile(
r'^第[一二三四五六七八九十百千]+(?:章|部分).*(?:(?:投标人?|磋商|谈判|供应商|应答人|比选申请人).*须知)+', r'^第[一二三四五六七八九十百千]+(?:章|部分).*(?:(?:投标人?|磋商|谈判|供应商|应答人|比选申请人).*须知)+',
regex.MULTILINE regex.MULTILINE
) )
second_begin_pattern = regex.compile( second_begin_pattern = regex.compile(
r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*([\u4e00-\u9fff]+)', regex.MULTILINE # 捕获中文部分 r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*([\u4e00-\u9fff]+)',
regex.MULTILINE
) )
# 此处 end_pattern 默认采用 second_begin_pattern
end_pattern = second_begin_pattern end_pattern = second_begin_pattern
exclusion_words = ["合同", "评标", "开标", "评审", "采购", "资格"] # 在这里添加需要排除的关键词
exclusion_pattern = regex.compile( exclusion_pattern = regex.compile(
r'(文件的构成|文件的组成|文件组成|文件构成|文件的编制|文件编制)\s*$', regex.MULTILINE) r'(文件的构成|文件的组成|文件组成|文件构成|文件的编制|文件编制)\s*$',
regex.MULTILINE
)
exclusion_words = ["合同", "评标", "开标", "评审", "采购", "资格"]
return begin_pattern, second_begin_pattern, end_pattern, exclusion_pattern, exclusion_words
# 初始化正则模式和排除规则
begin_pattern, second_begin_pattern, end_pattern, exclusion_pattern, exclusion_words = setup_patterns()
output_suffix = "tobidders_notice"
# 提取第一部分 # 提取第一部分
start_page1, end_page1 = extract_pages_generic(pdf_path, begin_pattern, end_pattern, begin_page, common_header, start_page1, end_page1 = extract_pages_generic(
exclusion_pattern, output_suffix) pdf_path, begin_pattern, end_pattern, begin_page, common_header,
exclusion_pattern, output_suffix
)
if start_page1 is None or end_page1 is None: if start_page1 is None or end_page1 is None:
return None, None, None, None return None, None, None, None
# 提取第二部分 先检查end_page1页面的内容 # 提取第二部分:先检查 end_page1 页的内容
text = get_text_pypdf2(pdf_document, end_page1) text = get_text_pypdf2(pdf_document, end_page1)
cleaned_text = clean_page_content(text, common_header) cleaned_text = clean_page_content(text, common_header)
match = end_pattern.search(cleaned_text) match = end_pattern.search(cleaned_text)
@ -750,19 +721,23 @@ def extract_pages_twice_tobidders_notice(pdf_path, common_header, begin_page):
chapter_title = match.group(1) if match.groups() else "" chapter_title = match.group(1) if match.groups() else ""
# 检查是否包含排除关键词 # 检查是否包含排除关键词
if any(word in chapter_title for word in exclusion_words): if any(word in chapter_title for word in exclusion_words):
# 如果包含排除关键词,直接返回相同的路径 # 如果包含排除关键词,则将第二部分的起始模式设置为与第一部分相同
second_begin_pattern=begin_pattern #投标人须知前附表和投标人须知正文不是紧挨着的eg:(发布稿)恒丰理财有限责任公司智能投研终端项目竞争性磋商 second_begin_pattern = begin_pattern
# 如果不包含排除关键词,继续提取第二部分
output_suffix='tobidders_notice2'
begin_page=end_page1-1 #传入start_page2-1是因为非notice截取->start_page>begin_page
start_page2, end_page2 = extract_pages_generic(pdf_path, second_begin_pattern, end_pattern, begin_page, common_header,
exclusion_pattern, output_suffix)
output_suffix = "tobidders_notice2"
# 对于第二部分,传入的 begin_page 改为 end_page1 - 1保证 start_page > begin_page
begin_page2 = end_page1 - 1
start_page2, end_page2 = extract_pages_generic(
pdf_path, second_begin_pattern, end_pattern, begin_page2, common_header,
exclusion_pattern, output_suffix
)
if start_page2 is None or end_page2 is None: if start_page2 is None or end_page2 is None:
return start_page1, end_page1, end_page1, end_page1 return start_page1, end_page1, end_page1, end_page1
return start_page1, end_page1, start_page2, end_page2 return start_page1, end_page1, start_page2, end_page2
except Exception as e:
print(f"读取 PDF 文件失败: {e}")
return None, None, None, None
def get_notice(pdf_path, output_folder, begin_page, common_header,invalid_endpage): def get_notice(pdf_path, output_folder, begin_page, common_header,invalid_endpage):
begin_pattern = regex.compile( begin_pattern = regex.compile(

View File

@ -5,6 +5,23 @@ from PyPDF2 import PdfReader
from docx import Document from docx import Document
import queue import queue
from queue import Queue, Empty from queue import Queue, Empty
# Per-backend helper: extract the text of a single page.
def get_text_pypdf2(pdf_doc, page_num):
    """Return the text of page ``page_num`` using the PyPDF2-style API.

    ``pdf_doc`` must expose ``pages[page_num].extract_text()``. When the
    extraction result is falsy (``None`` or an empty string) an empty
    string is returned instead, so callers always receive a ``str``-like
    value they can test for truthiness.
    """
    page = pdf_doc.pages[page_num]
    extracted = page.extract_text()
    return extracted if extracted else ""
def get_text_fitz(pdf_doc, page_num):
    """Return the text of page ``page_num`` using the PyMuPDF-style API.

    ``pdf_doc`` must expose ``load_page(page_num)`` whose result provides
    ``get_text()``. A falsy extraction result is normalised to ``""``.
    """
    page = pdf_doc.load_page(page_num)
    extracted = page.get_text()
    return extracted if extracted else ""
# Factory that binds an already-opened PDF document to a page-text getter.
def create_get_text_function(pdf_lib, pdf_doc):
    """Build a ``get_text(page_num)`` callable bound to ``pdf_doc``.

    Parameters:
    - pdf_lib: backend selector, either ``'pypdf2'`` or ``'fitz'``.
    - pdf_doc: the opened document object handed to the backend helper.

    Returns:
    - A one-argument function that takes a page number and delegates to
      ``get_text_pypdf2`` or ``get_text_fitz`` with the bound document.

    Raises:
    - ValueError: if ``pdf_lib`` is neither of the supported selectors.
    """
    if pdf_lib == 'pypdf2':
        def read_page(page_num):
            return get_text_pypdf2(pdf_doc, page_num)
        return read_page

    if pdf_lib == 'fitz':
        def read_page(page_num):
            return get_text_fitz(pdf_doc, page_num)
        return read_page

    raise ValueError("不支持的 PDF 库。")
def extract_common_header(pdf_path): def extract_common_header(pdf_path):
""" """
提取 PDF 文件的公共页眉 提取 PDF 文件的公共页眉
@ -16,17 +33,11 @@ def extract_common_header(pdf_path):
- common_header: 公共页眉内容字符串如果未找到则返回空字符串 - common_header: 公共页眉内容字符串如果未找到则返回空字符串
""" """
def get_headers(pdf_document, start_page, pages_to_read, is_pypdf2): def get_headers(get_text, start_page, pages_to_read,total_pages):
headers = [] headers = []
for i in range(start_page, min(start_page + pages_to_read, for i in range(start_page, min(start_page + pages_to_read,total_pages)):
len(pdf_document.pages) if is_pypdf2 else pdf_document.page_count)):
try: try:
if is_pypdf2: text=get_text(i)
page = pdf_document.pages[i]
text = page.extract_text() or ""
else:
page = pdf_document.load_page(i)
text = page.get_text() or ""
if text: if text:
# 只取每页的前三行,去除前后的空白字符 # 只取每页的前三行,去除前后的空白字符
first_lines = [line.strip() for line in text.strip().split('\n')[:3]] first_lines = [line.strip() for line in text.strip().split('\n')[:3]]
@ -71,25 +82,7 @@ def extract_common_header(pdf_path):
return common_headers return common_headers
try: def extract_common_header_main(get_text,total_pages):
# 尝试使用 PyPDF2 读取 PDF
try:
pdf_document = PdfReader(pdf_path)
total_pages = len(pdf_document.pages)
is_pypdf2 = True
# print("使用 PyPDF2 成功读取 PDF 文件。")
except Exception as e_pypdf2:
print(f"extract_common_header:使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
try:
# 如果 PyPDF2 失败,尝试使用 PyMuPDF 读取 PDF
pdf_document = fitz.open(pdf_path)
total_pages = pdf_document.page_count
is_pypdf2 = False
# print("使用 PyMuPDF 成功读取 PDF 文件。")
except Exception as e_fitz:
print(f"extract_common_header:使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
return "" # 或者根据需求抛出异常
# 定义两个提取策略 # 定义两个提取策略
strategies = [] strategies = []
if total_pages >= 3: if total_pages >= 3:
@ -114,8 +107,8 @@ def extract_common_header(pdf_path):
common_headers = [] common_headers = []
for idx, (start, count) in enumerate(strategies): for idx, (start, pages_to_read) in enumerate(strategies):
headers = get_headers(pdf_document, start, count, is_pypdf2) headers = get_headers(get_text, start, pages_to_read,total_pages)
if len(headers) < 2: if len(headers) < 2:
continue # 需要至少2页来比较 continue # 需要至少2页来比较
@ -125,9 +118,29 @@ def extract_common_header(pdf_path):
# print(f"使用策略{idx + 1}找到共同的页眉: {common_headers}") # print(f"使用策略{idx + 1}找到共同的页眉: {common_headers}")
break # 找到共同部分后退出 break # 找到共同部分后退出
# 如果没有找到,继续下一个策略 # 如果没有找到,继续下一个策略
return '\n'.join(common_headers) return '\n'.join(common_headers)
try:
# 尝试使用 PyPDF2 读取 PDF
try:
with open(pdf_path, "rb") as pdf_file:
pdf_document = PdfReader(pdf_file)
total_pages = len(pdf_document.pages)
get_text = create_get_text_function('pypdf2', pdf_document)
return extract_common_header_main(get_text,total_pages)
except Exception as e_pypdf2:
print(f"extract_common_header:使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
try:
with fitz.open(pdf_path) as pdf_document:
total_pages = pdf_document.page_count
get_text = create_get_text_function('fitz', pdf_document)
return extract_common_header_main(get_text,total_pages)
# print("使用 PyMuPDF 成功读取 PDF 文件。")
except Exception as e_fitz:
print(f"extract_common_header:使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
return "" # 或者根据需求抛出异常
# 执行策略1和策略2先执行策略1中间3页或根据页数调整然后执行策略2前三页或根据页数调整 # 执行策略1和策略2先执行策略1中间3页或根据页数调整然后执行策略2前三页或根据页数调整
# 获取两个策略的公共前缀:分别获取两个策略提取的公共页眉。 # 获取两个策略的公共前缀:分别获取两个策略提取的公共页眉。
# 选择最长的公共前缀如果两个策略的前缀之间存在包含关系选择最长的那个。如果没有包含关系则返回策略1提取的前缀。 # 选择最长的公共前缀如果两个策略的前缀之间存在包含关系选择最长的那个。如果没有包含关系则返回策略1提取的前缀。
@ -303,7 +316,7 @@ def is_pure_image(docx_path, percentage=0.3):
return True return True
if __name__ == '__main__': if __name__ == '__main__':
pdf_path=r"C:\Users\Administrator\Desktop\fsdownload\bf0346dc-8711-43f5-839e-e4076acea84a\ztbfile.pdf" pdf_path=r"C:\Users\Administrator\Desktop\fsdownload\971a834e-5cf5-4259-ad85-320aaca051e0\ztbfile.pdf"
res=is_scanned_pdf(pdf_path) res=is_scanned_pdf(pdf_path)
if res: if res:
print("扫描型") print("扫描型")

View File

@ -141,7 +141,7 @@ def save_extracted_text_to_txt(pdf_path, txt_path):
if __name__ == '__main__': if __name__ == '__main__':
# file_path='D:\\flask_project\\flask_app\\static\\output\\output1\\648e094b-e677-47ce-9073-09e0c82af210\\ztbfile_tobidders_notice_part2.pdf' # file_path='D:\\flask_project\\flask_app\\static\\output\\output1\\648e094b-e677-47ce-9073-09e0c82af210\\ztbfile_tobidders_notice_part2.pdf'
pdf_path=r"C:\Users\Administrator\Desktop\新建文件夹 (3)\ztbfile(1).pdf" pdf_path=r'C:\Users\Administrator\Downloads\完全没有解析进度1_南网超高压公司2024年第四批服务公开招标项目2024-FW-4-S-ZB2 (1).pdf'
# pdf_path=r"C:\Users\Administrator\Desktop\货物标\output2\广水市妇幼招标文件最新W改_evaluation_method.pdf" # pdf_path=r"C:\Users\Administrator\Desktop\货物标\output2\广水市妇幼招标文件最新W改_evaluation_method.pdf"
# file_path = 'C:\\Users\\Administrator\\Desktop\\货物标\\截取test\\交警支队机动车查验监管系统项目采购_tobidders_notice_part1.pdf' # file_path = 'C:\\Users\\Administrator\\Desktop\\货物标\\截取test\\交警支队机动车查验监管系统项目采购_tobidders_notice_part1.pdf'
# file_path = "C:\\Users\\Administrator\\Desktop\\招标文件\\招标test文件夹\\zbtest8.pdf" # file_path = "C:\\Users\\Administrator\\Desktop\\招标文件\\招标test文件夹\\zbtest8.pdf"

View File

@ -4,7 +4,7 @@ import regex
import os import os
import time import time
from PyPDF2 import PdfReader from PyPDF2 import PdfReader
from flask_app.general.读取文件.clean_pdf import clean_page_content from flask_app.general.读取文件.clean_pdf import clean_page_content,create_get_text_function
from flask_app.general.截取pdf通用函数 import get_start_and_common_header, save_extracted_pages, get_invalid_file, \ from flask_app.general.截取pdf通用函数 import get_start_and_common_header, save_extracted_pages, get_invalid_file, \
extract_pages_tobidders_notice, get_notice, extract_pages_generic extract_pages_tobidders_notice, get_notice, extract_pages_generic
from flask_app.general.通用功能函数 import get_global_logger from flask_app.general.通用功能函数 import get_global_logger
@ -24,31 +24,16 @@ def extract_pages(pdf_path, output_folder, begin_pattern, begin_page, end_patter
print(f"Error processing {pdf_path}: {e}") print(f"Error processing {pdf_path}: {e}")
return "" return ""
def extract_pages_twice(pdf_path, output_folder, output_suffix, common_header, last_begin_index): def extract_pages_twice(pdf_path, output_folder, output_suffix, common_header, last_begin_index):
""" """
处理 PDF 文件作为 truncate_pdf_main 的后备方法 处理 PDF 文件作为 truncate_pdf_main 的后备方法
此函数将在所有模式对都失败后调用 当所有模式均失败后调用此方法
""" """
pdf_document=None if output_suffix != "qualification":
try:
try:
pdf_document = PdfReader(pdf_path)
is_pypdf2 = True
total_pages = len(pdf_document.pages)
# print("使用 PyPDF2 成功读取 PDF 文件。")
except Exception as e_pypdf2:
print(f"extract_pages_twice:使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
try:
pdf_document = fitz.open(pdf_path)
is_pypdf2 = False
total_pages = pdf_document.page_count
# print("使用 PyMuPDF 成功读取 PDF 文件。")
except Exception as e_fitz:
print(f"使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
return "" return ""
if output_suffix == "qualification":
print("最后一次尝试获取qualification!") print("最后一次尝试获取qualification!")
# 动态设置 include_keys # 动态设置关键字
include_keys = ["资格审查", "资质审查", "符合性审查", "资格性检查", "符合性检查", "资格检查", "能力", "信誉"] include_keys = ["资格审查", "资质审查", "符合性审查", "资格性检查", "符合性检查", "资格检查", "能力", "信誉"]
# 定义起始匹配模式,仅匹配“附录”、“附件”或“附表” # 定义起始匹配模式,仅匹配“附录”、“附件”或“附表”
begin_pattern = r'^(?:附录(?:[一1])?[:]|附件(?:[一1])?[:]|附表(?:[一1])?[:])' begin_pattern = r'^(?:附录(?:[一1])?[:]|附件(?:[一1])?[:]|附表(?:[一1])?[:])'
@ -62,57 +47,69 @@ def extract_pages_twice(pdf_path, output_folder, output_suffix, common_header, l
r'第[一二三四五六七八九]+(?:章|部分)\s*[\u4e00-\u9fff、()]+\s*$', r'第[一二三四五六七八九]+(?:章|部分)\s*[\u4e00-\u9fff、()]+\s*$',
regex.MULTILINE regex.MULTILINE
) )
def process_pdf(total_pages, get_text, include_keys, begin_pattern, end_pattern_attachment, end_pattern_chapter,
common_header, last_begin_index):
"""
遍历 PDF 的各页依据匹配规则确定起始页和结束页
"""
start_page = None start_page = None
end_page = None end_page = None
def get_text(page_num):
try:
if is_pypdf2:
return pdf_document.pages[page_num].extract_text() or ""
else:
return pdf_document.load_page(page_num).get_text() or ""
except Exception as e:
print(f"提取第 {page_num} 页文本时出错: {e}")
return ""
# 从指定的开始页开始遍历
for i in range(last_begin_index, total_pages): for i in range(last_begin_index, total_pages):
text = get_text(i) text = get_text(i)
if text: if not text:
cleaned_text = clean_page_content(text, common_header)
# 确定起始页需在last_begin_index之后
if any(key in cleaned_text for key in include_keys):
if regex.search(begin_pattern, cleaned_text, regex.MULTILINE):
if start_page is None:
start_page = i # 确保起始页不小于章节的开始页码
continue continue
# print(f"匹配到附录/附件/附表,设置起始页为: {start_page}") cleaned_text = clean_page_content(text, common_header)
# 确定结束页 - 附录、附件、附表等
if start_page is not None and end_pattern_attachment.search(cleaned_text): # 确定起始页:页面包含 include_keys 且匹配 begin_pattern
# 额外检查当前页面是否不包含任何 include_keys if start_page is None and any(key in cleaned_text for key in include_keys):
if not any(key in cleaned_text for key in include_keys): if regex.search(begin_pattern, cleaned_text, regex.MULTILINE):
if i > start_page: start_page = i
continue
# 一旦起始页确定,则查找结束页
if start_page is not None:
# 结束模式:附件类
if end_pattern_attachment.search(cleaned_text):
# 如果页面不包含 include_keys 且页码大于起始页,则认为找到了结束页
if not any(key in cleaned_text for key in include_keys) and i > start_page:
end_page = i end_page = i
# print(f"找到结束页 (附件类): {end_page}") break
break # 找到结束页后退出循环
else: else:
print(f"当前页面匹配附件结束模式但包含 include_keys继续查找。页面: {i}") print(f"当前页面匹配附件结束模式但包含 include_keys继续查找。页面: {i}")
# 确定结束页 - 章节标题、评标附表、投标人须知等 # 结束模式:章节标题等
elif start_page is not None and end_pattern_chapter.search(cleaned_text): elif end_pattern_chapter.search(cleaned_text) and i > start_page:
if i > start_page:
end_page = i end_page = i
# print(f"找到结束页 (章节标题等): {end_page}") break
break # 找到结束页后退出循环 return start_page, end_page
# 尝试使用 PyPDF2 方式读取 PDF
try:
with open(pdf_path, "rb") as f:
pdf_document = PdfReader(f)
get_text = create_get_text_function('pypdf2', pdf_document)
total_pages = len(pdf_document.pages)
start_page, end_page = process_pdf(total_pages, get_text, include_keys, begin_pattern,
end_pattern_attachment, end_pattern_chapter,
common_header, last_begin_index)
except Exception as e_pypdf2:
print(f"extract_pages_twice: 使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
# 使用 PyMuPDF 作为备用方案
try:
with fitz.open(pdf_path) as pdf_document:
get_text = create_get_text_function('fitz', pdf_document)
total_pages = pdf_document.page_count
start_page, end_page = process_pdf(total_pages, get_text, include_keys, begin_pattern,
end_pattern_attachment, end_pattern_chapter,
common_header, last_begin_index)
except Exception as e_fitz:
print(f"使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
return ""
if start_page is None or end_page is None: if start_page is None or end_page is None:
print(f"{output_suffix} twice: 未找到起始或结束页在文件 {pdf_path} 中!") print(f"{output_suffix} twice: 未找到起始或结束页在文件 {pdf_path} 中!")
return "" return ""
else: else:
return save_extracted_pages(pdf_path, output_folder, start_page, end_page, output_suffix, common_header) return save_extracted_pages(pdf_path, output_folder, start_page, end_page, output_suffix, common_header)
return ""
except Exception as e:
print(f"Error in extract_pages_twice: {e}")
return ""
finally:
if pdf_document and hasattr(pdf_document, 'close'):
pdf_document.close()
def truncate_pdf_main_engineering(input_path, output_folder, selection, logger, output_suffix="default",invalid_endpage=-1): def truncate_pdf_main_engineering(input_path, output_folder, selection, logger, output_suffix="default",invalid_endpage=-1):

View File

@ -2,10 +2,10 @@ import fitz
from PyPDF2 import PdfReader from PyPDF2 import PdfReader
import regex # 导入正则表达式库 import regex # 导入正则表达式库
import os # 用于文件和文件夹操作 import os # 用于文件和文件夹操作
from flask_app.general.读取文件.clean_pdf import clean_page_content from flask_app.general.读取文件.clean_pdf import clean_page_content,create_get_text_function
from flask_app.general.merge_pdfs import merge_and_cleanup from flask_app.general.merge_pdfs import merge_and_cleanup
from flask_app.general.截取pdf通用函数 import get_start_and_common_header, save_extracted_pages, is_pdf_or_doc, \ from flask_app.general.截取pdf通用函数 import get_start_and_common_header, save_extracted_pages, is_pdf_or_doc, \
get_invalid_file, extract_pages_tobidders_notice, extract_pages_generic, get_notice, create_get_text_function get_invalid_file, extract_pages_tobidders_notice, extract_pages_generic, get_notice
from flask_app.general.通用功能函数 import get_global_logger from flask_app.general.通用功能函数 import get_global_logger
def extract_pages(pdf_path, output_folder, begin_pattern, begin_page, end_pattern, common_header,output_suffix,logger,invalid_endpage=-1): def extract_pages(pdf_path, output_folder, begin_pattern, begin_page, end_pattern, common_header,output_suffix,logger,invalid_endpage=-1):
@ -36,10 +36,6 @@ def get_patterns_for_procurement():
end_pattern = regex.compile(r'(?<!对应\s*)(?<!根据\s*)(?<!按照\s*)(?<!见\s*)(?<!与\s*)(?<!同\s*)(?<!"\s*)(?<!“\s*)(?<!”\s*)第[一二三四五六七八九1-9]+(?:章|部分)\s*[\u4e00-\u9fff、()]+\s*$', regex.MULTILINE) end_pattern = regex.compile(r'(?<!对应\s*)(?<!根据\s*)(?<!按照\s*)(?<!见\s*)(?<!与\s*)(?<!同\s*)(?<!"\s*)(?<!“\s*)(?<!”\s*)第[一二三四五六七八九1-9]+(?:章|部分)\s*[\u4e00-\u9fff、()]+\s*$', regex.MULTILINE)
return begin_pattern, end_pattern return begin_pattern, end_pattern
# """
# 第四章
# 评标
# """ 也能匹配上
def get_patterns_for_evaluation_method(): def get_patterns_for_evaluation_method():
begin_pattern = regex.compile( begin_pattern = regex.compile(
r'(?:(?<!见\s*)(?<!与\s*)(?<!同\s*)(?<!对应\s*)(?<!根据\s*)(?<!按照\s*)(?<!"\s*)(?<!“\s*)(?<!“\s*)第[一二三四五六七八九十1-9]+(?:章|部分)\s*' # 第一种模式 r'(?:(?<!见\s*)(?<!与\s*)(?<!同\s*)(?<!对应\s*)(?<!根据\s*)(?<!按照\s*)(?<!"\s*)(?<!“\s*)(?<!“\s*)第[一二三四五六七八九十1-9]+(?:章|部分)\s*' # 第一种模式
@ -78,73 +74,67 @@ def extract_pages_qualification(pdf_path, begin_page, common_header):
) )
print("第二次尝试 qualification:匹配附件") print("第二次尝试 qualification:匹配附件")
start_page = None
end_page = None
include_keywords = ["资格审查", "资质审查", "符合性审查","资格性审查", "符合性检查","资格性检查", "资格检查"] include_keywords = ["资格审查", "资质审查", "符合性审查","资格性审查", "符合性检查","资格性检查", "资格检查"]
exclude_keywords = ["声明函", "承诺函"] exclude_keywords = ["声明函", "承诺函"]
pdf_document = None
try: def _extract_pages_loop(get_text, total_pages, begin_page, common_header):
try: start_page = None
# 尝试使用 PyPDF2 读取 PDF end_page = None
pdf_document = PdfReader(pdf_path)
total_pages = len(pdf_document.pages)
get_text = create_get_text_function('pypdf2', pdf_document)
# print("使用 PyPDF2 成功读取 PDF 文件。")
except Exception as e_pypdf2:
print(f"extract_pages_qualification:使用 PyPDF2 读取 PDF 失败: {e_pypdf2}")
try:
# 如果 PyPDF2 失败,尝试使用 PyMuPDF 读取 PDF
pdf_document = fitz.open(pdf_path)
total_pages = pdf_document.page_count
get_text = create_get_text_function('fitz', pdf_document)
# print("使用 PyMuPDF 成功读取 PDF 文件。")
except Exception as e_fitz:
print(f"使用 PyMuPDF 读取 PDF 也失败: {e_fitz}")
return None, None # 或者根据需求抛出异常
# 从指定的开始页开始遍历
for i in range(begin_page, total_pages): for i in range(begin_page, total_pages):
text = get_text(i) text = get_text(i)
if text: if text:
cleaned_text = clean_page_content(text, common_header) cleaned_text = clean_page_content(text, common_header)
# 优先检查是否匹配优先模式 # 1. 优先判断是否匹配优先模式
priority_match = priority_pattern.search(cleaned_text) priority_match = priority_pattern.search(cleaned_text)
if priority_match and start_page is None: if priority_match and start_page is None:
start_page = i start_page = i
print(f"匹配到priority_pattern设置起始页为: {start_page}") print(f"Matched priority_pattern, setting start_page to: {start_page}")
continue continue
else: else:
# 如果未匹配优先模式,则按照一般模式进行判断 # 2. 若未匹配优先模式,则判断是否同时满足包含和排除关键词条件
if ( if (any(keyword in cleaned_text for keyword in include_keywords) and
any(keyword in cleaned_text for keyword in include_keywords) and
all(keyword not in cleaned_text for keyword in exclude_keywords) and all(keyword not in cleaned_text for keyword in exclude_keywords) and
start_page is None start_page is None):
):
if begin_pattern.search(cleaned_text): if begin_pattern.search(cleaned_text):
start_page = i start_page = i
print(f"匹配到附录等模式begin_pattern设置起始页为: {start_page}") print(f"Matched begin_pattern, setting start_page to: {start_page}")
continue continue
# 确定结束页 - 附录、附件、附表等 # 3. 判断结束页:附件/附表结束模式
if start_page is not None and end_pattern_attachment.search(cleaned_text): if start_page is not None and end_pattern_attachment.search(cleaned_text):
# 额外检查当前页面是否不包含任何 include_keywords # 若当前页面不包含任何 include_keywords则认为找到了结束页
if not any(keyword in cleaned_text for keyword in include_keywords): if not any(keyword in cleaned_text for keyword in include_keywords):
if i > start_page: if i > start_page:
end_page = i end_page = i
print(f"qualification找到结束页 (附件类): {end_page}") print(f"Found end_page (attachment): {end_page}")
break # 找到结束页后退出循环 break
else: else:
print(f"当前页面匹配附件结束模式但包含 include_keywords继续查找。页面: {i}") print(f"Page {i} matches attachment pattern but contains include_keywords, continue.")
# 4. 判断结束页:章节标题结束模式
# 确定结束页 - 章节标题
elif start_page is not None and end_pattern_chapter.search(cleaned_text): elif start_page is not None and end_pattern_chapter.search(cleaned_text):
if i > start_page: if i > start_page:
end_page = i end_page = i
print(f"qualification找到结束页 (章节标题): {end_page}") print(f"Found end_page (chapter title): {end_page}")
break # 找到结束页后退出循环 break
return start_page, end_page return start_page, end_page
finally:
if pdf_document and hasattr(pdf_document, 'close'): try:
pdf_document.close() with open(pdf_path, "rb") as f:
pdf_document = PdfReader(f)
total_pages = len(pdf_document.pages)
get_text = create_get_text_function('pypdf2', pdf_document)
return _extract_pages_loop(get_text, total_pages, begin_page, common_header)
except Exception as e_pypdf2:
print(f"extract_pages_qualification: Using PyPDF2 failed: {e_pypdf2}")
# 如果 PyPDF2 读取失败,尝试使用 PyMuPDF
try:
with fitz.open(pdf_path) as pdf_document:
total_pages = pdf_document.page_count
get_text = create_get_text_function('fitz', pdf_document)
return _extract_pages_loop(get_text, total_pages, begin_page, common_header)
except Exception as e_fitz:
print(f"Using PyMuPDF to read PDF also failed: {e_fitz}")
return None, None
def extract_pages_twice(pdf_path, output_folder, output_suffix, common_header, begin_page, logger): def extract_pages_twice(pdf_path, output_folder, output_suffix, common_header, begin_page, logger):