# zbparse/flask_app/main/截取pdf.py

import re
import os
from PyPDF2 import PdfReader, PdfWriter
from flask_app.general.merge_pdfs import merge_pdfs
import concurrent.futures
import logging
def get_global_logger(unique_id):
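"""Return the logger registered under unique_id, or the default root logger when unique_id is None."""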
if unique_id is None:
return logging.getLogger() # 获取默认的日志器
logger = logging.getLogger(unique_id)
return logger
logger = None
def clean_page_content(text, common_header):
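"""
Strip the shared page header (line by line, first occurrence only) and common page-number
artifacts such as '89/129', '/129' or '—2—' from a single page's extracted text.
"""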
# 首先删除抬头公共部分
if common_header: # 确保有公共抬头才进行替换
for header_line in common_header.split('\n'):
if header_line.strip(): # 只处理非空行
# 替换首次出现的完整行
text = re.sub(r'^' + re.escape(header_line.strip()) + r'\n?', '', text, count=1)
# 删除页码 eg:89/129 这个代码分三步走可以把89/129完全删除
text = re.sub(r'^\s*\d+\s*(?=\D)', '', text) # 删除开头的页码,仅当紧跟非数字字符时
text = re.sub(r'\s+\d+\s*$', '', text) # 删除结尾的页码
text = re.sub(r'\s*\/\s*\d+\s*', '', text) # 删除形如 /129 的页码
text = re.sub(r'\s*[—-]\s*\d+\s*[—-]\s*', '', text) # 删除形如 '—2—' 或 '-2-' 的页码
return text
def extract_common_header(pdf_path):
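"""
Sample two or three consecutive pages around the middle of the PDF (both pages for a
two-page file), take the first three lines of each, and keep only the words that appear on
every sampled page, preserving the word order of the first sampled page. The result is the
common header later removed by clean_page_content.
"""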
pdf_document = PdfReader(pdf_path)
headers = []
total_pages = len(pdf_document.pages)
# 确定要读取的页数和起始页
if total_pages == 2:
pages_to_read = 2
start_page = 0
else:
pages_to_read = 3
middle_page = total_pages // 2
start_page = max(0, middle_page - 1)
for i in range(start_page, min(start_page + pages_to_read, total_pages)):
page = pdf_document.pages[i]
text = page.extract_text() or ""
if text:
# 只取每页的前三行
first_lines = text.strip().split('\n')[:3]
headers.append(first_lines)
if len(headers) < 2:
return "" # 如果没有足够的页来比较,返回空字符串
# 寻找每一行中的公共部分,按顺序保留
common_headers = []
for lines in zip(*headers):
# 提取第一行的词汇顺序
first_words = lines[0].split()
# 筛选所有页面都包含的词汇,保持顺序
common_line = [word for word in first_words if all(word in line.split() for line in lines[1:])]
if common_line:
common_headers.append(' '.join(common_line))
return '\n'.join(common_headers)
def save_pages_to_new_pdf(pdf_path, output_folder, output_suffix, start_page, end_page, common_header):
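"""
Write pages start_page..end_page (0-based, inclusive) of pdf_path to
{base_file_name}_{output_suffix}.pdf in output_folder and return that path, or "" when the
range is invalid or an exception occurs. For output_suffix == 'notice', the pages before
start_page (truncated at the 目录 page when one is found) are additionally saved as
{base_file_name}_before.pdf.
"""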
try:
# 获取文件基本名称
base_file_name = os.path.splitext(os.path.basename(pdf_path))[0]
# 构建主要输出文件路径
output_pdf_path = os.path.join(output_folder, f"{base_file_name}_{output_suffix}.pdf")
# 读取PDF文件
pdf_document = PdfReader(pdf_path)
total_pages = len(pdf_document.pages)
# 检查起始和结束页码是否有效
if start_page < 0 or end_page >= total_pages or start_page > end_page:
print(f"无效的页面范围: {start_page}{end_page}")
return ""
# 如果 output_suffix 是 'notice',保存 start_page 之前的页面(若遇到'目录',则截取到'目录')
if output_suffix == 'notice' and start_page > 0:
before_pdf_path = os.path.join(output_folder, f"{base_file_name}_before.pdf")
before_doc = PdfWriter()
toc_page = -1
# 查找目录页
for page_num in range(min(start_page, total_pages)):
page_text = pdf_document.pages[page_num].extract_text()
cleaned_text = clean_page_content(page_text, common_header)
if re.search(r'目\s*录', cleaned_text, re.MULTILINE):
toc_page = page_num
break
# 确定截取的页数
pages_to_extract = toc_page + 1 if toc_page != -1 else start_page
# 提取页面
for page_num in range(pages_to_extract):
before_doc.add_page(pdf_document.pages[page_num])
with open(before_pdf_path, 'wb') as f_before:
before_doc.write(f_before)
# print(f"已保存页面从 0 到 {pages_to_extract} 为 {before_pdf_path}")
# 提取指定范围的页面
output_doc = PdfWriter()
for page_num in range(start_page, end_page + 1):
output_doc.add_page(pdf_document.pages[page_num])
# 保存新的PDF文件
with open(output_pdf_path, 'wb') as f_output:
output_doc.write(f_output)
print(f"{output_suffix} 已截取并保存页面从 {start_page + 1}{end_page + 1}{output_pdf_path}")
return output_pdf_path
except Exception as e:
print(f"Error in save_pages_to_new_pdf: {e}")
return "" # 返回空字符串
def extract_pages_tobidders_notice(pdf_path, output_folder, begin_pattern, begin_page, common_header):
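"""
Locate the 投标人须知 section: start_page is its chapter heading, mid_page the start of the
须知正文 (说明/总则), end_page the next chapter or appendix. Saves start_page..mid_page as
'tobidders_notice_table' and mid_page..end_page as 'tobidders_notice' and returns both paths,
or ("", "") when any of the three boundaries cannot be found.
"""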
pdf_document = PdfReader(pdf_path)
exclusion_pattern = re.compile(r'文件的构成|文件的组成|须对应|需对应|须按照|需按照|须根据|需根据')
def run_extraction():
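"""Single pass over all pages; returns (start_page, mid_page, end_page), each possibly None."""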
start_page = None
mid_page = None
end_page = None
chapter_type = None # 用于存储“章”或“部分”
for i, page in enumerate(pdf_document.pages):
text = page.extract_text() or ""
cleaned_text = clean_page_content(text, common_header)
if exclusion_pattern and re.search(exclusion_pattern, cleaned_text) and mid_page is not None:
continue
if start_page is None:
match = re.search(begin_pattern, cleaned_text)
if match and i > begin_page:
start_page = i
matched_text = match.group(0) # 获取整个匹配的文本
if '章' in matched_text:
chapter_type = '章'
elif '部分' in matched_text:
chapter_type = '部分'
else:
chapter_type = None # 未匹配到“章”或“部分”
if chapter_type:
# 根据 chapter_type 动态生成 end_pattern
end_pattern = re.compile(
rf'^第[一二三四五六七八九十百千]+?(?:{chapter_type})\s*[\u4e00-\u9fff]+|' # Match chapter pattern
r'^评标办法前附表|' # Match "评标办法前附表" at the beginning of a line
r'^附录(?:一)?[:]|' # Match "附录一" with optional "一" and colon
r'^附件(?:一)?[:]|' # Match "附件一" with optional "一" and colon
r'^附表(?:一)?[:]', # Match "附表一" with optional "一" and colon
re.MULTILINE
)
# print(f"动态生成的 end_pattern: {end_pattern.pattern}") # 打印生成的 end_pattern
# 根据 chapter_type 动态生成 additional_mid_pattern
if chapter_type == '章':
additional_mid_pattern = r'^第[一二三四五六七八九十百千]+?(?:部分)'
elif chapter_type == '部分':
additional_mid_pattern = r'^第[一二三四五六七八九十百千]+?(?:章)'
else:
additional_mid_pattern = ''
# 定义基础的 mid_pattern
base_mid_pattern = (
r'^\s*(?:[(]\s*[一1]?\s*[)]\s*[、..]*|[一1][、..]+|[、..]+)\s*(说\s*明|总\s*则)'
r'|^((投标人?|磋商|供应商|谈判供应商|磋商供应商)\s*须知正文部分)'
)
# 合并基础模式和额外模式
if additional_mid_pattern:
combined_mid_pattern = re.compile(
rf'(?:{base_mid_pattern})|(?:{additional_mid_pattern})',
re.MULTILINE
)
else:
combined_mid_pattern = re.compile(
rf'{base_mid_pattern}',
re.MULTILINE
)
# print(f"生成的 combined_mid_pattern: {combined_mid_pattern.pattern}") # 打印 combined_mid_pattern
else:
# 如果未匹配到“章”或“部分”,使用默认的 end_pattern 和 mid_pattern
end_pattern = re.compile(
r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+|'
r'^评标办法前附表|' # Match "评标办法前附表" at the beginning of a line
r'^附录(?:一)?[:]|' # Match "附录一" with optional "一" and colon
r'^附件(?:一)?[:]|' # Match "附件一" with optional "一" and colon
r'^附表(?:一)?[:]', # Match "附表一" with optional "一" and colon
re.MULTILINE
)
# print(f"使用默认的 end_pattern: {end_pattern.pattern}") # 打印默认的 end_pattern
# 定义基础的 mid_pattern
base_mid_pattern = r'^\s*(?:[(]\s*[一1]?\s*[)]\s*[、..]*|[一1][、..]+|[、..]+)\s*(说\s*明|总\s*则)'
combined_mid_pattern = re.compile(
rf'{base_mid_pattern}',
re.MULTILINE
)
continue
if start_page is not None and mid_page is None and combined_mid_pattern:
if re.search(combined_mid_pattern, cleaned_text):
mid_page = i
if start_page is not None and mid_page is not None and chapter_type:
if re.search(end_pattern, cleaned_text):
if mid_page is None:
if i > start_page:
end_page = i
break
else:
if i > mid_page:
end_page = i
break
return start_page, mid_page, end_page
# 运行提取
start_page, mid_page, end_page = run_extraction()
if start_page is None or end_page is None or mid_page is None:
print(f"first: tobidders_notice 未找到起始或结束页在文件 {pdf_path} 中!尝试备用提取策略。")
return "",""
path1 = save_pages_to_new_pdf(pdf_path, output_folder, "tobidders_notice_table", start_page, mid_page, common_header)
path2 = save_pages_to_new_pdf(pdf_path, output_folder, "tobidders_notice", mid_page, end_page, common_header)
return path1, path2
def extract_pages(pdf_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix, common_header, is_secondary_match=False):
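"""
Generic single-range extractor: find the first page at or after begin_page that matches
begin_pattern, then the first later page that matches end_pattern (for the 'invalid' suffix the
end page must additionally lie beyond page 30), and save that range via save_pages_to_new_pdf.
Returns a one-element list with the output path, or [""] when either boundary is missing.
"""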
# 打开PDF文件
pdf_document = PdfReader(pdf_path)
start_page = None
end_page = None
# 遍历文档的每一页,查找开始和结束短语的位置
for i in range(len(pdf_document.pages)):
page = pdf_document.pages[i]
text = page.extract_text()
if text:
cleaned_text = clean_page_content(text, common_header)
if not is_secondary_match:
if re.search(begin_pattern, cleaned_text) and i >= begin_page:
if output_suffix == "invalid" and start_page:
continue
else:
start_page = i
else:
if re.search(begin_pattern, cleaned_text) and i >= begin_page:
if output_suffix == "notice" and start_page is not None: #考虑投标人须知前附表中有' 同招标公告 '的情况
pass
else:
start_page = i
if start_page is not None and re.search(end_pattern, cleaned_text):
condition = i > start_page
if condition:
is_invalid_condition = output_suffix == "invalid" and i > 30 # 这边默认无效投标至少有30页
if is_invalid_condition or output_suffix != "invalid":
end_page = i
break # 找到结束页后退出循环
# 移除对 extract_pages_twice 的调用
if start_page is None or end_page is None:
print(f"{output_suffix} first: 未找到起始或结束页在文件 {pdf_path} 中!")
return [""]
else:
return [save_pages_to_new_pdf(pdf_path, output_folder, output_suffix, start_page, end_page, common_header)]
# def process_input(input_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix,common_header,is_secondary_match):
# # 确保输出文件夹存在
# if not os.path.exists(output_folder):
# os.makedirs(output_folder)
# if os.path.isdir(input_path):
# generated_files = []
# # 遍历文件夹内的所有PDF文件
# for file in os.listdir(input_path):
# if file.endswith(".pdf"):
# pdf_path = os.path.join(input_path, file)
# output_pdf_path = extract_pages(pdf_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix,common_header,is_secondary_match)
# if output_pdf_path and os.path.isfile(output_pdf_path):
# generated_files.append(output_pdf_path)
# return generated_files
# elif os.path.isfile(input_path) and input_path.endswith(".pdf"):
# # 处理单个PDF文件
# output_pdf_path = extract_pages(input_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix,common_header,is_secondary_match)
# if output_pdf_path and os.path.isfile(output_pdf_path):
# return [output_pdf_path] # 以列表形式返回,以保持一致性
# else:
# print("提供的路径既不是文件夹也不是PDF文件。")
# return []
def get_start_and_common_header(input_path):
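"""
Return (common_header, last_begin_index): the extracted common page header together with the
0-based index of the first page whose cleaned text ends with 招标公告/投标邀请书/投标邀请函.
Only the first 26 pages are scanned; past that point (or when nothing matches) 0 is returned.
"""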
common_header = extract_common_header(input_path)
last_begin_index = 0
begin_pattern = re.compile(r'.*(?:招标公告|投标邀请书|投标邀请函)\s*$',re.MULTILINE)
pdf_document = PdfReader(input_path)
for i, page in enumerate(pdf_document.pages):
if i > 25:
return common_header, 0 # 如果页码大于25直接返回last_begin_index=0
text = page.extract_text()
if text:
cleaned_text = clean_page_content(text, common_header)
if begin_pattern.search(cleaned_text):
last_begin_index = i # 更新第一个匹配的索引页码从0开始
return common_header,last_begin_index
return common_header,last_begin_index
def truncate_pdf_main(input_path, output_folder, selection):
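"""
Dispatch extraction for a single PDF (or recursively for every PDF in a directory).
selection picks the target section: 1 投标人须知, 2 评标办法, 3 资格审查, 4 招标公告, 5 无效标.
Pattern pairs are tried in order, with extract_pages_twice as the fallback; returns a list of
output paths where "" marks a failure (selection 1 always yields two entries).
"""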
if os.path.isdir(input_path):
generated_files = []
for file_name in os.listdir(input_path):
if file_name.endswith('.pdf'):
file_path = os.path.join(input_path, file_name)
files = truncate_pdf_main(file_path, output_folder, selection)
if files:
generated_files.extend(files)
return generated_files
elif os.path.isfile(input_path) and input_path.endswith('.pdf'):
# base_file_name = os.path.splitext(os.path.basename(input_path))[0]
common_header,last_begin_index = get_start_and_common_header(input_path)
# print(last_begin_index)
if selection == 1:
# Selection 1: 投标人须知前附表
pattern_pairs = [
(
re.compile(
r'(?:第[一二三四五六七八九十百千]+(?:章|部分)\s*(?:投标人?|磋商|供应商|谈判供应商|磋商供应商)须知+|(?:一\s*、\s*)?(?:投标人?|磋商|供应商)须知前附表)'),
re.compile(
r'第[一二三四五六七八九十]+章\s*评标办法|^评标办法前附表|^附录(?:一)?[:]|^附件(?:一)?[:]|^附表(?:一)?[:]',
re.MULTILINE)
)
]
output_suffix="tobidders_notice"
elif selection == 2:
# Selection 2: 评标办法
pattern_pairs = [
(
re.compile(r'第[一二三四五六七八九十]+章\s*评标办法'), # Alternative begin pattern
re.compile(r'评标办法正文|评标办法') # Alternative end pattern
),
]
output_suffix = "evaluation_method"
elif selection == 3:
# Selection 3: 资格审查条件
pattern_pairs = [
(
re.compile(r'^(?:附录(?:[一1])?[:]|附件(?:[一1])?[:]|附表(?:[一1])?[:]).*(?:资质|能力|信誉).*$',
re.MULTILINE),
re.compile(
r'^(?:附录[一二三四五六七八九1-9]*[:]|附件[一二三四五六七八九1-9]*[:]|附表[一二三四五六七八九1-9]*[:])(?!.*(?:资质|能力|信誉)).*$|'
r'^第[一二三四五六七八九1-9]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE)
)
]
output_suffix = "qualification"
elif selection == 4:
# Selection 4: 招标公告
pattern_pairs = [
(
re.compile(
r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(?:公告|邀请书|邀请函).*|^第一卷|^投标邀请书'),
re.compile(r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE)
),
(
re.compile(r'.*(?:招标公告|投标邀请书|投标邀请函|第一卷)\s*$', re.MULTILINE),
re.compile(r".*(?:投标人须知|投标人须知前附表)\s*$", re.MULTILINE)
)
]
output_suffix = "notice"
elif selection == 5:
# Selection 5: 无效标
pattern_pairs = [
(
re.compile(
r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(?:公告|邀请书|邀请函).*|^第一卷|^投标邀请书'),
re.compile(r'第[一二三四五六七八九十]+章\s*合同|[:]清标报告|^第二卷', re.MULTILINE)
),
(
re.compile(r'.*(?:招标公告|投标邀请书|投标邀请函|第一卷)\s*$', re.MULTILINE),
re.compile(r'第[一二三四五六七八九十]+章\s*合同|[:]清标报告|^第二卷', re.MULTILINE)
)
]
output_suffix = "invalid"
else:
print("无效的选择:请选择1-5")
return [""]
for idx, (begin_pattern, end_pattern) in enumerate(pattern_pairs, start=1):
is_secondary_match = (idx == 2)
begin_page = last_begin_index if last_begin_index != 0 else {
1: 3, #前附表
2: 10, #评标
3: 5, #资格
4: 0, #公告
5: 0 #无效标
}.get(selection, 0)
if selection == 1: #投标人须知
output_paths = list(
extract_pages_tobidders_notice(input_path,output_folder, begin_pattern, begin_page, common_header))
if output_paths and any(os.path.isfile(f) for f in output_paths):
return output_paths
else:
# print(f"Selection {selection}: 使用组合提取函数失败。尝试下一个模式对。")
continue
output_paths = extract_pages(
input_path,
output_folder,
begin_pattern,
begin_page=begin_page,
end_pattern=end_pattern,
output_suffix=output_suffix,
common_header=common_header,
is_secondary_match=is_secondary_match
)
if output_paths and any(os.path.isfile(f) for f in output_paths):
# print(f"Selection {selection}: 使用模式对 {idx} 成功提取。")
return output_paths # Return immediately upon successful extraction
else:
# print(f"Selection {selection}: 使用模式对 {idx} 失败。尝试下一个模式对。")
continue
# If all pattern pairs failed, attempt a fallback
print(f"Selection {selection}: 所有模式对都未匹配成功。尝试执行 extract_pages_twice 或返回空。")
fallback_result = extract_pages_twice(input_path, output_folder, output_suffix, common_header, last_begin_index)
if fallback_result and any(os.path.isfile(f) for f in fallback_result):
return fallback_result
else:
if selection == 1:
return ["", ""]
else:
return ['']
else:
print("Invalid input path. It is neither a PDF file nor a directory containing PDF files.")
return ['']
def extract_pages_twice(pdf_path, output_folder, output_suffix,common_header,last_begin_index):
"""
处理 PDF 文件,作为 truncate_pdf_main 的后备方法。
此函数将在所有模式对都失败后调用。
"""
try:
pdf_document = PdfReader(pdf_path)
if output_suffix == "qualification":
begin_pattern = r'^(?:附录(?:[一1])?[:]|附件(?:[一1])?[:]|附表(?:[一1])?[:])'
end_pattern = re.compile(
r'^(?:附录[一二三四五六七八九1-9]*[:]|附件[一二三四五六七八九1-9]*[:]|附表[一二三四五六七八九1-9]*[:])(?!.*(?:资质|能力|信誉)).*$|'
r'^第[一二三四五六七八九]+(?:章|部分)\s*[\u4e00-\u9fff]+|评标办法前附表|投标人须知',
re.MULTILINE
)
start_page = None
end_page = None
# 从章节开始后的位置进行检查
for i, page in enumerate(pdf_document.pages[last_begin_index:], start=last_begin_index):
text = page.extract_text()
if text:
cleaned_text = clean_page_content(text, common_header)
# 确定起始页需在last_begin_index之后
if "资格审查" in cleaned_text or "资质条件" in cleaned_text: #当页包含这些内容可以认为是'资格审查资料部分'
if re.search(begin_pattern, cleaned_text, re.MULTILINE):
if start_page is None:
start_page = i # 确保起始页不小于章节的开始页码
# 确定结束页
if start_page is not None and re.search(end_pattern, cleaned_text):
if i > start_page:
end_page = i
break # 找到结束页后退出循环
if start_page is None or end_page is None:
print(f"{output_suffix} twice: 未找到起始或结束页在文件 {pdf_path} 中!")
return []
else:
return [save_pages_to_new_pdf(pdf_path, output_folder, output_suffix, start_page, end_page, common_header)]
elif output_suffix == "invalid":
pdf_document = PdfReader(pdf_path)
total_pages = len(pdf_document.pages)
# 计算总页数的三分之二
total = int(total_pages * 2 / 3)
start_page = last_begin_index
end_page = min(90, total)
return [save_pages_to_new_pdf(pdf_path, output_folder, output_suffix, start_page, end_page, common_header)]
else:
print(f"{output_suffix} twice: 未定义的输出后缀。")
return []
except Exception as e:
print(f"Error in extract_pages_twice: {e}")
return []
def merge_selected_pdfs(output_folder, truncate_files, output_path, base_file_name):
"""
合并 output_folder 中以 {base_file_name}_before.pdf 结尾的 PDF 文件,
以及 truncate_files 中以指定后缀结尾的文件,按照指定顺序合并。
参数:
- output_folder (str): 包含以 {base_file_name}_before.pdf 结尾的 PDF 文件的文件夹路径。
- truncate_files (list): 包含 PDF 文件路径的列表。
- output_path (str): 合并后的 PDF 文件保存路径。
- base_file_name (str): 用于匹配文件名的基础名称。
返回:
- str: 合并后的 PDF 文件路径,如果没有合并则返回 ""
"""
# 1. 获取 output_folder 中所有文件
try:
all_output_files = os.listdir(output_folder)
except FileNotFoundError:
print(f"输出文件夹 '{output_folder}' 未找到。")
return ""
except PermissionError:
print(f"没有权限访问输出文件夹 '{output_folder}'")
return ""
# 2. 定义要选择的文件后缀及合并顺序,包括 before 文件
desired_suffixes = [
f'{base_file_name}_before.pdf',
f'{base_file_name}_notice.pdf',
f'{base_file_name}_tobidders_notice_table.pdf'
]
all_pdfs_to_merge = []
for suffix in desired_suffixes:
if suffix == f'{base_file_name}_before.pdf':
# 从 output_folder 中选择以 {base_file_name}_before.pdf 结尾的文件
matching_files = [
os.path.join(output_folder, f)
for f in all_output_files
if f.endswith(suffix)
]
else:
# 从 truncate_files 中选择以指定后缀结尾的文件
matching_files = [f for f in truncate_files if f.endswith(suffix)]
if matching_files:
# 如果找到多个匹配的文件,按名称排序并添加
matching_files_sorted = sorted(matching_files)
all_pdfs_to_merge.extend(matching_files_sorted)
for f in matching_files_sorted:
print(f"选中文件: {f}")
else:
print(f"没有找到以 '{suffix}' 结尾的文件。")
print(f"总共将要合并的 PDF 文件数量: {len(all_pdfs_to_merge)}")
if not all_pdfs_to_merge:
print("没有找到要合并的 PDF 文件。")
return ""
# 过滤掉不存在或为空的文件路径
all_pdfs_to_merge = [f for f in all_pdfs_to_merge if os.path.isfile(f)]
if not all_pdfs_to_merge:
print("没有有效的 PDF 文件需要合并。")
return ""
# 调用 merge_pdfs 函数进行合并
try:
merge_pdfs(all_pdfs_to_merge, output_path)
print(f"已成功合并 PDF 文件到 '{output_path}'")
return output_path
except Exception as e:
print(f"合并 PDF 文件时出错: {e}")
return ""
def truncate_pdf_multiple(input_path, output_folder, unique_id="123"):
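"""
Run truncate_pdf_main for selections 1-5 in parallel threads, collect the resulting paths in
selection order (failures are padded with "" so positions stay stable), then append the path of
the merged 'baseinfo' PDF, or "" when merging is skipped or fails.
"""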
global logger
logger = get_global_logger(unique_id)
base_file_name = os.path.splitext(os.path.basename(input_path))[0] # 纯文件名
truncate_files = []
selections = range(1, 6) # 选择 1 到 5
# 使用 ThreadPoolExecutor 进行多线程处理
with concurrent.futures.ThreadPoolExecutor(max_workers=len(selections)) as executor:
# 提交所有任务并保持 selection 顺序
future_to_selection = {selection: executor.submit(truncate_pdf_main, input_path, output_folder, selection) for selection in selections}
# 按 selection 顺序收集结果
for selection in selections:
future = future_to_selection.get(selection)
try:
files = future.result()
if files and any(f for f in files):
# 过滤空字符串
valid_files = [f for f in files if f]
truncate_files.extend(valid_files)
else:
logger.error(f"Selection {selection}: 截取失败,已添加空字符串。")
# Append two empty strings for selection=1; otherwise, one empty string
truncate_files.extend(["", ""] if selection == 1 else [""])
except Exception as e:
logger.error(f"Selection {selection} 生成了一个异常: {e}")
# Append two empty strings for selection=1; otherwise, one empty string
truncate_files.extend(["", ""] if selection == 1 else [""])
# Proceed with merging logic as before...
if any(f for f in truncate_files if f): # 检查是否有有效的文件路径
merged_output_path = os.path.join(output_folder, f"{base_file_name}_merged_baseinfo.pdf")
merged_result = merge_selected_pdfs(output_folder, truncate_files, merged_output_path, base_file_name)
if merged_result:
truncate_files.append(merged_result)
logger.info(f"merged_baseinfo: 已生成合并文件: {merged_output_path}")
else:
truncate_files.append("") # 如果 merged_result 未生成,添加空字符串
logger.warning("merged_baseinfo: 未生成合并文件,因为没有找到需要合并的 PDF 文件。")
else:
truncate_files.append("") # 如果没有文件需要合并,也添加空字符串
logger.warning(f"merged_baseinfo: 没有文件需要合并 for {input_path}")
return truncate_files
def truncate_pdf_specific_engineering(pdf_path, output_folder, selections, unique_id="123"):
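"""
Like truncate_pdf_multiple, but only for the given selections; the merged file is saved as
{base_file_name}_merged_specific.pdf. On an unexpected error a list of empty strings, one per
selection, is returned.
"""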
try:
global logger
logger = get_global_logger(unique_id)
base_file_name = os.path.splitext(os.path.basename(pdf_path))[0]
truncate_files = []
# 使用 ThreadPoolExecutor 进行多线程处理
with concurrent.futures.ThreadPoolExecutor(max_workers=len(selections)) as executor:
# 提交所有任务并保持 selection 顺序
future_to_selection = {selection: executor.submit(truncate_pdf_main, pdf_path, output_folder, selection) for selection in selections}
# 按 selection 顺序收集结果
for selection in selections:
future = future_to_selection.get(selection)
try:
files = future.result()
if files and any(f for f in files):
# 过滤空字符串
valid_files = [f for f in files if f]
truncate_files.extend(valid_files)
else:
logger.error(f"Selection {selection}: 截取失败,已添加空字符串。")
# Append two empty strings for selection=1; otherwise, one empty string
truncate_files.extend(["", ""] if selection == 1 else [""])
except Exception as e:
logger.error(f"Selection {selection} 生成了一个异常: {e}")
# Append two empty strings for selection=1; otherwise, one empty string
truncate_files.extend(["", ""] if selection == 1 else [""])
# Proceed with merging logic as before...
if any(f for f in truncate_files if f): # 检查是否有有效的文件路径
merged_output_path = os.path.join(output_folder, f"{base_file_name}_merged_specific.pdf")
merged_result = merge_selected_pdfs(output_folder, truncate_files, merged_output_path, base_file_name)
if merged_result:
truncate_files.append(merged_result)
logger.info(f"merged_specific: 已生成合并文件: {merged_output_path}")
else:
truncate_files.append("") # 如果 merged_result 未生成,添加空字符串
logger.warning("merged_specific: 未生成合并文件,因为没有找到需要合并的 PDF 文件。")
else:
truncate_files.append("") # 如果没有文件需要合并,也添加空字符串
logger.warning(f"merged_specific: 没有文件需要合并 for {pdf_path}")
return truncate_files
except Exception as e:
logger.error(f"Error in truncate_pdf_specific_engineering: {e}")
return [""] * len(selections) # 返回与 selections 数量相同的空字符串列表
# TODO:需要完善二次请求。目前invalid一定能返回 前附表 须知正文如果为空的话要额外处理一下比如说就不进行跳转见xx表 开评定标这里也要考虑 如果评分表为空,也要处理。
#TODO:zbtest8 zbtest18有问题 后期需要完善,截取需要截两次,第一次严格第二次宽松
#TODO:目前merged_baseinfo没有包含投标人须知正文。
#投标人须知前附表改为货物标一样的
if __name__ == "__main__":
input_path = "C:\\Users\\Administrator\\Desktop\\招标文件\\招标test文件夹\\zbtest7.pdf" # 可以是单个PDF文件路径或文件夹路径 #zbtest20.pdf zbtest4_evaluation_method.pdf zbtest8.pdf
# input_path="C:\\Users\\Administrator\\Desktop\\fsdownload\\68549b0b-e892-41a9-897c-c3694535ee61\\ztbfile.pdf"
# input_path = "C:\\Users\\Administrator\\Desktop\\货物标\\zbfiles\\2-招标文件.pdf"
output_folder="C:\\Users\\Administrator\\Desktop\\招标文件\\new_test\\新建文件夹"
# files=truncate_pdf_multiple(input_path,output_folder)
# selections = [5, 1] # 仅处理 selection 5、1
# files=truncate_pdf_specific_engineering(input_path,output_folder,selections)
# print(files)
selection = 5 # 例如1 - 投标人须知前附表+正文, 2 - 评标办法, 3 -资格审查条件 4-招标公告 5-无效标
generated_files = truncate_pdf_main(input_path, output_folder, selection)
# print("生成的文件:", generated_files)