# -*- encoding:utf-8 -*- import json import re from functools import cmp_to_key from flask_app.general.json_utils import clean_json_string from flask_app.general.model_continue_query import process_continue_answers from flask_app.general.通义千问long import upload_file, qianwen_long_stream def compare_headings(a, b): a_nums = [int(num) for num in a[0].rstrip('.').split('.') if num.isdigit()] b_nums = [int(num) for num in b[0].rstrip('.').split('.') if num.isdigit()] return (a_nums > b_nums) - (a_nums < b_nums) def preprocess_data(data): """ 预处理数据,自动添加缺失的父层级键,并按数字顺序排序。 """ keys_to_add = set() for key in data.keys(): parts = key.split('.') if len(parts) > 1: parent_key = parts[0] + '.' if parent_key not in data: keys_to_add.add(parent_key) # 添加缺失的父层级键 for parent_key in keys_to_add: data[parent_key] = parent_key.rstrip('.') # 对键进行排序 sorted_data = dict(sorted(data.items(), key=cmp_to_key(compare_headings))) return sorted_data # 转换结构化的JSON数据 #生成结构化的数据 def transform_json(data): result = {} temp = {0: result} # 初始化根字典 data=preprocess_data(data) # 首先,创建一个临时字典用于检查是否存在三级标题 has_subkey = {} for key in data.keys(): parts = key.split('.') if len(parts) > 2 and parts[1]: parent_key = parts[0] + '.' + parts[1] has_subkey[parent_key] = True for key, value in data.items(): match = re.match(r'(\d+)(?:\.(\d+))?(?:\.(\d+))?', key) if match: levels = [int(l) for l in match.groups() if l is not None] if (len(levels) - 1) in temp: parent = temp[len(levels) - 1] else: print(f"No parent found at level {len(levels) - 1} for key '{key}'. Check the data structure.") continue if len(levels) == 1: # 一级标题 # 新增逻辑:判断值中是否有 ':' 或 ':',并进行拆分 # 优先按 '\n' 拆分 if '\n' in value: new_key, *new_value = value.split('\n', 1) new_key = new_key.strip() new_value = new_value[0].strip() if new_value else "" # 如果没有 '\n',再检查 ':' 或 ':',并进行拆分 elif ':' in value or ':' in value: delimiter = ':' if ':' in value else ':' new_key, new_value = value.split(delimiter, 1) new_key = new_key.strip() new_value = new_value.strip() else: new_key = value.strip() new_value = "" parent[new_key] = {} if new_value: parent[new_key][new_key] = new_value # 使用 new_key 作为键名,而不是固定的 "content" temp[len(levels)] = parent[new_key] elif len(levels) == 2: # 二级标题 new_key, *new_value = value.split('\n', 1) new_key = new_key.strip() new_value = new_value[0].strip() if new_value else "" if f"{levels[0]}.{levels[1]}" in has_subkey: parent[new_key] = [new_value] if new_value else [] else: parent[new_key] = new_value temp[len(levels)] = parent[new_key] else: # 三级标题 if isinstance(parent, dict): parent_key = list(parent.keys())[-1] if isinstance(parent[parent_key], list): parent[parent_key].append(value) elif parent[parent_key]: parent[parent_key] = [parent[parent_key], value] else: parent[parent_key] = [value] elif isinstance(parent, list): parent.append(value) def remove_single_item_lists(node): if isinstance(node, dict): for key in list(node.keys()): node[key] = remove_single_item_lists(node[key]) if isinstance(node[key], list) and len(node[key]) == 1: node[key] = node[key][0] return node return remove_single_item_lists(result) # 主要是处理键值中若存在若干序号且每个序号块的内容>=50字符的时候,用列表表示。 def post_process(value): # 如果传入的是非字符串值,直接返回原值 if not isinstance(value, str): return value # 定义可能的分割模式及其正则表达式 patterns = [ (r'\d+、', r'(?=\d+、)'), # 匹配 '1、' (r'[((]\d+[))]', r'(?=[((]\d+[))])'), # 匹配 '(1)' 或 '(1)' (r'\d+\.', r'(?=\d+\.)'), # 匹配 '1.' (r'[一二三四五六七八九十]、', r'(?=[一二三四五六七八九十]、)'), # 匹配 '一、'、'二、' 等 (r'[一二三四五六七八九十]\.', r'(?=[一二三四五六七八九十]\.)') # 匹配 '一.'、'二.' 等 ] # 初始化用于保存最早匹配到的模式及其位置 first_match = None first_match_position = len(value) # 初始值设为文本长度,确保任何匹配都会更新它 # 遍历所有模式,找到第一个出现的位置 for search_pattern, split_pattern_candidate in patterns: match = re.search(search_pattern, value) if match: # 如果这个匹配的位置比当前记录的更靠前,更新匹配信息 if match.start() < first_match_position: first_match = split_pattern_candidate first_match_position = match.start() # 如果找到了最早出现的匹配模式,使用它来分割文本 if first_match: blocks = re.split(first_match, value) else: # 如果没有匹配的模式,保留原文本 blocks = [value] processed_blocks = [] for block in blocks: if not block: continue # 计算中英文字符总数,如果大于50,则加入列表 if block and len(re.findall(r'[\u4e00-\u9fff\w]', block)) >= 50: processed_blocks.append(block.strip()) else: # 如果发现有块长度小于50,返回原数据 return value # 如果所有的块都符合条件,返回分割后的列表 return processed_blocks def process_nested_data(data): # 先检查是否所有值都是 ""、"/" 或空列表 if isinstance(data, dict) and all(v == "" or v == "/" or (isinstance(v, list) and not v) for v in data.values()): return list(data.keys()) # 递归遍历字典,处理最内层的字符串 if isinstance(data, dict): # 如果当前项是字典,继续递归遍历其键值对 result = {} for key, value in data.items(): processed_value = process_nested_data(value) # 如果处理后的值是只有一个元素的列表,就直接使用该元素 if isinstance(processed_value, list) and len(processed_value) == 1: result[key] = processed_value[0] else: result[key] = processed_value return result elif isinstance(data, list): # 如果是列表,直接返回列表,保持原样 return data else: # 到达最内层,处理非字典和非列表的元素(字符串) return post_process(data) #生成无结构的数据货物标 def concatenate_keys_values(section_content): # print(json.dumps(section_content, ensure_ascii=False, indent=4)) """ 将章节内容的键值对拼接成一个字符串列表,每个元素为 "key value"。 Args: section_content (dict): 章节内容的键值对。 Returns: list of str: 拼接后的字符串列表。 """ concatenated = [] def get_level(key): """ 计算给定键的层级。 Args: key (str): 层级键,如 "30.1.1" Returns: int: 层级深度,例如 "30.1.1" 的层级为 3 """ key = key.rstrip('.') parts = key.split('.') return len(parts) for key, value in section_content.items(): level = get_level(key) # 添加缩进:一级无缩进,二级缩进4个空格,三级及以上每多一级加4个空格 indent = ' ' * 4 * (level - 1) concatenated.append(f"{indent}{key} {value}") return concatenated #生成无结构的数据工程标,对提取出的若干键值对,生成外键为target_value,值为列表的新键值对 def extract_sections(data, target_values): """ 从输入字典中提取目标值对应的章节,并为每个子章节添加缩进。 Args: data (dict): 输入的字典,键为层级编号,值为章节内容。 target_values (list): 需要提取的目标章节名称列表。 Returns: dict: 包含目标章节名称作为键,格式化后的子章节列表作为值的字典。 """ result = {} merged_sections = [] processed_keys = set() # 对键进行排序以保持顺序 sorted_keys = sorted( data.keys(), key=lambda x: [int(part) for part in x.rstrip('.').split('.') if part.isdigit()] ) def get_level(key): """ 计算给定键的层级。 Args: key (str): 层级键,如 "3.5.1" Returns: int: 层级深度,例如 "3.5.1" 的层级为 3 """ key = key.rstrip('.') parts = key.split('.') return len(parts) for key in sorted_keys: if key in processed_keys: continue # 跳过已经处理过的键 value = data[key] # 使用子字符串匹配 if any(target in value for target in target_values): section_key_prefix = key if key.endswith('.') else key + '.' section_name = value section_level = get_level(key) subitems = [] for sub_key in sorted_keys: if sub_key.startswith(section_key_prefix) and sub_key != key: sub_value = data[sub_key] sub_level = get_level(sub_key) # 根据层级添加缩进,二级无缩进,三级开始每多一级加4个空格 if sub_level == section_level + 1: indent = '' # 二级层级无缩进 else: indent = ' ' * 4 * (sub_level - section_level - 1) # 三级及以上层级增加缩进 subitems.append(f"{indent}{sub_key} {sub_value}") processed_keys.add(sub_key) # 标记子键为已处理 # 检查是否需要合并 "定标" 和 "中标" 章节 if any(target in section_name for target in ["定标", "中标"]): merged_sections.extend(subitems) else: result[section_name] = subitems processed_keys.add(key) # 标记主键为已处理 # 如果存在需要合并的章节,将其合并为 "定标与中标" if merged_sections: result["定标与中标"] = merged_sections return result #提取两个大标题之间的内容 def extract_between_sections(data, target_values): target_found = False extracted_data = {} current_section_title = "" section_pattern = re.compile(r'^[一二三四五六七八九十]+$') # 匹配 "一", "二", "三" 等大标题 current_block = {} # 遍历所有键值对 for key, value in data.items(): # 只匹配形如 "一": "竞争性磋商响应文件" 的章节标题 if section_pattern.match(key): #匹配到大标题... if target_found: # 如果已经找到了符合的章节,并且遇到了另一个章节 # 保存当前块并重置 if current_block: extracted_data[current_section_title] = current_block current_block = {} target_found = False # 检查当前标题是否包含 target_values 中的任意关键词 if any(tv in value for tv in target_values): target_found = True # 找到了目标章节,开始捕获后续内容 current_section_title = value # 保存章节标题内容 elif target_found: # 匹配到普通序号... current_block[key] = value # 保存最后一个块(如果有的话) if current_block: extracted_data[current_section_title] = current_block return extracted_data def get_requirements_with_gpt(merged_baseinfo_path, selection): """ 根据 selection 的值选择相应的用户查询,并调用大模型获取要求。 Args: merged_baseinfo_path (str): 无效文件的路径,用于上传。 selection (int): 选择的类型(1、2 或 3)。 Returns: dict: 大模型返回的要求结果,或错误信息。 """ # 上传文件并获取 file_id file_id = upload_file(merged_baseinfo_path) # 定义 selection 对应的用户查询 user_queries = { # 1: """ # 该招标文件中对投标文件的要求是什么?你需要从'编写要求'、'格式要求'、'承诺书要求'、'递交要求'四个角度来回答,其中'格式'可以从投标文件格式要求、标记要求、装订要求、文件数量要求角度说明,而不一定一致;'递交要求'可以从投标地点、投标文件交标方式、投标文件的修改与撤回角度说明,而不一定一致;请以json格式返回给我结果,外层键名分别为'编写要求','格式','承诺书要求','递交要求',你可以用嵌套键值对组织回答,嵌套键名为你对相关子要求的总结,而嵌套键名应该完全与原文内容保持一致,不得擅自总结删减,如果原文中未提及相关内容,在键值中填'未知'。输出格式示例如下,内容与该文件无关: # { # "编写要求":"编写要求xxx", # "格式要求":{ # "投标文件格式要求":"投标文件格式要求", # "标记要求":"投标文件标记要求", # "装订要求":"投标文件装订要求", # "文件数量":"投标文件文件数量要求" # }, # "承诺书要求":"承诺书要求xxx", # "递交要求":{ # "投标地点":"投标地点", # "投标文件交标方式":"交标方式", # "投标文件的修改与撤回":["投标文件的修改相关要求","投标文件的撤回相关要求"] # } # } # """, 1:"""请根据提供的招标文件内容,提取其中有关投标文件的要求、内容,并以 JSON 格式返回结果。 格式要求: - 外层键名应为文中提到的有关投标文件相关要求、内容的大项概览性标题。 - 对于每个大项下的具体要求内容,有以下两种组织方式: -不使用嵌套键值对: - 如果要求是单一内容,键值应为单独的字符串,内容必须与原文保持一致,不得总结或删减。 - 如果要求包含多个并列内容,键值应为一个字符串列表(数组),其中每个元素都是一条子要求。 -使用嵌套键值对。 -嵌套键名应为原文中的具体标题或对相关子要求的简明总结。 -最内层的键值应与原文内容保持一致,不得进行任何总结、删减或改写。默认键值是单独的字符串,如果一个子要求包含多个并列内容,键值应为一个字符串列表(数组),其中每个元素都是子要求内容。 - 特别限制: - 若文件中有类似“投标文件格式要求”的小节,禁止输出原文中的表格格式示例,请仅提取并描述具体的文字部分的格式要求,而不是重现表格内容;若无类似小节,请忽略这点,也无需返回该键值对。 表格内容处理: - 如果原文中对应内容以表格形式呈现,请使用 Markdown 语法准确重现该表格。 - 表格的每一行应作为键值(字符串列表)中的一个独立字符串,保持表格结构和内容的完整性。 禁止内容: - 确保所有输出内容均基于提供的实际招标文件内容,不使用任何预设的示例作为回答。 - 预设的示例中的外层键名仅供格式参考,以文中实际内容为主。 示例格式(**不要**在回答中包含此内容,仅供参考): { "投标的语言":"投标人提交的投标文件以及投标人与集中采购机构或采购人就有关投标的所有来往函电均应使用中文。", "格式要求":{ "投标文件格式要求":"投标文件格式要求", "标记要求":"投标文件标记要求", "装订要求":"投标文件装订要求", "文件数量":"投标文件文件数量要求" }, "投标报价":[ "投标人所提供的货物(工程或服务)均以人民币计价。", "应包含所有相关费用;" ] "递交要求":{ "递交投标文件的截止日期及递交地点":["递交投标文件的截止日期xxx","递交投标文件的递交地点xxx"], "投标文件交标方式":"交标方式xxx", "投标文件的修改与撤回":["投标文件的修改相关要求xxx","投标文件的撤回相关要求xxx"] } } """, # 2: """ # 该招标文件中开标、评标、定标要求(或磋商流程内容)是什么?你需要从'开标'、'开标异议'、'评标'、'定标'四个角度回答,其中'评标'可以从特殊情况的处置、评标办法及流程、评标委员会的组建角度来说明,'定标'可以从定标流程、履约能力的审查角度来说明,请以json格式返回给我结果,外层键名分别为'开标'、'开标异议'、'评标'、'定标',你可以用嵌套键值对组织回答,嵌套键名为你对相关子要求的总结,而嵌套键值应该完全与原文内容保持一致,不得擅自总结删减,如果原文中未提及相关内容,在键值中填'未知'。输出格式示例如下: # { # "开标":"招标文件关于项目开标的要求", # "开标异议":"招标文件中关于开标异议的项", # "评标":{ # "特殊情况的处置":"因“电子交易系统”系统故障导致无法投标的,交易中心及时通知招标人,招标人视情况决定是否顺延投标截止时间。因投标人自身原因导致无法完成投标的,由投标人自行承担后果。", # "评标办法及流程":"评标流程", # "评标委员会的组建":"评标由招标人依法组建的评标委员会负责。评标委员会由招标人或其委托的招标代理机构熟悉相关业务的代表,以及有关技术、经济等方面的专家组成。" # }, # "定标":{ # "定标流程":"定标流程", # "履约能力的审查":"履约能力的审查" # } # } # """, 2:"""该招标文件中有关开标、评标、定标、中标要求、内容(或磋商流程内容)是什么?请以 JSON 格式返回结果。 格式要求: - 外层键名应为文中提到的有关开标、评标、定标、中标要求或磋商流程内容的大项概览性标题。 - 对于每个大项下的子要求,使用嵌套的键值对进行组织。 -其中嵌套键名应为原文中的具体标题或对相关子要求的简明总结。 -最内层的键值应与原文内容保持一致,不得进行任何总结、删减或改写。默认键值是单独的字符串,如果一个子要求包含多个并列内容,键值应为一个字符串列表(数组),其中每个元素都是子要求内容。 表格内容处理: - 如果原文中对应内容以表格形式呈现,请使用 Markdown 语法准确重现该表格。 - 表格的每一行应作为键值中的一个独立字符串,保持表格结构和内容的完整性。 禁止内容: - 确保所有输出内容均基于提供的实际招标文件内容,不使用任何预设的示例作为回答。 - 预设的示例中的外层键名仅供格式参考,以文中实际内容为主。 示例格式(**不要**在回答中包含此内容,仅供参考): { "开标": { "开标时间":"2024.10.30", "开标地点":"洪山区人民政府", "资格审查":[ "公开招标采购项目开标结束后,采购人与集中采购机构依据法律、法规及招标文件的规定", "资格审查详见第四章“资格审查方法及标准”。" ], }, "评标": { "特殊情况的处置": "因“电子交易系统”系统故障导致无法投标的,交易中心及时通知招标人,招标人视情况决定是否顺延投标截止时间。因投标人自身原因导致无法完成投标的,由投标人自行承担后果。", "评标办法及流程": "评标流程", "评标委员会的组建": "评标由招标人依法组建的评标委员会负责。评标委员会由招标人或其委托的招标代理机构熟悉相关业务的代表,以及有关技术、经济等方面的专家组成。" }, "定标": { "定标流程": ["定标流程1","定标流程2"], "履约能力的审查": "履约能力的审查" } } """, 3: """ 该招标文件中重新招标(或重新采购)、不再招标(或不再采购)、终止招标(或终止采购)的情况分别是什么?请以json格式返回给我结果,键名分别为'重新招标','不再招标','终止招标',键值应该完全与原文内容保持一致,不得擅自总结删减,如果原文中未提及相关内容,在键值中填'未知'。 示例输出如下,仅供格式参考: { "重新招标":"有下列情形之一的,招标人将重新招标:(1)投标截止时间止,投标人少于3个的;(2)经评标委员会评审后否决所有投标的;", "不再招标":"重新招标后投标人仍少于3个或者所有投标被否决的,属于必须审批或核准的工程建设项目,经原审批或核准部门批准后不再进行招标。", "终止招标":"未知" } """ } # 根据 selection 选择相应的 user_query user_query = user_queries.get(selection) if not user_query: return {"error": f"无效的 selection 值: {selection}. 请选择 1、2 或 3。"} # 调用大模型并处理响应 try: questions_to_continue = [] qianwen_res = qianwen_long_stream(file_id, user_query,2,1,True) message = qianwen_res[0] parsed = clean_json_string(message) total_tokens = qianwen_res[1] if not parsed and total_tokens >5900: questions_to_continue.append((user_query, message)) if questions_to_continue: continued_results = process_continue_answers(questions_to_continue, 0, file_id) return continued_results else: return parsed except Exception as e: return {"error": "调用大模型失败"} if __name__ == "__main__": merged_baseinfo_path = r"D:\flask_project\flask_app\static\output\output1\eabefc28-142f-4bb5-b1be-e86e43bb87b5\invalid_del.docx" selection=1 res=get_requirements_with_gpt(merged_baseinfo_path,selection) print(json.dumps(res,ensure_ascii=False,indent=4))