import json
import os
import re

from PyPDF2 import PdfWriter, PdfReader

from flask_app.general.llm.通义千问long_plus import upload_file, qianwen_long
from flask_app.general.通用功能函数 import process_string_list


def extract_and_format_from_paths(json_paths, includes, excludes):
    """Read several JSON files and collect entries matching the include keywords.

    Args:
        json_paths (list): paths of the JSON files to scan.
        includes (list): keywords that must appear for an entry to be kept.
        excludes (list): disqualifying keywords, checked against the text that
            precedes the matched include keyword.

    Returns:
        list: formatted strings gathered from every readable file.
    """
    all_formatted_results = []
    for path in json_paths:
        # Skip empty or non-existent paths instead of failing the whole batch.
        if not path or not os.path.isfile(path):
            print(f"禁止投标情形: Error: The file path '{path}' is invalid or does not exist.")
            continue
        try:
            with open(path, 'r', encoding='utf-8') as file:
                json_data = json.load(file)
            formatted_results = []
            for key, value in json_data.items():
                if isinstance(value, dict):
                    # Nested dict: keep sub-values whose *key* contains an
                    # include keyword.
                    for sub_key, sub_value in value.items():
                        if any(include in sub_key for include in includes):
                            formatted_results.append(f"{sub_value}")
                elif isinstance(value, str):  # clause text
                    for include in includes:
                        if include in value:
                            # Only keep the clause if the text *before* the
                            # keyword contains none of the exclude words.
                            prefix = value.split(include)[0]
                            if not any(exclude in prefix for exclude in excludes):
                                # Drop the first line (usually a heading) when present.
                                if '\n' in value:
                                    value = value.split('\n', 1)[-1]
                                formatted_results.append(value)
                                break  # one satisfying keyword is enough
            all_formatted_results.extend(formatted_results)
        except FileNotFoundError:
            print(f"禁止投标情形: Error: The file '{path}' does not exist.")
        except json.JSONDecodeError:
            print(f"禁止投标情形: Error: The file '{path}' contains invalid JSON.")
    return all_formatted_results


def extract_unique_items_from_texts(texts):
    """Split enumerated text fragments into unique, cleaned items.

    Recognizes numbering markers such as "1.", "(1)", "(1)", "1)" and the
    circled digits ①-⑫, strips leading "xxx:" intros and trailing
    punctuation, and protects embedded URLs from the cleaning passes.

    Args:
        texts (list): raw text fragments to split and deduplicate.

    Returns:
        list: unique cleaned items, in first-seen order.
    """
    # Numbering markers, including Chinese circled digits.
    pattern = re.compile(r'(?:\d+\.|\(\d+\)|\(\d+\)|\d+\)|\①|\②|\③|\④|\⑤|\⑥|\⑦|\⑧|\⑨|\⑩|\⑪|\⑫)\s*')
    intro_pattern = re.compile(r'^.*?[::]')
    punctuation_pattern = re.compile(r'[;。,、..,:;!?!?]+$')
    url_pattern = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
    # Require at least 2 consecutive letters, digits, or CJK characters.
    content_check_pattern = re.compile(r'[\u4e00-\u9fa5a-zA-Z0-9]{2,}')

    all_results = []
    seen = set()
    for text in texts:
        # Flatten: drop tabs and newlines.
        text = text.replace('\t', '').replace('\n', '')
        # Remove the leading intro up to (and including) the first colon.
        text = intro_pattern.sub('', text)

        # Replace URLs with placeholders so the cleaning passes below
        # cannot corrupt them; they are restored per item afterwards.
        urls = []

        def url_replacer(match):
            urls.append(match.group(0))
            return f"{{URL{len(urls)}}}"

        text = url_pattern.sub(url_replacer, text)

        # Split on the numbering markers, then clean each piece.
        items = pattern.split(text)
        for item in items:
            cleaned_item = item.strip()
            if cleaned_item:
                cleaned_item = pattern.sub('', cleaned_item)
                cleaned_item = punctuation_pattern.sub('', cleaned_item)
                cleaned_item = cleaned_item.strip()
                # Restore the protected URLs.
                for i, url in enumerate(urls, 1):
                    cleaned_item = cleaned_item.replace(f"{{URL{i}}}", url)
                # Keep only unseen items longer than 3 chars with real content.
                if (cleaned_item and cleaned_item not in seen
                        and len(cleaned_item) > 3
                        and content_check_pattern.search(cleaned_item)):
                    seen.add(cleaned_item)
                    all_results.append(cleaned_item)
    return all_results


def merge_pdfs(paths, output_filename):
    """Merge several PDFs into one file placed beside the first input.

    Args:
        paths (list): PDF file paths, merged in order.
        output_filename (str): name of the merged file.

    Returns:
        str | None: path of the merged PDF, or None when `paths` is empty.
    """
    pdf_writer = PdfWriter()
    output_path = None

    for path in paths:
        pdf_reader = PdfReader(path)
        # Append every page of the current document.
        for page in pdf_reader.pages:
            pdf_writer.add_page(page)
        if output_path is None:
            # Output goes next to the first input file.
            output_path = os.path.join(os.path.dirname(path), output_filename)

    if output_path:
        with open(output_path, 'wb') as out:
            pdf_writer.write(out)
        print(f"禁止投标情形: Merged PDF saved to {output_path}")
    else:
        print("禁止投标情形: No files to merge.")
    return output_path


def find_forbidden(qualification=""):
    """Ask the LLM which bidder-disqualification clauses the tender contains.

    Args:
        qualification (str): path to the qualification PDF; when empty, the
            LLM call is skipped and an empty list is assumed.

    Returns:
        dict: {'不得存在的其他情形': [...]}; on any exception the value is ""
            (NOTE(review): inconsistent with the list-valued success path —
            callers may depend on it, so it is kept as-is; confirm before
            changing to []).
    """
    try:
        if qualification:
            file_id = upload_file(qualification)
            user_query_forbidden = (
                "该招标文件规定的投标人不得存在的其他情形有哪些,请以列表给我提供信息,形如[xx,xx,...],"
                "请你不要回答有关\"信誉要求\"的内容,若原文未提及,返回[]。"
            )
            qianwen_forbidden_str = qianwen_long(file_id, user_query_forbidden)
        else:
            qianwen_forbidden_str = "[]"
        # Parse the LLM answer into a string list, e.g. ["xxx", "xx"].
        actual_list = process_string_list(qianwen_forbidden_str)
        includes = ["不得存在", "不得与", "禁止投标", "对投标人的纪律"]
        excludes = ["招标", "评标", "定标"]
        # forbidden_results = extract_and_format_from_paths([truncate_json_path, clause_path], includes, excludes)
        # processed_results = extract_unique_items_from_texts(forbidden_results)
        # merged_forbidden_list = list(dict.fromkeys(actual_list + processed_results))
        # Deduplicate while preserving order.
        merged_forbidden_list = list(dict.fromkeys(actual_list))
        forbidden_dict = {'不得存在的其他情形': merged_forbidden_list}
        return forbidden_dict
    except Exception as e:
        print(f"find_forbidden 在处理时发生异常: {e}")
        return {'不得存在的其他情形': ""}

# TODO: the forbidden-situation clauses appear in many places in the document;
# the goods-tender flow uses a full-text-search strategy instead.

if __name__ == '__main__':
    # truncate_json_path = "C:\\Users\\Administrator\\Desktop\\fsdownload\\006decc2-b3b5-4898-9b9f-4b0eab4e173f\\truncate_output.json"
    # clause_path = "C:\\Users\\Administrator\\Desktop\\fsdownload\\006decc2-b3b5-4898-9b9f-4b0eab4e173f\\clause1.json"
    qualification = "C:\\Users\\Administrator\\Desktop\\fsdownload\\006decc2-b3b5-4898-9b9f-4b0eab4e173f\\ztbfile_qualification.pdf"
    output_dir = "C:\\Users\\Administrator\\Desktop\\fsdownload\\006decc2-b3b5-4898-9b9f-4b0eab4e173f"
    doc_path = 'C:\\Users\\Administrator\\Desktop\\fsdownload\\006decc2-b3b5-4898-9b9f-4b0eab4e173f\\ztbfile.docx'
    res = find_forbidden(qualification)
    print(json.dumps(res, ensure_ascii=False, indent=4))