from concurrent.futures import ThreadPoolExecutor, as_completed from flask_app.general.通义千问long import upload_file, qianwen_long from flask_app.general.json_utils import clean_json_string def combine_qualification_new(invalid_path, qualification_path,notice_path): detailed_res = {} # 初始化无效文件ID invalid_file_id = None if qualification_path: # 上传资格文件并获取文件ID qualification_file_id = upload_file(qualification_path) # 定义第一个查询,用于检查资格性审查是否存在 first_query = """ 该文档中是否有关于资格性审查标准的具体内容?请以json格式给出回答,外键为'资格性审查',键值仅限于'是','否',输出格式示例如下: { "资格性审查":"是" } """ # 执行第一个查询并清洗返回的JSON字符串 print("call first_query") first_res = clean_json_string(qianwen_long(qualification_file_id, first_query)) # 判断是否存在资格性审查 zige_file_id = qualification_file_id if first_res.get("资格性审查") == "是" else None # 如果需要,上传无效文件 if zige_file_id is None: if invalid_file_id is None: invalid_file_id = upload_file(invalid_path) zige_file_id = invalid_file_id else: # 如果 qualification_path 为空,直接使用无效文件 zige_file_id = upload_file(invalid_path) # 定义第二组查询,仅包含资格性审查 second_query = [ { "key": "资格性审查", "query": "该招标文件中规定的资格性审查标准是怎样的?请以json格式给出,外层为'资格性审查',你的回答要与原文完全一致,不可擅自总结删减,也不要回答有关符合性审查的内容。" } ] # 定义任务函数 def process_second_query(key, query, file_id): print("call second_query") try: res = qianwen_long(file_id, query) cleaned_res = clean_json_string(res) return key, cleaned_res.get(key, "未找到相关内容") except Exception as e: print(f"执行查询 '{key}' 时出错: {e}") return key, "查询失败" def process_notice(notice_path): print("call notice_path") try: # 上传通知文件并获取文件ID file_id1 = upload_file(notice_path) # 定义用户查询,提取申请人资格要求 user_query1 = """ 第一章招标公告(投标邀请书)中说明的申请人资格要求是怎样的?请以json格式给出回答,外键为'申请人资格要求',键值为字符串列表,其中每个字符串对应原文中的一条要求,你的回答与原文内容一致,不要擅自总结删减。输出格式示例如下: { "申请人资格要求":[ "1.满足《中华人民共和国政府采购法》第二十二条规定;", "1.1 法人或者其他组织的营业执照等证明文件,如供应商是自然人的提供身份证明材料;", "2.未被列入“信用中国”网站(www.creditchina.gov.cn)信用服务栏失信被执行人、重大税收违法案件当事人名单;" ] } """ # 执行查询并清洗结果 res1 = clean_json_string(qianwen_long(file_id1, user_query1)) # 提取申请人资格要求 requirements = res1.get("申请人资格要求", "未找到相关内容") return "申请人资格要求", requirements except Exception as e: print(f"处理申请人资格要求时出错: {e}") return "申请人资格要求", "处理失败" # 初始化 ThreadPoolExecutor with ThreadPoolExecutor(max_workers=2) as executor: future_to_key = {} # 提交第二组查询 for query_info in second_query: key = query_info["key"] query = query_info["query"] current_file_id = zige_file_id future = executor.submit(process_second_query, key, query, current_file_id) future_to_key[future] = key # 有条件地提交通知处理 if notice_path: future = executor.submit(process_notice, notice_path) future_to_key[future] = "申请人资格要求" else: future = executor.submit(process_notice, invalid_path) future_to_key[future] = "申请人资格要求" # 收集结果(按完成顺序) for future in as_completed(future_to_key): key, result = future.result() detailed_res[key] = result # 定义所需的顺序 desired_order = ["申请人资格要求", "资格性审查"] # print(json.dumps(detailed_res,ensure_ascii=False,indent=4)) # 创建一个新的有序字典 ordered_res = {} for key in desired_order: if key in detailed_res: ordered_res[key] = detailed_res[key] # 最终处理结果,例如打印或保存 return {"资格审查": ordered_res}