# -*- coding: utf-8 -*-
import json
import os.path
import time
import re
from flask_app.main.json_utils import combine_json_results, nest_json_under_key
from flask_app.main.通义千问long import upload_file, qianwen_long
from concurrent.futures import ThreadPoolExecutor
from flask_app.main.禁止投标情形 import find_forbidden, process_string_list


# If the current paragraph carries a section number, scan downwards until a paragraph
# with the same numbering style is hit.
# If the current paragraph has no section number, scan downwards for numbered paragraphs
# and collect every item that shares the same numbering style.
def extract_text_with_keywords(doc_path, keywords, follow_up_keywords):
    from collections import OrderedDict
    from docx import Document

    if isinstance(keywords, str):
        keywords = [keywords]
    doc = Document(doc_path)
    extracted_paragraphs = OrderedDict()
    continue_collecting = False
    current_section_pattern = None
    active_key = None

    def match_keywords(text, patterns):
        return any(re.search(pattern, text, re.IGNORECASE) for pattern in patterns)

    def extract_from_text(text, current_index):
        nonlocal continue_collecting, current_section_pattern, active_key
        if text == "":
            return current_index
        if continue_collecting:
            if current_section_pattern and re.match(current_section_pattern, text):
                continue_collecting = False
                active_key = None
            else:
                if active_key is not None:
                    extracted_paragraphs[active_key].append(text)
            return current_index
        if match_keywords(text, keywords):
            active_key = text
            extracted_paragraphs[active_key] = [text]
            if match_keywords(text, follow_up_keywords):
                continue_collecting = True
                section_number = re.match(r'(\d+(\s*\.\s*\d+)*)', text)
                if section_number:
                    current_section_number = section_number.group(1)
                    level_count = current_section_number.count('.')
                    # Pattern to match the current level, e.g. 3.4.5
                    pattern = r'^' + (r'\d+\s*\.\s*') * level_count + r'\d+'
                    # Generate patterns for the next section at the same level and at the parent level
                    parts = current_section_number.split('.')
                    matched_patterns = [pattern]  # start with the full pattern
                    # Next section at the same level
                    parts[-1] = str(int(parts[-1]) + 1)
                    next_pattern = r'^' + r'\s*\.\s*'.join(parts)
                    matched_patterns.append(next_pattern)
                    # Parent section (if applicable)
                    if len(parts) > 1:
                        parent_section_parts = parts[:-1]
                        parent_section_parts[-1] = str(int(parent_section_parts[-1]) + 1)
                        parent_pattern = r'^' + r'\s*\.\s*'.join(parent_section_parts)
                        matched_patterns.append(parent_pattern)
                    # Combine the patterns
                    combined_pattern = r'(' + r')|('.join(matched_patterns) + r')'
                    current_section_pattern = re.compile(combined_pattern)
                else:
                    found_next_number = False
                    current_section_pattern = None
                    while current_index < len(doc.paragraphs) - 1:
                        current_index += 1
                        next_text = doc.paragraphs[current_index].text.strip()
                        if not found_next_number:
                            next_section_number = re.match(r'^([A-Za-z0-9]+(?:\.[A-Za-z0-9]+)*)|(\(\d+\))', next_text)
                            if next_section_number:
                                found_next_number = True
                                if next_section_number.group(1):
                                    section_parts = next_section_number.group(1).split('.')
                                    dynamic_pattern = r'^' + r'\.'.join([r'[A-Za-z0-9]+' for _ in section_parts]) + r'\b'
                                elif next_section_number.group(2):
                                    dynamic_pattern = r'^[\(（]\d+[\)）]'
                                current_section_pattern = re.compile(dynamic_pattern)
                        if current_section_pattern and re.match(current_section_pattern, next_text):
                            extracted_paragraphs[active_key].append(next_text)
                        else:
                            continue_collecting = False
                            active_key = None
                            break
        return current_index

    index = 0
    while index < len(doc.paragraphs):
        index = extract_from_text(doc.paragraphs[index].text.strip(), index)
        index += 1
    return extracted_paragraphs


def preprocess_text_list(text_list):
    new_text_list = []
    # Match the spot after a CJK character or terminal punctuation mark when it is
    # followed by whitespace plus a letter, a digit, or a parenthesized number
    split_pattern = re.compile(r'(?<=[\u4e00-\u9fff。;!?？！；])(?=\s+[a-zA-Z\d]|\s+\([1-9]\d*\)|\s+（[1-9]\d*）)')
    for text in text_list:
        # Split each element wherever the pattern fires
        parts = split_pattern.split(text)
        new_text_list.extend(part.strip() for part in parts if part.strip())  # keep non-empty parts only
    return new_text_list
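
# A minimal illustration of preprocess_text_list, using a made-up input string
# (not taken from any real tender document): a flattened paragraph where a full
# stop is followed by parenthesized item numbers is split back into separate items.
#
#   preprocess_text_list(["投标人须知。 (1) 投标文件逾期送达的 (2) 未按规定密封的"])
#   # -> ['投标人须知。', '(1) 投标文件逾期送达的', '(2) 未按规定密封的']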
def clean_dict_datas(extracted_contents, keywords, excludes):
    """Normalize the paragraphs captured by the regex extraction."""
    all_texts1 = []
    all_texts2 = []
    # Sentence-splitting pattern covering both CJK and Western terminal punctuation
    split_pattern = r'(?<=[。！？\!\?])'
    for key, text_list in extracted_contents.items():
        if len(text_list) == 1:
            for data in text_list:
                # Skip entries containing any excluded string
                if any(exclude in data for exclude in excludes):
                    continue
                # Strip the leading numbering: (n), letter+dotted digits, or Chinese ordinals
                pattern = r'^\s*([(（]\d+[)）]|[A-Za-z]?\d+\s*(\.\s*\d+)*(\s|\.|、|．)?|[一二三四五六七八九十]+、)'
                data = re.sub(pattern, '', data).strip()
                keyword_match = re.search(keywords, data)
                if keyword_match:
                    # Starting at the keyword, look for the first terminal punctuation
                    start_pos = keyword_match.start()
                    # Take the substring from the keyword onwards
                    substring = data[start_pos:]
                    # Split on the terminal punctuation defined above
                    sentences = re.split(split_pattern, substring, 1)
                    if len(sentences) > 0 and sentences[0]:
                        # Keep only the first sentence, punctuation included.
                        # e.g. "经采购人允许,潜在投标人可进入项目现场进行考察,但潜在投标人不得因此使采购人承担有关责任和蒙受损失。潜在投标人应自行承担现场考察的全部费用、责任和风险。"
                        # becomes "经采购人允许,潜在投标人可进入项目现场进行考察,但潜在投标人不得因此使采购人承担有关责任和蒙受损失。"
                        cleaned_text = data[:start_pos] + sentences[0]
                    else:
                        cleaned_text = data  # no terminal punctuation: keep the whole string
                else:
                    cleaned_text = data  # keyword not found: keep the original text
                # Remove half-width and full-width spaces
                cleaned_text_no_spaces = cleaned_text.replace(' ', '').replace('　', '')
                all_texts1.append(cleaned_text_no_spaces)
        else:
            new_text_list = preprocess_text_list(text_list)
            # Strip the leading numbering from the first element only
            pattern = r'^\s*([(（]\d+[)）]|[A-Za-z]?\d+\s*(\.\s*\d+)*(\s|\.|、|．)?|[一二三四五六七八九十]+、)'
            data = re.sub(pattern, '', new_text_list[0]).replace(' ', '').strip()
            # Put the cleaned first element back and join everything
            new_text_list[0] = data
            joined_text = "\n".join(new_text_list)
            # Remove half-width and full-width spaces
            joined_text_no_spaces = joined_text.replace(' ', '').replace('　', '')
            all_texts2.append(joined_text_no_spaces)
    return all_texts1, all_texts2  # all_texts1 still needs an LLM pass; all_texts2 is used as-is
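
# A quick sketch of what the leading-numbering pattern used above strips,
# with invented example strings (not from any real document):
#
#   pattern = r'^\s*([(（]\d+[)）]|[A-Za-z]?\d+\s*(\.\s*\d+)*(\s|\.|、|．)?|[一二三四五六七八九十]+、)'
#   re.sub(pattern, '', '3.1.2 投标文件未按要求密封的')  # -> '投标文件未按要求密封的'
#   re.sub(pattern, '', '(5)未提交投标保证金的')         # -> '未提交投标保证金的'
#   re.sub(pattern, '', '三、串通投标的')                # -> '串通投标的'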
def find_sentences_with_keywords(data, keywords, follow_up_keywords):
    """Recursively collect sentences containing the keywords, split into two lists
    depending on whether a follow-up keyword (e.g. "下列情形之一") is present."""
    sentences1 = []  # sentences without a follow-up keyword
    sentences2 = []  # sentences with a follow-up keyword
    if isinstance(data, dict):
        for value in data.values():
            result1, result2 = find_sentences_with_keywords(value, keywords, follow_up_keywords)
            sentences1.extend(result1)
            sentences2.extend(result2)
    elif isinstance(data, list):
        for item in data:
            result1, result2 = find_sentences_with_keywords(item, keywords, follow_up_keywords)
            sentences1.extend(result1)
            sentences2.extend(result2)
    elif isinstance(data, str):
        # Split into sentences on terminal punctuation and on numbering markers
        # (x.y, x、, x., (n)), keeping each numbered item intact
        split_sentences = re.split(r'(?<=[。！？\!\?])|(?=\d+\.\d+)|(?=\d+[\、\.])|(?=[(（]\d+[)）])', data)
        i = 0
        while i < len(split_sentences):
            sentence = split_sentences[i].strip()
            if re.search(keywords, sentence, re.IGNORECASE):
                follow_up_present = any(
                    re.search(follow_up, sentence, re.IGNORECASE) for follow_up in follow_up_keywords)
                if follow_up_present:
                    # A follow-up keyword is present: capture from here up to the
                    # next x.y(.z) section number
                    start_index = i
                    end_index = start_index
                    found_next_section = False
                    for j in range(start_index + 1, len(split_sentences)):
                        if re.match(r'\d+\.\d+(\.\d+)?', split_sentences[j].strip()):
                            end_index = j
                            found_next_section = True
                            break
                    if found_next_section:
                        full_text = ' '.join(split_sentences[start_index:end_index]).strip()
                    else:
                        full_text = ' '.join(split_sentences[start_index:]).strip()
                    pattern = r'^\s*([(（]\d+[)）]|[A-Za-z]?\d+\s*(\.\s*\d+)*(\s|\.|、|．)?|[一二三四五六七八九十]+、)'
                    data = re.sub(pattern, '', full_text).replace(' ', '').strip()
                    sentences2.append(data)  # with a follow-up keyword
                    i = end_index if found_next_section else len(split_sentences)
                else:
                    pattern = r'^\s*([(（]\d+[)）]|[A-Za-z]?\d+\s*(\.\s*\d+)*(\s|\.|、|．)?|[一二三四五六七八九十]+、)'
                    data = re.sub(pattern, '', sentence).replace('\n', '').replace(' ', '').strip()
                    sentences1.append(data)  # without a follow-up keyword
                    i += 1
            else:
                i += 1
    return sentences1, sentences2


def extract_sentences_from_json(json_path, keywords, follow_up_keywords):
    """Load the truncated JSON and extract the sentences containing the keywords."""
    with open(json_path, 'r', encoding='utf-8') as file:
        data = json.load(file)
    return find_sentences_with_keywords(data, keywords, follow_up_keywords)


# Handling invalid bids
def extract_values_if_contains(data, includes):
    """
    Recursively check whether any value in the data contains one of the strings
    in `includes`; collect the matching values and return them.

    Args:
        data (dict): a dict (or JSON-parsed data) to search.
        includes (list): keywords to look for.

    Returns:
        list: the values that contain at least one keyword.
    """
    included_values = []

    # Recursive helper for nested dicts and lists
    def recursive_search(current_data):
        if isinstance(current_data, dict):
            for key, value in current_data.items():
                if isinstance(value, dict):
                    # Dict value: recurse into it
                    recursive_search(value)
                elif isinstance(value, str):
                    # String value: check whether it contains any keyword from includes
                    if any(include in value for include in includes):
                        included_values.append(value)
        elif isinstance(current_data, list):
            for item in current_data:
                # List: recurse into each element
                recursive_search(item)

    # Kick off the search
    recursive_search(data)
    return included_values


# Earlier prompt drafts, kept for reference:
# 你是一个文本助手,文本内的信息以'...............'分割,你负责准确筛选所需的信息并返回,每块信息要求完整,不遗漏,你不得擅自进行总结或删减。
# 以上是从招标文件中摘取的内容,文本内之间的信息以'...............'分割,请你根据该内容回答:否决投标或拒绝投标或无效投标或使投标失效的情况有哪些?文本中可能存在无关的信息,请你准确筛选符合的信息并将它的序号返回。请以[x,x,x]格式返回给我结果,x为符合的信息的序号。
# 以上是原文内容,文本内的信息以'...............'分割,请你根据该信息回答:否决投标或拒绝投标或无效投标或使投标失效的情况有哪些?文本中可能存在无关的信息,请你准确筛选所需的信息并返回。最终结果以json列表格式返回给我,键名为'否决和无效投标情形',你的回答完全忠于原文内容,且回答内容与原文内容一致,要求完整与准确,不能擅自总结或者概括。
def handle_query(file_path, user_query, output_file, result_key, keywords, truncate_json_path):
    """Extract candidate clauses, send them to Qianwen, and collect the selected ones."""
    excludes = ["说明表", "重新招标", "否决所有", "否决投标的条件", "备注:", "本人保证:"]
    follow_up_keywords = [r'情\s*形\s*之\s*一', r'情\s*况\s*之\s*一', r'下\s*列', r'以\s*下']
    extracted_contents = extract_text_with_keywords(file_path, [keywords], follow_up_keywords)  # body text (tables excluded)
    all_texts1, all_texts2 = clean_dict_datas(extracted_contents, keywords, excludes)
    all_tables1, all_tables2 = extract_sentences_from_json(truncate_json_path, keywords, follow_up_keywords)  # table data from the truncated JSON
    qianwen_txt = all_texts1 + all_tables1
    selected_contents = set()  # deduplicate via set
    if qianwen_txt:
        # Write the candidates as a numbered, '...'-separated list for the model
        with open(output_file, 'w', encoding='utf-8') as file:
            counter = 1
            for content in qianwen_txt:
                file.write("..............." + '\n')
                file.write(f"{counter}. {content}\n")
                counter += 1
        file_id = upload_file(output_file)
        qianwen_ans = qianwen_long(file_id, user_query)
        num_list = process_string_list(qianwen_ans)
        print(num_list)
        for index in num_list:
            # Guard against non-positive or out-of-range indices from the model
            if 1 <= index <= len(qianwen_txt):
                content = qianwen_txt[index - 1]
                selected_contents.add(content)
    # Always include all_texts2 and all_tables2, whether or not qianwen_txt is empty
    selected_contents.update(all_texts2)
    selected_contents.update(all_tables2)
    # Return the selected contents, or an empty string if nothing matched
    if selected_contents:
        res = {result_key: list(selected_contents)}
    else:
        res = {result_key: ""}
    return res
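
# The round trip with the model, as this module uses it: the prompt asks Qianwen
# to answer with a bracketed index list such as "[1,3,5]", and process_string_list
# (imported from 禁止投标情形) is expected to parse that answer into a Python list
# of ints, e.g. [1, 3, 5]; each index is then mapped back onto the 1-based
# numbering written into the temp file above.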
def combine_find_invalid(file_path, output_dir, truncate_json_path, clause_path, truncate3):
    queries = [
        (r'否\s*决|无\s*效\s*投\s*标|被\s*拒\s*绝|予\s*以\s*拒\s*绝|投\s*标\s*失\s*效|投\s*标\s*无\s*效',
         "以上是从招标文件中摘取的内容,文本内之间的信息以'...............'分割,请你根据该内容回答:否决投标或拒绝投标或无效投标或投标失效的情况有哪些?文本中可能存在无关的信息,请你准确筛选符合的信息并将它的序号返回。请以[x,x,x]格式返回给我结果,x为符合的信息的序号,若情况不存在,返回[]。",
         os.path.join(output_dir, "temp1.txt"), "否决和无效投标情形"),
        (r'废\s*标',
         "以上是从招标文件中摘取的内容,文本内之间的信息以'...............'分割,请你根据该内容回答:废标项的情况有哪些?文本中可能存在无关的信息,请你准确筛选符合的信息并将它的序号返回。请以[x,x,x]格式返回给我结果,x为符合的信息的序号,若情况不存在,返回[]。",
         os.path.join(output_dir, "temp2.txt"), "废标项")
    ]
    results = []
    # Run both queries in parallel on a thread pool
    with ThreadPoolExecutor() as executor:
        futures = []
        for keywords, user_query, output_file, result_key in queries:
            future = executor.submit(handle_query, file_path, user_query, output_file, result_key,
                                     keywords, truncate_json_path)
            futures.append(future)
            time.sleep(1)  # stagger submissions by one second
        for future in futures:
            results.append(future.result())
    # Forbidden-to-bid situations
    print("starting不得存在的情形...")
    forbidden_res = find_forbidden(truncate_json_path, clause_path, truncate3)
    results.append(forbidden_res)
    combined_dict = {}
    for d in results:
        combined_dict.update(d)
    print("无效标与废标done...")
    # return nest_json_under_key(combined_dict, "无效标与废标项")
    return {"无效标与废标项": combined_dict}


if __name__ == '__main__':
    start_time = time.time()
    truncate_json_path = "C:\\Users\\Administrator\\Desktop\\fsdownload\\006decc2-b3b5-4898-9b9f-4b0eab4e173f\\truncate_output.json"
    clause_path = "C:\\Users\\Administrator\\Desktop\\fsdownload\\006decc2-b3b5-4898-9b9f-4b0eab4e173f\\clause1.json"
    truncate3 = "C:\\Users\\Administrator\\Desktop\\fsdownload\\006decc2-b3b5-4898-9b9f-4b0eab4e173f\\ztbfile_qualification.pdf"
    output_dir = "C:\\Users\\Administrator\\Desktop\\fsdownload\\006decc2-b3b5-4898-9b9f-4b0eab4e173f\\zbout"
    # doc_path = 'C:\\Users\\Administrator\\Desktop\\fsdownload\\temp7\\3abb6e16-19db-42ad-9504-53bf1072dfe7\\ztbfile_invalid.docx'
    doc_path = 'C:\\Users\\Administrator\\Desktop\\fsdownload\\006decc2-b3b5-4898-9b9f-4b0eab4e173f\\ztbfile_invalid.docx'
    results = combine_find_invalid(doc_path, output_dir, truncate_json_path, clause_path, truncate3)
    end_time = time.time()
    print("Elapsed time:", str(end_time - start_time))
    print("Results:", json.dumps(results, ensure_ascii=False, indent=4))
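
# For reference, the combined result is shaped roughly like the sketch below:
# the two keys come from the queries above, plus whatever keys find_forbidden
# contributes. Values here are illustrative, not real output; a key maps to ""
# when nothing matched.
#
#   {
#       "无效标与废标项": {
#           "否决和无效投标情形": ["投标文件未按要求密封的", "..."],
#           "废标项": ["..."],
#           ...
#       }
#   }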