def process_functions_in_parallel(tech_deviation, combined_data, zige_info, fuhe_info, zigefuhe_info):
    """Run the three deviation extractors concurrently and gather their outputs.

    The technical, business and qualification/compliance extractors are
    submitted to a thread pool and collected as they finish. A task that
    raises is reported on stdout and its output slot(s) stay ``None``; the
    remaining tasks are unaffected.

    Args:
        tech_deviation: Technical requirements; serialised with ``json.dumps``
            and handed to ``get_tech_star_deviation``.
        combined_data: Mapping from which the ``'基础信息'`` entry is read via
            ``get_nested`` and handed to ``extract_business_deviation``.
        zige_info: First argument forwarded to ``extract_zige_deviation_table``.
        fuhe_info: Second argument forwarded to ``extract_zige_deviation_table``.
        zigefuhe_info: Third argument forwarded to
            ``extract_zige_deviation_table``.

    Returns:
        A 4-tuple ``(tech_star_deviation, business_deviation,
        business_star_deviation, zigefuhe_deviation)``. Any element whose
        task failed is ``None``.
    """
    # Build the extractor inputs up front, before any worker is started.
    serialized_tech = json.dumps(tech_deviation, ensure_ascii=False, indent=4)
    base_info = get_nested(combined_data, ['基础信息'], {})

    # Every output slot defaults to None so a failed task needs no bookkeeping.
    outputs = {
        "tech_star_deviation": None,
        "business_deviation": None,
        "business_star_deviation": None,
        "zigefuhe_deviation": None,
    }

    with concurrent.futures.ThreadPoolExecutor() as pool:
        # Future -> task label, submitted in a fixed order.
        pending = {
            pool.submit(get_tech_star_deviation, serialized_tech): "tech_star_deviation",
            pool.submit(extract_business_deviation, base_info): "business_deviation_and_star",
            pool.submit(
                extract_zige_deviation_table, zige_info, fuhe_info, zigefuhe_info
            ): "zigefuhe_deviation",
        }

        for finished in concurrent.futures.as_completed(pending):
            label = pending[finished]
            try:
                payload = finished.result()
                if label != "business_deviation_and_star":
                    outputs[label] = payload
                else:
                    # The business extractor returns a pair that fills two slots.
                    (
                        outputs["business_deviation"],
                        outputs["business_star_deviation"],
                    ) = payload
            except Exception as e:
                # Best effort: report the failure and leave the slot(s) as None.
                print(f"Task {label} failed with exception: {e}")

    return (
        outputs["tech_star_deviation"],
        outputs["business_deviation"],
        outputs["business_star_deviation"],
        outputs["zigefuhe_deviation"],
    )
每个星★或三角▲要求占据一个字符串。 +3. 若没有带星★或三角▲的要求,键值为空列表,即[] ### 示例输入如下: {{ "摄像机控制键盘": [ @@ -49,7 +87,7 @@ def get_tech_star_deviation(tech_string): # print(model_res) tech_star_deviation = clean_json_string(model_res) return tech_star_deviation -def extract_business_requirements(data): +def extract_business_deviation(data): procurement = data.get("采购要求", {}) new_data = {} @@ -135,7 +173,7 @@ def extract_business_requirements(data): return business_req_deviation, business_star_req_deviation -def extract_zige_deviation_table(zige_info, fuhe_info): +def extract_zige_deviation_table(zige_info, fuhe_info, zigefuhe_info): prompt_template1 = """ 任务:给出一份文本,根据文本提取资格性检查的具体评审标准。 输出要求: @@ -143,7 +181,8 @@ def extract_zige_deviation_table(zige_info, fuhe_info): 2.键名为"资格性检查",键值为字符串列表,每个字符串为一条评审标准,评审标准不分先后,不要有序号标注。 要求与指南: 1. 评审标准是具体的内容,不要返回诸如'本项目的特定资格要求:'这种标题性质且不能体现具体评审标准的内容。 - 2. 存在相同或相似表述的键名,需要合并列表,键名取其一即可。 + 2. 若文本中存在相同或相似的表述,仅需取其中一个作为键值中的一条即可。 + 文本内容:{full_text} """ @@ -152,33 +191,71 @@ def extract_zige_deviation_table(zige_info, fuhe_info): 输出要求: 1.以json格式返回结果,不要输出其他内容。 2.键名为"符合性检查",键值为字符串列表,每个字符串为一条评审标准,评审标准不分先后,不要有序号标注。 + 3.仔细检查你所选取的标准,若发现这些标准实际上是在描述不允许出现的符合性审查情况,则将外键替换为'符合性检查(以下情况不得出现)',并将这些标准写入其中。 要求与指南: - 1. 评审标准是具体的内容,不要返回诸如'本项目的特定符合性要求:'这种标题性质且不能体现具体评审标准的内容。 - 2. 存在相同或相似表述的键名,需要合并列表,键名取其一即可。 + 1. 评审标准应该是具体的内容,不要返回诸如'本项目的特定符合性要求:'这种标题性质且不能体现具体评审标准的内容。 + 2. 若文本中存在相同或相似的表述,仅需取其中一个作为键值中的一条即可。 + 输出示例1: + {{ + "符合性检查": [ + "因素1", + "因素2", + ... + ] + }} + 输出示例2: + {{ + "符合性检查(以下情况不得出现)": [ + "因素1", + "因素2", + ... + ] + }} + 文本内容:{full_text} """ + prompt_template3 = """ + 任务:给出一份文本,根据文本提取资格性检查和符合性检查的具体评审标准。 + 输出要求: + 1.以json格式返回结果,不要输出其他内容。 + 2.键名为"资格性和符合性检查",键值为字符串列表,每个字符串为一条评审标准,评审标准不分先后,不要有序号标注。 + 要求与指南: + 1. 评审标准应该是具体的内容,不要返回诸如'本项目的特定符合性要求:'这种标题性质且不能体现具体评审标准的内容。 + 2. 
若文本中存在相同或相似的表述,仅需取其中一个作为键值中的一条即可。 - user_query1 = prompt_template1.format(full_text=zige_info) - user_query2 = prompt_template2.format(full_text=fuhe_info) + 文本内容:{full_text} + """ def get_model_response(query): return doubao_model(query) - # 使用 ThreadPoolExecutor 并行执行两个模型调用 - with concurrent.futures.ThreadPoolExecutor() as executor: - # 提交任务到线程池 - future1 = executor.submit(get_model_response, user_query1) - future2 = executor.submit(get_model_response, user_query2) + result = {"资格审查": {}} - # 等待任务完成并获取结果 - model_res1 = future1.result() - model_res2 = future2.result() - # print(model_res1) - # print(model_res2) + if zigefuhe_info: + # 如果zigefuhe_info非空,使用prompt_template3 + user_query3 = prompt_template3.format(full_text=zigefuhe_info) + model_res3 = get_model_response(user_query3) + zigefuhe_deviation = clean_json_string(model_res3) + result["资格审查"] = zigefuhe_deviation + else: + # 使用原有逻辑处理分开的资格审查和符合性审查 + user_query1 = prompt_template1.format(full_text=zige_info) + user_query2 = prompt_template2.format(full_text=fuhe_info) - zige_deviation = clean_json_string(model_res1) - fuhe_deviation = clean_json_string(model_res2) - return zige_deviation,fuhe_deviation + # 使用 ThreadPoolExecutor 并行执行两个模型调用 + with concurrent.futures.ThreadPoolExecutor() as executor: + future1 = executor.submit(get_model_response, user_query1) + future2 = executor.submit(get_model_response, user_query2) + model_res1 = future1.result() + model_res2 = future2.result() + + zige_deviation = clean_json_string(model_res1) + fuhe_deviation = clean_json_string(model_res2) + result["资格审查"] = { + **zige_deviation, + **fuhe_deviation + } + return result # 定义一个辅助函数用于获取嵌套字典中的值 def get_nested(dic, keys, default=None): @@ -417,7 +494,7 @@ def outer_post_processing(combined_data, includes, good_list): tech_deviation = "" zige_info="" fuhe_info="" - + zigefuhe_info="" # 检查 '基础信息' 是否在 includes 中 if "基础信息" in includes: base_info = combined_data.get("基础信息", {}) @@ -437,23 +514,36 @@ def 
outer_post_processing(combined_data, includes, good_list): if "资格审查" in includes: zige_review = combined_data.get("资格审查", {}) # print("资格审查内容:", zige_review) - try: - # 正确访问 '申请人资格要求' 和 '资格性审查' - zige_info = json.dumps({ - "申请人资格要求": zige_review["申请人资格要求"], - "资格性审查": zige_review["资格性审查"] - }, ensure_ascii=False, indent=4) + # 检查是否存在"资格性和符合性审查" + if "资格性和符合性审查" in zige_review: + # 情况3:只有"申请人资格要求"和"资格性和符合性审查" + zigefuhe_info = json.dumps({ + "申请人资格要求": zige_review.get("申请人资格要求", "未提供"), + "资格性和符合性审查": zige_review.get("资格性和符合性审查", "未提供") + }, ensure_ascii=False, indent=4) + else: + # 情况1和2:存在分开的资格审查和符合性审查 + zige_info = json.dumps({ + "申请人资格要求": zige_review.get("申请人资格要求", "未提供"), + "资格性审查": zige_review.get("资格性审查", "未提供") + }, ensure_ascii=False, indent=4) + + # 处理符合性审查的两种可能的键名 + fuhe_key = "符合性审查" if "符合性审查" in zige_review else "符合性审查(以下情况不得出现)" + fuhe_info = json.dumps({ + fuhe_key: zige_review.get(fuhe_key, "未提供") + }, ensure_ascii=False, indent=4) - # 正确访问 '符合性审查' - fuhe_info = json.dumps({ - "符合性审查": zige_review["符合性审查"] - }, ensure_ascii=False, indent=4) except KeyError as e: print(f"缺少关键字: {e}") - tech_star_deviation=get_tech_star_deviation(json.dumps(tech_deviation,ensure_ascii=False,indent=4)) - business_deviation,business_star_deviation = extract_business_requirements(get_nested(combined_data, ['基础信息'], {})) - zige_deviation,fuhe_deviation=extract_zige_deviation_table(zige_info,fuhe_info) + tech_star_deviation, business_deviation, business_star_deviation, zigefuhe_deviation = process_functions_in_parallel( + tech_deviation=tech_deviation, + combined_data=combined_data, + zige_info=zige_info, + fuhe_info=fuhe_info, + zigefuhe_info=zigefuhe_info + ) # 遍历原始字典的每一个键值对 for key, value in combined_data.items(): @@ -472,8 +562,7 @@ def outer_post_processing(combined_data, includes, good_list): if not processed_data["其他"]: del processed_data["其他"] - return processed_data, extracted_info, 
tech_deviation,tech_star_deviation,business_deviation,business_star_deviation,zige_deviation,fuhe_deviation - + return processed_data, extracted_info, tech_deviation,tech_star_deviation,business_deviation,business_star_deviation,zigefuhe_deviation if __name__ == "__main__": combined_data = { diff --git a/flask_app/start_up.py b/flask_app/start_up.py index 375c250..8fbce60 100644 --- a/flask_app/start_up.py +++ b/flask_app/start_up.py @@ -403,7 +403,7 @@ def process_and_stream(file_url, zb_type): output_json_path = os.path.join(output_folder, 'final_result.json') extracted_info_path = os.path.join(output_folder, 'extracted_result.json') includes = ["基础信息", "资格审查", "商务评分", "技术评分", "无效标与废标项", "投标文件要求", "开评定标流程"] - final_result, extracted_info,tech_deviation,tech_star_deviation,business_deviation,business_star_deviation,zige_deviation,fuhe_deviation = outer_post_processing(combined_data, includes, good_list) + final_result, extracted_info,tech_deviation,tech_star_deviation,business_deviation,business_star_deviation,zigefuhe_deviation = outer_post_processing(combined_data, includes, good_list) logger.info( f"技术偏离表: {json.dumps(tech_deviation, ensure_ascii=False, indent=4)}" @@ -422,14 +422,9 @@ def process_and_stream(file_url, zb_type): ) # 添加日志记录 logger.info( - f"资格检查偏离表: {json.dumps(zige_deviation, ensure_ascii=False, indent=4)}" + f"资格检查偏离表: {json.dumps(zigefuhe_deviation, ensure_ascii=False, indent=4)}" ) # 添加日志记录 - logger.info( - f"符合性检查偏离表: {json.dumps(fuhe_deviation, ensure_ascii=False, indent=4)}" - ) # 添加日志记录 - - # 采购需求 tech_deviation_response = { 'message': 'procurement_reqs', @@ -441,16 +436,12 @@ def process_and_stream(file_url, zb_type): 'filename': filename, 'data': json.dumps(tech_star_deviation, ensure_ascii=False) } - zige_deviation_response = { + zigefuhe_deviation_response = { 'message': 'zige_deviation', 'filename': filename, - 'data': json.dumps(zige_deviation, ensure_ascii=False) - } - fuhe_deviation_response = { - 'message': 'fuhe_deviation', - 
'filename': filename, - 'data': json.dumps(fuhe_deviation, ensure_ascii=False) + 'data': json.dumps(zigefuhe_deviation, ensure_ascii=False) } + shangwu_deviation_response = { 'message': 'shangwu_deviation', 'filename': filename, @@ -463,8 +454,7 @@ def process_and_stream(file_url, zb_type): } yield f"data: {json.dumps(tech_deviation_response, ensure_ascii=False)}\n\n" yield f"data: {json.dumps(tech_deviation_star_response, ensure_ascii=False)}\n\n" - yield f"data: {json.dumps(zige_deviation_response, ensure_ascii=False)}\n\n" - yield f"data: {json.dumps(fuhe_deviation_response, ensure_ascii=False)}\n\n" + yield f"data: {json.dumps(zigefuhe_deviation_response, ensure_ascii=False)}\n\n" yield f"data: {json.dumps(shangwu_deviation_response, ensure_ascii=False)}\n\n" yield f"data: {json.dumps(shangwu_star_deviation_response, ensure_ascii=False)}\n\n" @@ -668,16 +658,11 @@ def test_process_and_stream(): "▲具有存储安全保障功能,当存储压力过高或硬盘出现性能 不足时,可优先录像业务存储;"] } - zige_deviation_response = { - 'message': 'zige_deviation', + zigefuhe_deviation_response = { + 'message': 'zigefuhe_deviation', 'filename': filename, 'data': json.dumps(zige_deviation_table, ensure_ascii=False) } - fuhe_deviation_response = { - 'message': 'fuhe_deviation', - 'filename': filename, - 'data': json.dumps(fuhe_deviation_table, ensure_ascii=False) - } shangwu_deviation_response = { 'message': 'shangwu_deviation', 'filename': filename, @@ -694,8 +679,7 @@ def test_process_and_stream(): 'data': json.dumps(jishu_star_deviation_table, ensure_ascii=False) } - yield f"data:{json.dumps(zige_deviation_response, ensure_ascii=False)}\n\n" - yield f"data:{json.dumps(fuhe_deviation_response, ensure_ascii=False)}\n\n" + yield f"data:{json.dumps(zigefuhe_deviation_response, ensure_ascii=False)}\n\n" yield f"data:{json.dumps(shangwu_deviation_response, ensure_ascii=False)}\n\n" yield f"data:{json.dumps(shangwu_star_deviation_response, ensure_ascii=False)}\n\n" yield f"data:{json.dumps(jishu_star_deviation_response, 
ensure_ascii=False)}\n\n"