import re from functools import cmp_to_key from flask_app.general.json_utils import clean_json_string from flask_app.general.通义千问long import upload_file, qianwen_long def compare_headings(a, b): a_nums = [int(num) for num in a[0].rstrip('.').split('.') if num.isdigit()] b_nums = [int(num) for num in b[0].rstrip('.').split('.') if num.isdigit()] return (a_nums > b_nums) - (a_nums < b_nums) def preprocess_data(data): """ 预处理数据,自动添加缺失的父层级键,并按数字顺序排序。 """ keys_to_add = set() for key in data.keys(): parts = key.split('.') if len(parts) > 1: parent_key = parts[0] + '.' if parent_key not in data: keys_to_add.add(parent_key) # 添加缺失的父层级键 for parent_key in keys_to_add: data[parent_key] = parent_key.rstrip('.') # 对键进行排序 sorted_data = dict(sorted(data.items(), key=cmp_to_key(compare_headings))) return sorted_data # 转换结构化的JSON数据 #No parent found at level 1 for key '24.2'. Check the data structure. def transform_json(data): result = {} temp = {0: result} # 初始化根字典 data=preprocess_data(data) # 首先,创建一个临时字典用于检查是否存在三级标题 has_subkey = {} for key in data.keys(): parts = key.split('.') if len(parts) > 2 and parts[1]: parent_key = parts[0] + '.' + parts[1] has_subkey[parent_key] = True for key, value in data.items(): match = re.match(r'(\d+)(?:\.(\d+))?(?:\.(\d+))?', key) if match: levels = [int(l) for l in match.groups() if l is not None] if (len(levels) - 1) in temp: parent = temp[len(levels) - 1] else: print(f"No parent found at level {len(levels) - 1} for key '{key}'. Check the data structure.") continue if len(levels) == 1: # 一级标题 # 新增逻辑:判断值中是否有 ':' 或 ':',并进行拆分 # 优先按 '\n' 拆分 if '\n' in value: new_key, *new_value = value.split('\n', 1) new_key = new_key.strip() new_value = new_value[0].strip() if new_value else "" # 如果没有 '\n',再检查 ':' 或 ':',并进行拆分 elif ':' in value or ':' in value: delimiter = ':' if ':' in value else ':' new_key, new_value = value.split(delimiter, 1) new_key = new_key.strip() new_value = new_value.strip() else: new_key = value.strip() new_value = "" parent[new_key] = {} if new_value: parent[new_key][new_key] = new_value # 使用 new_key 作为键名,而不是固定的 "content" temp[len(levels)] = parent[new_key] elif len(levels) == 2: # 二级标题 new_key, *new_value = value.split('\n', 1) new_key = new_key.strip() new_value = new_value[0].strip() if new_value else "" if f"{levels[0]}.{levels[1]}" in has_subkey: parent[new_key] = [new_value] if new_value else [] else: parent[new_key] = new_value temp[len(levels)] = parent[new_key] else: # 三级标题 if isinstance(parent, dict): parent_key = list(parent.keys())[-1] if isinstance(parent[parent_key], list): parent[parent_key].append(value) elif parent[parent_key]: parent[parent_key] = [parent[parent_key], value] else: parent[parent_key] = [value] elif isinstance(parent, list): parent.append(value) def remove_single_item_lists(node): if isinstance(node, dict): for key in list(node.keys()): node[key] = remove_single_item_lists(node[key]) if isinstance(node[key], list) and len(node[key]) == 1: node[key] = node[key][0] return node return remove_single_item_lists(result) # 主要是处理键值中若存在若干序号且每个序号块的内容>=50字符的时候,用列表表示。 def post_process(value): # 如果传入的是非字符串值,直接返回原值 if not isinstance(value, str): return value # 定义可能的分割模式及其正则表达式 patterns = [ (r'\d+、', r'(?=\d+、)'), # 匹配 '1、' (r'[((]\d+[))]', r'(?=[((]\d+[))])'), # 匹配 '(1)' 或 '(1)' (r'\d+\.', r'(?=\d+\.)'), # 匹配 '1.' (r'[一二三四五六七八九十]、', r'(?=[一二三四五六七八九十]、)'), # 匹配 '一、'、'二、' 等 (r'[一二三四五六七八九十]\.', r'(?=[一二三四五六七八九十]\.)') # 匹配 '一.'、'二.' 等 ] # 初始化用于保存最早匹配到的模式及其位置 first_match = None first_match_position = len(value) # 初始值设为文本长度,确保任何匹配都会更新它 # 遍历所有模式,找到第一个出现的位置 for search_pattern, split_pattern_candidate in patterns: match = re.search(search_pattern, value) if match: # 如果这个匹配的位置比当前记录的更靠前,更新匹配信息 if match.start() < first_match_position: first_match = split_pattern_candidate first_match_position = match.start() # 如果找到了最早出现的匹配模式,使用它来分割文本 if first_match: blocks = re.split(first_match, value) else: # 如果没有匹配的模式,保留原文本 blocks = [value] processed_blocks = [] for block in blocks: if not block: continue # 计算中英文字符总数,如果大于50,则加入列表 if block and len(re.findall(r'[\u4e00-\u9fff\w]', block)) >= 50: processed_blocks.append(block.strip()) else: # 如果发现有块长度小于50,返回原数据 return value # 如果所有的块都符合条件,返回分割后的列表 return processed_blocks def process_nested_data(data): # 先检查是否所有值都是 ""、"/" 或空列表 if isinstance(data, dict) and all(v == "" or v == "/" or (isinstance(v, list) and not v) for v in data.values()): return list(data.keys()) # 递归遍历字典,处理最内层的字符串 if isinstance(data, dict): # 如果当前项是字典,继续递归遍历其键值对 result = {} for key, value in data.items(): processed_value = process_nested_data(value) # 如果处理后的值是只有一个元素的列表,就直接使用该元素 if isinstance(processed_value, list) and len(processed_value) == 1: result[key] = processed_value[0] else: result[key] = processed_value return result elif isinstance(data, list): # 如果是列表,直接返回列表,保持原样 return data else: # 到达最内层,处理非字典和非列表的元素(字符串) return post_process(data) def get_requirements_with_gpt(invalid_path, selection): """ 根据 selection 的值选择相应的用户查询,并调用大模型获取要求。 Args: invalid_path (str): 无效文件的路径,用于上传。 selection (int): 选择的类型(1、2 或 3)。 Returns: dict: 大模型返回的要求结果,或错误信息。 """ # 上传文件并获取 file_id file_id = upload_file(invalid_path) # 定义 selection 对应的用户查询 user_queries = { 1: """ 该招标文件中对投标文件的要求是什么?你需要从'编写要求'、'格式要求'、'承诺书要求'、'递交要求'四个角度来回答,其中'格式'可以从投标文件格式要求、标记要求、装订要求、文件数量要求角度说明,'递交要求'可以从投标地点、投标文件交标方式、投标文件的修改与撤回角度说明,请以json格式返回给我结果,外层键名分别为'编写要求','格式','承诺书要求','递交要求',你可以用嵌套键值对组织回答,嵌套键名为你对相关子要求的总结,而嵌套键名应该完全与原文内容保持一致,不得擅自总结删减,如果原文中未提及相关内容,在键值中填'未知'。输出格式示例如下: { "编写要求":"投标函的编写要求xxx;法定代表人身份证明要求xx", "格式要求":{ "投标文件格式要求":"投标文件格式要求", "标记要求":"投标文件标记要求", "装订要求":"投标文件装订要求", "文件数量":"投标文件文件数量要求" }, "承诺书要求":"未知", "递交要求":{ "投标地点":"使用加密其投标文件的CA数字证书(企业锁)登录“电子交易系统”,进入“开标大厅”选择所投标段进行签到,并实时在线关注招标人的操作情况。", "投标文件交标方式":"线上开标", "投标文件的修改与撤回":"在投标人须知前附表规定的投标有效期内,投标人不得要求撤销或修改其投标文件。出现特殊情况需要延长投标有效期的,招标人以书面形式通知所有投标人延长投标有效期。投标人同意延长的,应相应延长其投标保证金的有效期,但不得要求或被允许修改或撤销其投标文件;投标人拒绝延长的,其投标失效,但投标人有权收回其投标保证金。" } } """, 2: """ 该招标文件中开标、评标、定标要求(或磋商流程内容)是什么?你需要从'开标'、'开标异议'、'评标'、'定标'四个角度回答,其中'评标'可以从特殊情况的处置、评标办法及流程、评标委员会的组建角度来说明,'定标'可以从定标流程、履约能力的审查角度来说明,请以json格式返回给我结果,外层键名分别为'开标'、'开标异议'、'评标'、'定标',你可以用嵌套键值对组织回答,嵌套键名为你对相关子要求的总结,而嵌套键名应该完全与原文内容保持一致,不得擅自总结删减,如果原文中未提及相关内容,在键值中填'未知'。输出格式示例如下: { "开标":"招标文件关于项目开标的要求", "开标异议":"招标文件中关于开标异议的项", "评标":{ "特殊情况的处置":"因“电子交易系统”系统故障导致无法投标的,交易中心及时通知招标人,招标人视情况决定是否顺延投标截止时间。因投标人自身原因导致无法完成投标的,由投标人自行承担后果。", "评标办法及流程":"评标流程", "评标委员会的组建":"评标由招标人依法组建的评标委员会负责。评标委员会由招标人或其委托的招标代理机构熟悉相关业务的代表,以及有关技术、经济等方面的专家组成。" }, "定标":{ "定标流程":"定标流程", "履约能力的审查":"履约能力的审查" } } """, 3: """ 该招标文件中重新招标(或采购)、不再招标(或采购)、终止招标(或采购)的情况分别是什么?请以json格式返回给我结果,键名分别为'重新招标','不再招标','终止招标',键值应该完全与原文内容保持一致,不得擅自总结删减,如果原文中未提及相关内容,在键值中填'未知'。示例输出如下: { "重新招标":"有下列情形之一的,招标人将重新招标:(1)投标截止时间止,投标人少于3个的;(2)经评标委员会评审后否决所有投标的;", "不再招标":"重新招标后投标人仍少于3个或者所有投标被否决的,属于必须审批或核准的工程建设项目,经原审批或核准部门批准后不再进行招标。", "终止招标":"未知" } """ } # 根据 selection 选择相应的 user_query user_query = user_queries.get(selection) if not user_query: return {"error": f"无效的 selection 值: {selection}. 请选择 1、2 或 3。"} # 调用大模型并处理响应 try: res = qianwen_long(file_id, user_query) cleaned_res = clean_json_string(res) return cleaned_res except Exception as e: return {"error": "调用大模型失败"}