# -*- encoding:utf-8 -*- import ast import logging import re from flask_app.general.json_utils import clean_json_string from flask_app.general.多线程提问 import multi_threading from flask_app.general.通义千问long import upload_file, qianwen_long from flask_app.工程标.判断是否分包等 import read_questions_from_judge def get_deviation_requirements(invalid_path): file_id=upload_file(invalid_path) user_query="""该招标文件对响应文件(投标文件)偏离项的要求或内容是怎样的?请不要回答具体的技术参数,也不要回答具体的评分要求。请以json格式给我提供信息,外层键名为'偏离',若存在嵌套信息,嵌套内容键名为文件中对应字段或是你的总结,而嵌套键值必须与原文保持一致,若文中未涉及相关内容,在键值中填'未知'。 注意:不使用任何预设的示例作为回答,示例仅作为格式参考。 禁止内容: 确保所有输出内容均基于提供的实际招标文件内容,不使用任何预设的示例作为回答。 示例1,嵌套键值对情况: { "偏离":{ "技术要求":"以★标示的内容不允许负偏离", "商务要求":"以★标示的内容不允许负偏离" } } 示例2,无嵌套键值对情况: { "偏离":"所有参数需在技术响应偏离表内响应,如应答有缺项,且无有效证明材料的,评标委员会有权不予认可,视同负偏离处理" } """ model_res=qianwen_long(file_id,user_query) return clean_json_string(model_res) def process_judge_questions(judge_file_path, chosen_numbers, invalid_path, baseinfo_list1): file_id=upload_file(invalid_path) judge_questions = read_questions_from_judge(judge_file_path, chosen_numbers) judge_consortium = judge_consortium_bidding(baseinfo_list1) if judge_consortium: judge_consortium_question = ( "该招标文件对于联合体投标的要求是怎样的,请按json格式给我提供信息," "外层键名为'联合体投标要求',其中有一个嵌套键值对为:\"是否接受联合体投标\":\"是\"" ) judge_questions.append(judge_consortium_question) if not judge_questions: print("process_judge_questions:没有需要处理的 judge_questions,跳过 multi_threading 调用。") return # 或者根据需要返回其他值 # file_id3 = upload_file(merged_baseinfo_path) res2 = multi_threading(judge_questions, "", file_id, 2) for question, response in res2: baseinfo_list1.append(clean_json_string(response)) def aggregate_basic_info(baseinfo_list,mode="engineering"): """ 将基础信息列表中的数据进行合并和分类。 参数: - baseinfo_list (list): 包含多个基础信息的列表。 返回: - dict: 合并和分类后的基础信息字典。 """ key_groups = { "招标人/代理信息": ["招标人", "招标人联系方式", "招标代理机构", "招标代理机构联系方式","项目联系方式"], "项目信息": ["项目名称", "项目编号", "项目概况", "项目基本情况", "招标控制价", "投标竞争下浮率"], "关键时间/内容": [ "投标文件递交截止日期", "开标时间", "开标地点", "澄清招标文件的截止时间", "投标有效期", "信息公示媒介" ], "保证金相关": [], "其他信息": [ "重新招标、不再招标和终止招标", "投标费用承担", "是否退还投标文件", ] } # 如果模式是 'goods',则添加 "采购要求" 组 if mode == 'goods': key_groups["采购要求"] = [ "采购需求", "技术要求", "服务要求", "商务要求", "其他要求" ] # 定义采购要求的默认值 DEFAULT_PROCUREMENT_REQS = { "采购需求": {}, "技术要求": [], "商务要求": [], "服务要求": [], "其他要求": [] } combined_data = {} relevant_keys_detected = set() # 合并所有基础信息并收集相关键 for baseinfo in baseinfo_list: combined_data.update(baseinfo) relevant_keys_detected.update(baseinfo.keys()) # 动态调整键组 dynamic_key_handling(key_groups, relevant_keys_detected) # 创建一个副本以存储未分类的项目 unclassified_items = {k: v for k, v in combined_data.items() if k not in [item for sublist in key_groups.values() for item in sublist]} # 定义一个辅助函数,用于查找包含目标键的实际键 def find_matching_key(combined_data, target_key): for k in combined_data: if target_key in k: return k return None # 按键组分类并嵌套 for group_name, keys in key_groups.items(): group_data = {} for key in keys: if group_name == "采购要求": # 在“采购要求”组中使用子字符串匹配 matched_key = find_matching_key(combined_data, key) if matched_key: group_data[matched_key] = combined_data[matched_key] # 从未分类项目中移除已匹配的键 unclassified_items.pop(matched_key, None) else: group_data[key] = DEFAULT_PROCUREMENT_REQS.get(key, "未提供") else: # 在其他组中使用严格匹配 if key in combined_data: group_data[key] = combined_data[key] # 从未分类项目中移除已匹配的键 unclassified_items.pop(key, None) else: group_data[key] = "未提供" # 特殊处理 "招标人/代理信息" 组中的 "项目联系方式" if group_name == "招标人/代理信息" and "项目联系方式" in group_data: project_contact = group_data["项目联系方式"] if isinstance(project_contact, dict): if all(inner_value == "未知" for inner_value in project_contact.values()): # 移除 "项目联系方式" del group_data["项目联系方式"] combined_data[group_name] = group_data # 将剩余未分类的键值对添加到 "其他信息" 组(如果存在) if "其他信息" in combined_data: combined_data["其他信息"].update(unclassified_items) else: combined_data["其他信息"] = unclassified_items # 移除顶层的未分类键值对 for key in list(combined_data.keys()): if key not in key_groups: del combined_data[key] return combined_data def dynamic_key_handling(key_groups, detected_keys): # 检查和调整键组配置 for key in detected_keys: if "技术、服务要求" in detected_keys: # 如果检测到“技术、服务要求”,则移除“技术要求”和“服务要求” if "技术要求" in key_groups["采购要求"]: key_groups["采购要求"].remove("技术要求") if "服务要求" in key_groups["采购要求"]: key_groups["采购要求"].remove("服务要求") # 确保"技术、服务要求"存在于"采购要求"组中 if "技术、服务要求" not in key_groups["采购要求"]: key_groups["采购要求"].insert(1, "技术、服务要求") # 处理“保证金相关”组 elif "保证金" in key: if "保证金相关" not in key_groups: key_groups["保证金相关"] = [] # 直接追加到 "保证金相关" 组的末尾 if key not in key_groups["保证金相关"]: key_groups["保证金相关"].append(key) elif "联合体" in key: key_groups["项目信息"].append(key) elif "分包" in key: key_groups["项目信息"].append(key) elif "踏勘现场" in key: key_groups["其他信息"].append(key) elif "投标预备会" in key: key_groups["其他信息"].append(key) elif "偏离" in key: key_groups["其他信息"].append(key) elif "递交方式" in key or "递交地点" in key: group = key_groups["关键时间/内容"] insert_after = "投标文件递交截止日期" if insert_after in group: index = group.index(insert_after) # 确保新键不重复 if key not in group: group.insert(index + 1, key) else: # 如果“投标文件递交截止日期”不存在,则追加到末尾 if key not in group: group.append(key) def judge_consortium_bidding(baseinfo_list): updated_list = [] accept_bidding = False for baseinfo in baseinfo_list: # 检查 "是否接受联合体投标" 键是否存在且其值为 "是" if "是否接受联合体投标" in baseinfo and baseinfo["是否接受联合体投标"] == "是": accept_bidding = True # 从字典中移除特定键值对 baseinfo.pop("是否接受联合体投标", None) # # 将修改后的 json 数据转换回 JSON 字符串(如果需要) # updated_info = json.dumps(json_data) updated_list.append(baseinfo) # 更新原始列表,如果你想保留修改 baseinfo_list[:] = updated_list return accept_bidding #字符串列表转为普通列表,从大模型回答中提取 def process_string_list(string_list): # 使用正则表达式匹配方括号内的内容 try: match = re.search(r'\[(.*?)\]', string_list) if match: # 获取匹配的内容,即方括号内的部分 content_inside_brackets = match.group(1) if content_inside_brackets: # 检查内容是否为空 # 提取所有数字项,并转换为整数 numbers = [ int(item.strip()) for item in content_inside_brackets.split(',') if re.match(r'^\d+$', item.strip()) # 正则表达式判断是否为纯数字 ] return numbers else: return [] # 如果内容为空,直接返回空列表 else: return [] # 如果没有匹配到内容,返回空列表 except Exception as e: print(f"Error occurred: {e}") return [] # 出现任何异常时返回空列表 def get_global_logger(unique_id): if unique_id is None: return logging.getLogger() # 获取默认的日志器 logger = logging.getLogger(unique_id) return logger