# -*- encoding:utf-8 -*- import copy import json import re from flask_app.main.通义千问long import qianwen_long, upload_file from flask_app.main.多线程提问 import multi_threading from flask_app.main.json_utils import extract_content_from_json, clean_json_string from flask_app.货物标.投标人须知正文条款提取成json文件货物标版 import convert_clause_to_json from flask_app.货物标.货物标截取pdf import truncate_pdf_main # 这个字典可能有嵌套,你需要遍历里面的键名,对键名作判断,而不是键值,具体是这样的:如果处于同一层级的键的数量>1并且键名全由数字或点号组成。那么就将这些序号键名全部删除,重新组织成一个字典格式的数据,你可以考虑用字符串列表来保持部分平级的数据 # 对于同级的键,如果数量>1且键名都统一,那么将键名去掉,用列表保持它们的键值 def is_numeric_key(key): # 这个正则表达式匹配由数字、点、括号中的数字或单个字母(小写或大写)组成的字符串, # 字母后跟数字,或数字后跟字母,单个字母后跟点,但不能是字母-数字-字母的组合 pattern = r'^[\d.]+$|^\(\d+\)$|^(\d+)$|^[a-zA-Z]$|^[a-zA-Z]\d+$|^\d+[a-zA-Z]$|^[a-zA-Z]\.$' return re.match(pattern, key) is not None # TODO:如果键值中存在数字就不行 # zbtest20也有问题 def contains_number_or_index(key, value): # 判断值是否是数字或数字字符串 is_number = isinstance(value, (int, float)) or (isinstance(value, str) and value.isdigit()) # 判断键是否包含 "序号" contains_index = '序号' in key # 判断值中是否包含数字 contains_digit = isinstance(value, str) and re.search(r'\d+', value) # 判断值中是否包含中文字符 contains_chinese = isinstance(value, str) and re.search(r'[\u4e00-\u9fff]', value) # 如果值中包含数字但也有中文字符,则保留(返回 False) if contains_digit and contains_chinese: return False # 如果值是数字或包含数字,且不包含中文字符,或者键包含 "序号",返回 True return is_number or contains_index or contains_digit # 对于同一个字典中,可能存在若干键值对,若它们的键值都是""或者"/" 你就将它们的键值删去,它们的键名用字符串列表保存 # 如果键名是"序号"或者键值中全是数字,删去序号 def preprocess_dict(data): if isinstance(data, dict): if len(data) > 1: # 检查是否所有值都是 "" 或 "/" if all(v == "" or v == "/" or (isinstance(v, list) and not v) for v in data.values()): return list(data.keys()) else: processed = {} for k, v in data.items(): if not contains_number_or_index(k, v): processed_v = preprocess_dict(v) if processed_v != "": # 只添加非空值 processed[k] = processed_v return processed else: return {k: preprocess_dict(v) for k, v in data.items()} elif isinstance(data, list): return [preprocess_dict(item) for item in data] else: return data def process_dict(data): """ 递归处理字典,将符合条件的键值对进行转换。 如果键是数字或特定格式的字符串,则将其值放入 'items' 列表中并排序。 对于非数字键,如果对应的值是列表且列表中只有一个元素,则将其展平为单个元素。 Args: data (dict): 输入的字典数据。 Returns: dict 或 list 或 原始数据类型: 处理后的数据结构。 """ if not isinstance(data, dict): return data result = {} numeric_keys = [] non_numeric_keys = {} # 分类键为数字键和非数字键 for key, value in data.items(): if is_numeric_key(key): numeric_keys.append((key, value)) else: non_numeric_keys[key] = value # 处理数字键,将其值递归处理后放入 'items' 列表中 if numeric_keys: # 按键排序,确保顺序一致 numeric_keys_sorted = sorted(numeric_keys, key=lambda x: x[0]) result['items'] = [process_dict(item[1]) for item in numeric_keys_sorted] # 处理非数字键 for key, value in non_numeric_keys.items(): if isinstance(value, list): processed_list = [] for item in value: if isinstance(item, dict): # 处理字典中只有一个键值对的情况 if len(item) == 1: processed_item = process_dict(list(item.values())[0]) else: processed_item = process_dict(item) else: processed_item = process_dict(item) # 如果处理后的项是只包含一个元素的列表,则展平它 if isinstance(processed_item, list) and len(processed_item) == 1: processed_item = processed_item[0] processed_list.append(processed_item) # 新增逻辑:如果 processed_list 只有一个元素,则将其展平为单个元素 if len(processed_list) == 1: result[key] = processed_list[0] else: result[key] = processed_list else: # 如果值不是列表,直接递归处理 result[key] = process_dict(value) # 如果结果只有一个键 'items',则直接返回 'items' 列表 if len(result) == 1 and 'items' in result: return result['items'] # 检查如果所有键对应的值都是空列表,则将键名转换成列表项 if all(isinstance(v, list) and not v for v in result.values()): return list(result.keys()) return result # 查找引用的序号 def find_chapter_clause_references(data, parent_key=""): exclude_list = ["格式要求"] result = [] # 正则匹配"第x章"或"第x款" chapter_clause_pattern = re.compile(r'第[一二三四五六七八九十\d]+[章款]') # 如果数据不是字典,则直接返回空列表 if not isinstance(data, dict): return result # 遍历字典中的键值对 for key, value in data.items(): # 生成当前的完整键名 full_key = f"{parent_key}.{key}" if parent_key else key # 检查是否应排除该键或值 if any(exclude_item in full_key for exclude_item in exclude_list) or \ (isinstance(value, str) and any(exclude_item in value for exclude_item in exclude_list)): continue # 如果在排除项中,跳过处理 if isinstance(value, dict): # 如果值是字典,递归调用函数 result.extend(find_chapter_clause_references(value, full_key)) elif isinstance(value, list): # 如果值是列表,遍历列表中的元素 for index, item in enumerate(value): if isinstance(item, dict): # 生成新的键路径,包括列表索引 new_parent_key = f"{full_key}[{index}]" result.extend(find_chapter_clause_references(item, new_parent_key)) elif isinstance(value, str): # 如果值是字符串,检查是否匹配"第x章"或"第x款" if chapter_clause_pattern.search(value): result.append({full_key: value}) return result def preprocess_value(value): # 使用正则表达式查找"第X章"或"第X款" chapter_match = re.search(r'第(.+?)章', value) clause_match = re.search(r'第(.+?)款', value) if chapter_match or clause_match: # 以逗号、句号、问号、感叹号为分隔符 separators = r'[,。?!,\?!]' # 分隔符检测函数,确保括号成对闭合时才用作分隔符 def is_separator(ch, count): return count['('] == count[')'] and count['('] == count[')'] and re.match(separators, ch) parts = [] current_part = [] count = {'(': 0, ')': 0, '(': 0, ')': 0} for ch in value: if ch in count: count[ch] += 1 if is_separator(ch, count): parts.append("".join(current_part).strip()) current_part = [] else: current_part.append(ch) if current_part: parts.append("".join(current_part).strip()) # 查找包含章节或条款的部分 target_part = next((part for part in parts if '章' in part or '款' in part), None) if target_part: # 删除开头的"符合"或"应满足" target_part = re.sub(r'^(符合|应满足)\s*', '', target_part.strip()) return target_part # 如果没有找到特定章节或条款,返回原始值 return value def generate_questions(input_list): template = ( "关于'{key}',{value}的内容是怎样的?请按json格式给我提供信息,键名为'{key}',而键值需要完全与原文保持一致,不要擅自总结、删减,如果存在未知信息,请在对应键值处填'未知'。" ) questions = [] for input_dict in input_list: for key, value in input_dict.items(): processed_value = preprocess_value(value) question = template.format(key=key, value=processed_value) questions.append(question) return questions """ eg: response_list = [ { "person.name": "Bob", "person.address.city": "Los Angeles" }, { "company.location": "Austin", "person.age": 35 } ] """ # 用新数据更新原有字典 def update_json_data(original_data, response_list): def recursive_update(data, key, value): # 处理点分隔的键,递归定位并更新嵌套字典 keys = key.split('.') for k in keys[:-1]: data = data.setdefault(k, {}) if isinstance(value, dict) and isinstance(data.get(keys[-1], None), dict): data[keys[-1]] = {**data.get(keys[-1], {}), **value} else: data[keys[-1]] = value for response_dict in response_list: for key, value in response_dict.items(): recursive_update(original_data, key, value) return original_data def process_match_keys(match_keys, clause_path_file): """ 处理 match_keys,根据其中的数字或中文数字提取相应的条款内容,并附加到原始值后面。 参数: - match_keys (list): 包含键值对的列表。 - clause_path_file (str): clause_path的JSON文件路径。 返回: - list: 更新后的match_keys列表。 """ # 定义数字到中文数字的映射,扩展到'十' digit_map = {'1': '一', '2': '二','3': '三','4': '四','5': '五','6': '六','7': '七','8': '八','9': '九','10': '十'} # 定义中文数字列表 chinese_numerals = ['一', '二', '三', '四', '五', '六', '七', '八', '九', '十'] # 编译一个正则表达式,用于查找中文数字后面跟着的不是'章'或'部分'的字符 # 这个模式会捕获中文数字和紧随其后的一个字符 pattern = re.compile(r'([一二三四五六七八九十]+)(?!章|部分)(.)') # 读取clause_path的内容 try: with open(clause_path_file, 'r', encoding='utf-8') as file: clause_path = json.load(file) except FileNotFoundError: print(f"文件未找到: {clause_path_file}") return match_keys except json.JSONDecodeError: print(f"文件内容不是有效的JSON格式: {clause_path_file}") return match_keys for item in match_keys: for key, value in item.items(): # 将match_keys中的数字1-10转换为对应的中文数字 for digit, chinese in digit_map.items(): value = re.sub(r'{}'.format(digit), chinese, value) # 查找值中所有匹配的中文数字 matches = pattern.findall(value) # 存储需要附加的条款内容,避免重复 clauses_to_append = [] for match in matches: numeral = match[0] # 检查提取的中文数字是否在定义的列表中 if numeral in chinese_numerals: # 在clause_path的键中查找包含该中文数字的键 for clause_key in clause_path.keys(): if numeral in clause_key: clause_value = clause_path[clause_key] if clause_value not in clauses_to_append: clauses_to_append.append(clause_value) if clauses_to_append: # 将找到的条款内容用换行符连接 appended_text = '\n'.join(clauses_to_append) # 更新当前项的值,添加换行和附加内容 item[key] = value + '\n' + appended_text return match_keys #处理如'符合本采购文件第一章第二款要求'的情况,跳转到指定地方摘取内容 def process_additional_queries(combined_res, match_keys, output_folder, notice_path, knowledge_name): """ 处理额外的查询并更新结果。 Args: combined_res: 初始的组合结果。 match_keys: 匹配的章节或条款引用。 [{'资格性审查.资格要求': '符合本采购文件第一章第二款要求,并提供合格有效的证明材料。'}] output_folder: 输出文件夹路径。 notice_path: 通知文件路径。 knowledge_name: 知识库的名称。 Returns: dict: 更新后的最终结果。 """ clause2_path = convert_clause_to_json(notice_path, output_folder, 2) updated_match_keys = process_match_keys(match_keys, clause2_path) if updated_match_keys != match_keys: form_response_dict = update_json_data(combined_res, updated_match_keys) else: ques = generate_questions(match_keys) results = multi_threading(ques, knowledge_name) for _, response in results: if response and len(response) > 1: try: temp = extract_content_from_json(response[1]) updated_match_keys.append(temp) except Exception as e: print(f"形式响应评审:Error processing response: {e}") else: print(f"形式响应评审:Warning: Missing or incomplete response data.") form_response_dict = update_json_data(combined_res, updated_match_keys) # 添加额外的处理步骤 final_result = {"资格审查": form_response_dict} return final_result def combine_qualification_review(output_folder,qualification_path, notice_path,knowledge_name): """ 组合资格性审查和符合性审查的评审结果。 Args: truncate_file: 要上传和处理的文件。 knowledge_name: 知识库的名称,用于后续查询。 Returns: dict: 最终组合的评审结果。 """ # 上传文件并获取文件ID file_id = upload_file(qualification_path) # 定义用户查询列表 user_query = [ "该招标文件中规定的资格性审查标准是怎样的?请以json格式给出,外层为'资格性审查',你的回答要与原文完全一致,不可擅自总结删减,也不要回答有关符合性性审查的内容。", "该招标文件中规定的符合性审查标准是怎样的?请以json格式给出,外层为'符合性审查',你的回答要与原文完全一致,不可擅自总结删减,也不要回答有关资格性审查的内容。" ] # 执行多线程查询 results = multi_threading(user_query, "", file_id, 2) combined_res = {} for question, response in results: if response: cleaned_data = clean_json_string(response) # 清理大模型回答 processed1 = preprocess_dict(cleaned_data) processed2 = process_dict(processed1) combined_res.update(processed2) else: print(f"Warning: No response for question '{question}'.") # 查找章节或条款引用 match_keys = find_chapter_clause_references(combined_res) #[{'资格性审查.资格要求': '符合本采购文件第一章第二款要求,并提供合格有效的证明材料。'}] # 如果没有匹配的章节或条款,直接返回 combined_res if not match_keys: return {"资格审查":combined_res} # 调用新的函数处理后续逻辑 return process_additional_queries(combined_res, match_keys, output_folder,notice_path,knowledge_name) # 整合基础信息核心代码 # [{'资格性审查.资格要求': '符合本采购文件第一章第二款要求,并提供合格有效的证明材料'}, {'资格性审查.没有重大违法记录的书面声明': '是否提交参加政府采购活动前三年内在经营活动中没有重大违法记录的书面承诺或声明(格式要求详见本项目采购文件第六章相关格式要求)'}] # TODO:有个严重的问题,对于{'资格性审查.资格要求': '符合本采购文件第一章第二款要求,并提供合格有效的证明材料'},调用百炼rag的时候容易得到一模一样的回答,而不是跳转到具体的地方,有两个思路,1.结构化第一章内容 2.优化提示词 3.构造问题的时候不带value,直接问key if __name__ == "__main__": # qualification_path="C:\\Users\\Administrator\\Desktop\\货物标\\output3\\6.2定版视频会议磋商文件_qualification2.pdf" output_folder="C:\\Users\\Administrator\\Desktop\\货物标\\zboutpub" qualification_path = "C:\\Users\\Administrator\\Desktop\\货物标\\output3\\094定稿-湖北工业大学轻武器模拟射击设备采购项目招标文件_qualification2.pdf" notice_path="C:\\Users\\Administrator\\Desktop\\货物标\\output5\\094定稿-湖北工业大学轻武器模拟射击设备采购项目招标文件_notice.pdf" knowledge_name = "6.2视频会议docx" res = combine_qualification_review(output_folder,qualification_path, notice_path,knowledge_name) print(json.dumps(res, ensure_ascii=False, indent=4))