import logging from PyPDF2 import PdfReader, PdfWriter import re # 导入正则表达式库 import os # 用于文件和文件夹操作 from flask_app.general.format_change import docx2pdf from flask_app.general.merge_pdfs import merge_and_cleanup,merge_pdfs import concurrent.futures def get_global_logger(unique_id): if unique_id is None: return logging.getLogger() # 获取默认的日志器 logger = logging.getLogger(unique_id) return logger logger = None def clean_page_content(text, common_header): # 首先删除抬头公共部分 if common_header: # 确保有公共抬头才进行替换 for header_line in common_header.split('\n'): if header_line.strip(): # 只处理非空行 # 替换首次出现的完整行 text = re.sub(r'^' + re.escape(header_line.strip()) + r'\n?', '', text, count=1) # 删除页码 eg:89/129 这个代码分三步走可以把89/129完全删除 text = re.sub(r'^\s*\d+\s*(?=\D)', '', text) # 删除开头的页码,仅当紧跟非数字字符时 text = re.sub(r'\s+\d+\s*$', '', text) # 删除结尾的页码 text = re.sub(r'\s*\/\s*\d+\s*', '', text) # 删除形如 /129 的页码 text = re.sub(r'\s*[—-]\s*\d+\s*[—-]\s*', '', text) # 删除形如 '—2—' 或 '-2-' 的页码 return text # PYPDF2版本 def extract_common_header(pdf_path): from PyPDF2 import PdfReader pdf_document = PdfReader(pdf_path) headers = [] total_pages = len(pdf_document.pages) # 确定要读取的页数和起始页 if total_pages == 2: pages_to_read = 2 start_page = 0 else: pages_to_read = 3 middle_page = total_pages // 2 start_page = max(0, middle_page - 1) for i in range(start_page, min(start_page + pages_to_read, total_pages)): page = pdf_document.pages[i] text = page.extract_text() or "" if text: # 只取每页的前三行 first_lines = text.strip().split('\n')[:3] headers.append(first_lines) if len(headers) < 2: return "" # 如果没有足够的页来比较,返回空字符串 # 寻找每一行中的公共部分,按顺序保留 common_headers = [] for lines in zip(*headers): # 提取第一行的词汇顺序 first_words = lines[0].split() # 筛选所有页面都包含的词汇,保持顺序 common_line = [word for word in first_words if all(word in line.split() for line in lines[1:])] if common_line: common_headers.append(' '.join(common_line)) return '\n'.join(common_headers) # fitz库版本 # def extract_common_header(pdf_path): # doc = fitz.open(pdf_path) # headers = [] # total_pages = len(doc) # # if total_pages == 2: # pages_to_read = 2 # start_page = 0 # else: # pages_to_read = 3 # middle_page = total_pages // 2 # start_page = max(0, middle_page - 1) # # for i in range(start_page, min(start_page + pages_to_read, total_pages)): # page = doc[i] # text = page.get_text() # if text: # first_lines = text.strip().split('\n')[:3] # headers.append(first_lines) # # doc.close() # # if len(headers) < 2: # return "" # # common_headers = [] # for lines in zip(*headers): # common_line = set(lines[0].split()).intersection(*[set(line.split()) for line in lines[1:]]) # if common_line: # common_headers.append(' '.join(common_line)) # # return '\n'.join(common_headers) def is_pdf_or_doc(filename): # 判断文件是否为PDF或Word文档 return filename.lower().endswith(('.pdf', '.doc', '.docx')) def convert_to_pdf(file_path): # 假设 docx2pdf 函数已经被定义,这里仅根据文件扩展名来决定是否需要转换 if file_path.lower().endswith(('.doc', '.docx')): return docx2pdf(file_path) return file_path def process_files(file_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix): pdf_path = convert_to_pdf(file_path) result = extract_pages(pdf_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix) if result: if output_suffix == "tobidders_notice": # 确保返回的是元组,并将其中的 None 转换为 "" path1, path2 = result return (path1 or "", path2 or "") elif output_suffix == "qualification1": merge_and_cleanup(result, "qualification3") return result or "" return result or "" return "" # 返回空字符串 def process_input(input_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix): if not os.path.exists(output_folder): os.makedirs(output_folder) generated_files = [] if os.path.isdir(input_path): for file_name in os.listdir(input_path): file_path = os.path.join(input_path, file_name) if is_pdf_or_doc(file_path): result = process_files(file_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix) if isinstance(result, tuple): generated_files.extend([f if f else "" for f in result]) # 保留空字符串 else: generated_files.append(result) # 直接添加result,可能是空字符串 elif os.path.isfile(input_path) and is_pdf_or_doc(input_path): result = process_files(input_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix) if isinstance(result, tuple): generated_files.extend([f if f else "" for f in result]) # 保留空字符串 else: generated_files.append(result) # 直接添加result,可能是空字符串 else: print("提供的路径既不是文件夹也不是PDF文件。") return generated_files # 默认逻辑是start_page匹配上就不再设置了,一般不匹配上目录的原因是设置了begin_page=5,但是匹配'第一章 招标公告'的时候start_page可能会错误匹配到目录。 def extract_pages_generic(pdf_document, begin_pattern, end_pattern, begin_page, common_header, exclusion_pattern=None, output_suffix="normal"): start_page = None end_page = None for i, page in enumerate(pdf_document.pages): text = page.extract_text() or "" cleaned_text = clean_page_content(text, common_header) if output_suffix == "tobidders_notice": if exclusion_pattern and re.search(exclusion_pattern, cleaned_text) and start_page is not None: continue else: if exclusion_pattern and re.search(exclusion_pattern, cleaned_text): continue if output_suffix == "notice": if re.search(begin_pattern, cleaned_text) and i > begin_page: start_page = i else: if start_page is None and re.search(begin_pattern, cleaned_text) and i > begin_page: start_page = i if start_page is not None and re.search(end_pattern, cleaned_text) and i > start_page: end_page = i break return start_page, end_page def extract_pages(pdf_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix): try: common_header = extract_common_header(pdf_path) pdf_document = PdfReader(pdf_path) exclusion_pattern = None total_pages = len(pdf_document.pages) - 1 # 获取总页数 if output_suffix == "tobidders_notice": exclusion_pattern = re.compile(r'文件的构成|文件的组成|须对应|需对应|须按照|需按照|须根据|需根据') start_page, mid_page, end_page = extract_pages_tobidders_notice( pdf_document, begin_pattern, begin_page, common_header, exclusion_pattern ) if start_page is None or end_page is None or mid_page is None: print(f"first: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!尝试备用提取策略。") return extract_pages_twice_tobidders_notice(pdf_path, output_folder, output_suffix, common_header) path1 = save_extracted_pages(pdf_document, start_page, mid_page, pdf_path, output_folder, "tobidders_notice_part1") path2 = save_extracted_pages(pdf_document, mid_page, end_page, pdf_path, output_folder, "tobidders_notice_part2") return path1, path2 else: # 原有的处理逻辑保持不变 if output_suffix == "qualification1": exclusion_pattern = re.compile(r'文件的构成|文件的组成|须对应|需对应|须按照|需按照|须根据|需根据') start_page, end_page = extract_pages_generic(pdf_document, begin_pattern, end_pattern, begin_page, common_header, exclusion_pattern, output_suffix) # 针对 selection = 6 的特殊处理 if output_suffix == "format": if start_page is None: print(f"{output_suffix}: 未找到起始页,提取失败!") return "" if end_page is None: # 如果未匹配到结束页,默认截取到文件末尾 end_page = total_pages print(f"{output_suffix}: 未找到结束页,默认截取到文件末尾。") return save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix) if start_page is None or end_page is None: print(f"first: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!尝试备用提取策略。") return extract_pages_twice(pdf_path, output_folder, output_suffix, common_header) elif output_suffix == "qualification1": truncate_pdf_main(pdf_path, output_folder, 2, "qualification3") # 合并'资格审查'章节和'评标办法'章节 return save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix) except Exception as e: print(f"Error processing {pdf_path}: {e}") return "" def get_patterns_for_procurement(): begin_pattern = re.compile( r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(?:服务|项目|商务).*?要求|' r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(?:采购|技术标准).*|' r'^[一二三四五六七八九十百千]+、\s*采购清单', re.MULTILINE) end_pattern = re.compile( r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE) return begin_pattern, end_pattern def get_patterns_for_evaluation_method(): begin_pattern = re.compile( r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(磋商|谈判|评标|评定|评审)(方法|办法).*', re.MULTILINE) end_pattern = re.compile( r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE) return begin_pattern, end_pattern def get_patterns_for_qualification(): # # 原始匹配逻辑 # begin_pattern_original = re.compile( # r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(资格审查).*', re.MULTILINE) # end_pattern_original = re.compile( # r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE) # 新匹配逻辑 begin_pattern_new = re.compile( r'^资格性检查', re.MULTILINE) end_pattern_new = re.compile( r'^附件\s*\d+|^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE) return begin_pattern_new, end_pattern_new def get_patterns_for_qualification2(): begin_pattern = re.compile( r'^(?:附录(?:一|1)?[::]|附件(?:一|1)?[::]|附表(?:一|1)?[::]).*(?:资格|符合性).*$', re.MULTILINE ) end_pattern = re.compile( r'^(?!.*(?:资格|符合))(?:附录.*?[::]|附件.*?[::]|附表.*?[::]).*$|' r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE ) return begin_pattern, end_pattern def get_patterns_for_notice(): begin_pattern = re.compile( r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(?:公告|邀请书).*', re.MULTILINE ) end_pattern = re.compile( # r'^(?:第[一二三四五六七八九十百千]+(?:章|部分)\s*(?:投标人须知|磋商须知|供应商须知)+|(?:一\s*、\s*)?(?:投标人须知|磋商须知|供应商须知)前附表)', r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE ) return begin_pattern, end_pattern # def extract_pages_tobidders_notice(pdf_document, begin_pattern, end_pattern, begin_page, common_header, # exclusion_pattern): # start_page = None # mid_page = None # end_page = None # for i, page in enumerate(pdf_document.pages): # text = page.extract_text() or "" # cleaned_text = clean_page_content(text, common_header) # if exclusion_pattern and re.search(exclusion_pattern, cleaned_text) and mid_page is not None: # continue # if start_page is None and re.search(begin_pattern, cleaned_text) and i > begin_page: # start_page = i # if start_page is not None and mid_page is None and re.search( # r'^\s*[((]?\s*[一1]\s*[))]?\s*[、..]*\s*(说\s*明|总\s*则)', cleaned_text): # mid_page = i # if start_page is not None and mid_page is not None and re.search(end_pattern, cleaned_text) and i > mid_page: # end_page = i # break # return start_page, mid_page, end_page def extract_pages_tobidders_notice(pdf_document, begin_pattern, begin_page, common_header, exclusion_pattern): def run_extraction(): start_page = None mid_page = None end_page = None chapter_type = None # 用于存储“章”或“部分” for i, page in enumerate(pdf_document.pages): text = page.extract_text() or "" cleaned_text = clean_page_content(text, common_header) if exclusion_pattern and re.search(exclusion_pattern, cleaned_text) and mid_page is not None: continue if start_page is None: match = re.search(begin_pattern, cleaned_text) if match and i > begin_page: start_page = i matched_text = match.group(0) # 获取整个匹配的文本 if '章' in matched_text: chapter_type = '章' elif '部分' in matched_text: chapter_type = '部分' else: chapter_type = None # 未匹配到“章”或“部分” if chapter_type: # 根据 chapter_type 动态生成 end_pattern end_pattern = re.compile( rf'^第[一二三四五六七八九十百千]+?(?:{chapter_type})\s*[\u4e00-\u9fff]+', re.MULTILINE ) # print(f"动态生成的 end_pattern: {end_pattern.pattern}") # 打印生成的 end_pattern # 根据 chapter_type 动态生成 additional_mid_pattern if chapter_type == '章': additional_mid_pattern = r'^第[一二三四五六七八九十百千]+?(?:部分)' elif chapter_type == '部分': additional_mid_pattern = r'^第[一二三四五六七八九十百千]+?(?:章)' else: additional_mid_pattern = '' # 定义基础的 mid_pattern base_mid_pattern = r'^\s*(?:[((]\s*[一1]?\s*[))]\s*[、..]*|[一1][、..]+|[、..]+)\s*(说\s*明|总\s*则)' # 合并基础模式和额外模式 if additional_mid_pattern: combined_mid_pattern = re.compile( rf'(?:{base_mid_pattern})|(?:{additional_mid_pattern})', re.MULTILINE ) else: combined_mid_pattern = re.compile( rf'{base_mid_pattern}', re.MULTILINE ) # print(f"生成的 combined_mid_pattern: {combined_mid_pattern.pattern}") # 打印 combined_mid_pattern else: # 如果未匹配到“章”或“部分”,使用默认的 end_pattern 和 mid_pattern end_pattern = re.compile( r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE ) print(f"使用默认的 end_pattern: {end_pattern.pattern}") # 打印默认的 end_pattern # 定义基础的 mid_pattern base_mid_pattern = r'^\s*(?:[((]\s*[一1]?\s*[))]\s*[、..]*|[一1][、..]+|[、..]+)\s*(说\s*明|总\s*则)' combined_mid_pattern = re.compile( rf'{base_mid_pattern}', re.MULTILINE ) print( f"使用默认的 combined_mid_pattern: {combined_mid_pattern.pattern}") # 打印默认的 combined_mid_pattern continue if start_page is not None and mid_page is None and combined_mid_pattern: if re.search(combined_mid_pattern, cleaned_text): mid_page = i if start_page is not None and mid_page is not None and chapter_type: if re.search(end_pattern, cleaned_text): if mid_page is None: if i > start_page: end_page = i break else: if i > mid_page: end_page = i break return start_page, mid_page, end_page # 运行提取 start_page, mid_page, end_page = run_extraction() return start_page, mid_page, end_page def extract_pages_twice_tobidders_notice(pdf_path, output_folder, output_suffix, common_header): begin_pattern = re.compile( r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*(?:(?:投标人?|磋商|供应商|谈判供应商|磋商供应商)须知)+' ) end_pattern = re.compile( r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*([\u4e00-\u9fff]+)' # 捕获中文部分 ) exclusion_words = ["合同", "评标", "开标","评审","采购","资格"] # 在这里添加需要排除的关键词 pdf_document = PdfReader(pdf_path) exclusion_pattern = re.compile(r'文件的构成|文件的组成|须对应|需对应|须按照|需按照|须根据|需根据') # 提取第一部分 start_page1, end_page1 = extract_pages_generic(pdf_document, begin_pattern, end_pattern, -1, common_header) if start_page1 is None or end_page1 is None: print(f"second: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!") return "", "" # 保存第一部分的路径 path1 = save_extracted_pages(pdf_document, start_page1, end_page1, pdf_path, output_folder, "tobidders_notice_part1") # 提取第二部分 start_page2 = end_page1 # 检查end_page1页面的内容 text = pdf_document.pages[end_page1].extract_text() or "" cleaned_text = clean_page_content(text, common_header) match = end_pattern.search(cleaned_text) if match: # 获取匹配到的中文部分 chapter_title = match.group(1) # 检查是否包含排除关键词 if any(word in chapter_title for word in exclusion_words): # 如果包含排除关键词,直接返回相同的路径 return path1, path1 # 如果不包含排除关键词,继续提取第二部分 _, end_page2 = extract_pages_generic(pdf_document, end_pattern, end_pattern, start_page2 - 1, common_header, exclusion_pattern) if end_page2 is None: print(f"second: {output_suffix} 未找到第二部分的结束页在文件 {pdf_path} 中!") return path1, path1 # 保存第二部分的路径 path2 = save_extracted_pages(pdf_document, start_page2, end_page2, pdf_path, output_folder, "tobidders_notice_part2") return path1, path2 def extract_pages_twice(pdf_path, output_folder, output_suffix, common_header): try: exclusion_pattern = re.compile(r'文件的构成|文件的组成|须对应|需对应|须按照|需按照|须根据|需根据') pdf_document = PdfReader(pdf_path) patterns = None begin_page = 0 start_page = None end_page = None if output_suffix == "procurement": patterns = [get_patterns_for_procurement()] begin_page = 5 elif output_suffix == "evaluation_method" or output_suffix == "qualification2" or output_suffix == "qualification3": patterns = [get_patterns_for_evaluation_method()] begin_page = 5 elif output_suffix == "qualification1": patterns = [get_patterns_for_qualification(),get_patterns_for_qualification2()] begin_page = 5 elif output_suffix == "notice": patterns = [get_patterns_for_notice()] begin_page = 0 if patterns: for pattern_pair in patterns: # print(pattern_pair[0]) # print(pattern_pair[1]) start_page, end_page = extract_pages_generic(pdf_document, pattern_pair[0], pattern_pair[1], begin_page, common_header, exclusion_pattern, output_suffix) if start_page is not None and end_page is not None: break if start_page is None or end_page is None: if output_suffix == "qualification1": print(f"second: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!") print("third:尝试提取评分办法章节...") temp = truncate_pdf_main(pdf_path, output_folder, 2, "qualification2") if len(temp) > 0: return temp[0] else: return "" else: print(f"second: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!") return "" return save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix) except Exception as e: print(f"Error in extract_pages_twice: {e}") return "" # def save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix): # if output_suffix=='notice': # print(start_page) # base_file_name = os.path.splitext(os.path.basename(pdf_path))[0] # output_pdf_path = os.path.join(output_folder, f"{base_file_name}_{output_suffix}.pdf") # output_doc = PdfWriter() # for page_num in range(start_page, end_page + 1): # output_doc.add_page(pdf_document.pages[page_num]) # with open(output_pdf_path, 'wb') as f: # output_doc.write(f) # print(f"{output_suffix} 已截取并保存页面从 {start_page} 到 {end_page} 为 {output_pdf_path}") # return output_pdf_path def save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix): try: # 检查 start_page 和 end_page 是否为 None if start_page is None or end_page is None: print("Error: start_page 或 end_page 为 None") return "" base_file_name = os.path.splitext(os.path.basename(pdf_path))[0] output_pdf_path = os.path.join(output_folder, f"{base_file_name}_{output_suffix}.pdf") if start_page < 0 or end_page >= len(pdf_document.pages) or start_page > end_page: print(f"无效的页面范围: {start_page} 到 {end_page}") return "" if output_suffix == 'notice' and start_page - 1 >= 0: before_pdf_path = os.path.join(output_folder, f"{base_file_name}_before.pdf") before_doc = PdfWriter() for page_num in range(0, start_page): before_doc.add_page(pdf_document.pages[page_num]) with open(before_pdf_path, 'wb') as f: before_doc.write(f) print(f"已保存页面从 0 到 {start_page - 1} 为 {before_pdf_path}") output_doc = PdfWriter() for page_num in range(start_page, end_page + 1): output_doc.add_page(pdf_document.pages[page_num]) with open(output_pdf_path, 'wb') as f: output_doc.write(f) print(f"{output_suffix} 已截取并保存页面从 {start_page} 到 {end_page} 为 {output_pdf_path}") return output_pdf_path except Exception as e: print(f"Error in save_extracted_pages: {e}") return "" # 返回空字符串 #合并封面+招标公告+投标人须知前附表+须知正文 def merge_selected_pdfs(output_folder, truncate_files, output_path, base_file_name): """ 合并 output_folder 中以 {base_file_name}_before.pdf 结尾的 PDF 文件, 以及 truncate_files 中以指定后缀结尾的文件,按照指定顺序合并。 参数: - output_folder (str): 包含以 {base_file_name}_before.pdf 结尾的 PDF 文件的文件夹路径。 - truncate_files (list): 包含 PDF 文件路径的列表。 - output_path (str): 合并后的 PDF 文件保存路径。 - base_file_name (str): 用于匹配文件名的基础名称。 - logger (logging.Logger): 日志记录器对象。 返回: - str: 如果合并成功,返回 output_path;否则,返回空字符串 ""。 """ # 1. 获取 output_folder 中所有文件 try: all_output_files = os.listdir(output_folder) except FileNotFoundError: print(f"输出文件夹 '{output_folder}' 未找到。") return "" except PermissionError: print(f"没有权限访问输出文件夹 '{output_folder}'。") return "" # 2. 定义要选择的文件后缀及合并顺序,包括 before 文件 desired_suffixes = [ f'{base_file_name}_before.pdf', f'{base_file_name}_notice.pdf', f'{base_file_name}_tobidders_notice_part1.pdf', f'{base_file_name}_tobidders_notice_part2.pdf' ] all_pdfs_to_merge = [] for suffix in desired_suffixes: if suffix == f'{base_file_name}_before.pdf': # 从 output_folder 中选择以 {base_file_name}_before.pdf 结尾的文件 matching_files = [ os.path.join(output_folder, f) for f in all_output_files if f.endswith(suffix) ] else: # 从 truncate_files 中选择以指定后缀结尾的文件 matching_files = [f for f in truncate_files if f.endswith(suffix)] if matching_files: # 如果找到多个匹配的文件,按名称排序并添加 matching_files_sorted = sorted(matching_files) all_pdfs_to_merge.extend(matching_files_sorted) for f in matching_files_sorted: print(f"选中文件: {f}") else: print(f"没有找到以 '{suffix}' 结尾的文件。") print(f"总共将要合并的 PDF 文件数量: {len(all_pdfs_to_merge)}") if not all_pdfs_to_merge: print("没有找到要合并的 PDF 文件。") return "" # 调用 merge_pdfs 函数进行合并 merge_pdfs(all_pdfs_to_merge, output_path) print(f"已成功合并 PDF 文件到 '{output_path}'。") # 检查合并后的文件是否存在且不为空 if os.path.exists(output_path) and os.path.getsize(output_path) > 0: return output_path else: print(f"合并失败,没有生成 '{output_path}'。") return "" def get_start_and_common_header(input_path): common_header = extract_common_header(input_path) last_begin_index = 0 begin_pattern = re.compile(r'.*(?:招标公告|投标邀请书|投标邀请函)\s*$',re.MULTILINE) pdf_document = PdfReader(input_path) for i, page in enumerate(pdf_document.pages): if i > 25: return common_header, 0 # 如果页码大于25,直接返回last_begin_index=0 text = page.extract_text() if text: cleaned_text = clean_page_content(text, common_header) if begin_pattern.search(cleaned_text): last_begin_index = i # 更新第一个匹配的索引,页码从0开始 return common_header,last_begin_index return common_header,last_begin_index def truncate_pdf_main(input_path, output_folder, selection, output_suffix="default"): try: last_begin_index = get_start_and_common_header(input_path) begin_page = last_begin_index if last_begin_index != 0 else { 4: 1, # 前附表 2: 5, # 评标 3: 5, # 资格 1: 0, # 公告 5: 3 # 采购需求 }.get(selection, 0) if selection == 1: #招标公告 begin_pattern = re.compile( r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(?:公告|邀请书|邀请函).*' ) end_pattern = re.compile( # r'^(?:第[一二三四五六七八九十百千]+(?:章|部分)\s*(?:投标人|磋商|供应商|谈判供应商|磋商供应商)须知+|(?:一\s*、\s*)?(?:投标人|磋商|供应商)须知前附表)' r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE ) local_output_suffix = "notice" elif selection == 2: begin_pattern = re.compile( r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(磋商|谈判|评标|评定|评审)(方法|办法).*' ) end_pattern = re.compile( r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+' ) local_output_suffix = "evaluation_method" elif selection == 3: begin_pattern = re.compile( r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(资格审查).*', re.MULTILINE ) end_pattern = re.compile( r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE ) local_output_suffix = "qualification1" elif selection == 4: # 投标人须知前附表和正文 begin_pattern = re.compile( r'^(?:第[一二三四五六七八九十百千]+(?:章|部分)\s*(?:投标人?|磋商|供应商|谈判供应商|磋商供应商)须知+|(?:一\s*、\s*)?(?:投标人?|磋商|供应商)须知前附表)', re.MULTILINE ) end_pattern=None # end_pattern = re.compile( # r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE # ) local_output_suffix = "tobidders_notice" elif selection == 5: #采购需求 # 更新的正则表达式以匹配"第x章"和"第x部分",考虑到可能的空格和其他文字 begin_pattern = re.compile( r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(?:服务|项目|商务).*?要求|' r'^第[一二三四五六七八九十百千]+(?:章|部分).*?采购.*|' r'^第[一二三四五六七八九十百千]+(?:章|部分).*?需求.*' ) end_pattern = re.compile( r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+' ) local_output_suffix = "procurement" elif selection==6: #投标文件格式 begin_pattern=re.compile( r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(?:格式).*' ) end_pattern=re.compile( r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE ) local_output_suffix = "format" else: print("无效的选择:请选择1-5") return [''] # 如果传入的 output_suffix 是 'default',则使用本地生成的 output_suffix if output_suffix == "default": output_suffix = local_output_suffix # 调用相应的处理函数 return process_input(input_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix) or "" except Exception as e: print(f"Error in truncate_pdf_main: {e}") return [''] # 返回空字符串 def truncate_pdf_multiple(pdf_path, output_folder,unique_id="123"): global logger logger = get_global_logger(unique_id) base_file_name = os.path.splitext(os.path.basename(pdf_path))[0] truncate_files = [] # 定义要处理的选择范围 selections = range(1, 6) # 使用 ThreadPoolExecutor 进行多线程处理 with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # 提交所有任务并保持 selection 顺序 future_to_selection = { selection: executor.submit(truncate_pdf_main, pdf_path, output_folder, selection, output_suffix="default") for selection in selections} # 按 selection 顺序收集结果 for selection in selections: future = future_to_selection.get(selection) try: files = future.result() if files: truncate_files.extend(files) except Exception as e: logger.error(f"Selection {selection} 生成了一个异常: {e}") # 定义合并后的输出路径 merged_output_path = os.path.join(output_folder, f"{base_file_name}_merged_baseinfo.pdf") # 调用 merge_selected_pdfs 并获取返回值 merged_path = merge_selected_pdfs(output_folder, truncate_files, merged_output_path, base_file_name) if merged_path: # 合并成功,添加合并后的文件路径 truncate_files.append(merged_path) logger.info(f"已生成合并文件: {merged_output_path}") else: # 合并失败,添加空字符串 truncate_files.append("") logger.warning(f"合并失败,没有生成合并文件 for {pdf_path}") return truncate_files #小解析,只需要前三章内容 def truncate_pdf_specific_goods(pdf_path, output_folder, selections,unique_id="123"): """ 处理 PDF 文件,选择指定的 selections,并合并结果。 Args: pdf_path (str): 要处理的 PDF 文件路径。 output_folder (str): 截取后的文件保存文件夹路径。 selections (iterable): 要处理的 selection 列表。 Returns: list: 截取的文件路径列表,包括合并后的文件路径(如果有)。 """ global logger logger = get_global_logger(unique_id) base_file_name = os.path.splitext(os.path.basename(pdf_path))[0] truncate_files = [] # 使用 ThreadPoolExecutor 进行多线程处理 with concurrent.futures.ThreadPoolExecutor(max_workers=len(selections)) as executor: # 提交所有任务并保持 selection 顺序 future_to_selection = {selection: executor.submit(truncate_pdf_main, pdf_path, output_folder, selection, output_suffix="default") for selection in selections} # 按 selection 顺序收集结果 for selection in selections: future = future_to_selection.get(selection) try: files = future.result() if files: if isinstance(files, list): truncate_files.extend(files) elif isinstance(files, str): truncate_files.append(files) except Exception as e: logger.error(f"Selection {selection} 生成了一个异常: {e}") # 定义合并后的输出路径 merged_output_path = os.path.join(output_folder, f"{base_file_name}_merged_baseinfo.pdf") # 调用 merge_selected_pdfs 并获取返回值 merged_path = merge_selected_pdfs(output_folder, truncate_files, merged_output_path, base_file_name) if merged_path: # 合并成功,添加合并后的文件路径 truncate_files.append(merged_path) logger.info(f"已生成合并文件: {merged_output_path}") else: # 合并失败,添加空字符串 truncate_files.append("") logger.warning(f"合并失败,没有生成合并文件 for {pdf_path}") return truncate_files # TODO:交通智能系统和招标(1)(1)文件有问题 包头 绍兴 资格审查文件可能不需要默认与"evaluation"同一章 无效投标可能也要考虑 “more”的情况,类似工程标 if __name__ == "__main__": # input_path = "C:\\Users\\Administrator\\Desktop\\货物标\\zbfiles\\zbtest4_evaluation_method.pdf" # input_path = "C:\\Users\\Administrator\\Desktop\\fsdownload\\b151fcd0-4cd8-49b4-8de3-964057a9e653\\ztbfile.pdf" input_path="C:\\Users\\Administrator\\Desktop\\货物标\\zbfiles" # input_path = "C:\\Users\\Administrator\\Desktop\\货物标\\output1\\2-招标文件_procurement.pdf" # input_path="C:\\Users\\Administrator\\Desktop\\fsdownload\\a091d107-805d-4e28-b8b2-0c7327737238\\ztbfile.pdf" # output_folder = "C:\\Users\\Administrator\\Desktop\\fsdownload\\a091d107-805d-4e28-b8b2-0c7327737238\\tmp" output_folder="C:\\Users\\Administrator\\Desktop\\货物标\\zbfiles\\新建文件夹" # files = truncate_pdf_multiple(input_path, output_folder) # selections = [1,4] # files=truncate_pdf_specific_goods(input_path,output_folder,selections) # print(files) selection = 4# 例如:1 - 商务技术服务要求, 2 - 评标办法, 3 - 资格审查后缀有qualification1或qualification2(与评标办法一致) 4.投标人须知前附表part1 投标人须知正文part2 5-公告 generated_files = truncate_pdf_main(input_path, output_folder, selection) # print(generated_files)