zbparse/flask_app/main/资格评审new.py

121 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from concurrent.futures import ThreadPoolExecutor, as_completed
from flask_app.general.通义千问long import upload_file, qianwen_long
from flask_app.general.json_utils import clean_json_string
def combine_qualification_new(invalid_path, qualification_path,notice_path):
detailed_res = {}
# 初始化无效文件ID
invalid_file_id = None
if qualification_path:
# 上传资格文件并获取文件ID
qualification_file_id = upload_file(qualification_path)
# 定义第一个查询,用于检查资格性审查是否存在
first_query = """
该文档中是否有关于资格性审查标准的具体内容?请以json格式给出回答,外键为'资格性审查',键值仅限于'','',输出格式示例如下:
{
"资格性审查":""
}
"""
# 执行第一个查询并清洗返回的JSON字符串
print("call first_query")
first_res = clean_json_string(qianwen_long(qualification_file_id, first_query))
# 判断是否存在资格性审查
zige_file_id = qualification_file_id if first_res.get("资格性审查") == "" else None
# 如果需要,上传无效文件
if zige_file_id is None:
if invalid_file_id is None:
invalid_file_id = upload_file(invalid_path)
zige_file_id = invalid_file_id
else:
# 如果 qualification_path 为空,直接使用无效文件
zige_file_id = upload_file(invalid_path)
# 定义第二组查询,仅包含资格性审查
second_query = [
{
"key": "资格性审查",
"query": "该招标文件中规定的资格性审查标准是怎样的请以json格式给出外层为'资格性审查',你的回答要与原文完全一致,不可擅自总结删减,也不要回答有关符合性审查的内容。"
}
]
# 定义任务函数
def process_second_query(key, query, file_id):
print("call second_query")
try:
res = qianwen_long(file_id, query)
cleaned_res = clean_json_string(res)
return key, cleaned_res.get(key, "未找到相关内容")
except Exception as e:
print(f"执行查询 '{key}' 时出错: {e}")
return key, "查询失败"
def process_notice(notice_path):
print("call notice_path")
try:
# 上传通知文件并获取文件ID
file_id1 = upload_file(notice_path)
# 定义用户查询,提取申请人资格要求
user_query1 = """
第一章招标公告投标邀请书中说明的申请人资格要求是怎样的请以json格式给出回答外键为'申请人资格要求',键值为字符串列表,其中每个字符串对应原文中的一条要求,你的回答与原文内容一致,不要擅自总结删减。输出格式示例如下:
{
"申请人资格要求":[
"1.满足《中华人民共和国政府采购法》第二十二条规定;",
"1.1 法人或者其他组织的营业执照等证明文件,如供应商是自然人的提供身份证明材料;",
"2.未被列入“信用中国”网站(www.creditchina.gov.cn)信用服务栏失信被执行人、重大税收违法案件当事人名单;"
]
}
"""
# 执行查询并清洗结果
res1 = clean_json_string(qianwen_long(file_id1, user_query1))
# 提取申请人资格要求
requirements = res1.get("申请人资格要求", "未找到相关内容")
return "申请人资格要求", requirements
except Exception as e:
print(f"处理申请人资格要求时出错: {e}")
return "申请人资格要求", "处理失败"
# 初始化 ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=2) as executor:
future_to_key = {}
# 提交第二组查询
for query_info in second_query:
key = query_info["key"]
query = query_info["query"]
current_file_id = zige_file_id
future = executor.submit(process_second_query, key, query, current_file_id)
future_to_key[future] = key
# 有条件地提交通知处理
if notice_path:
future = executor.submit(process_notice, notice_path)
future_to_key[future] = "申请人资格要求"
else:
future = executor.submit(process_notice, invalid_path)
future_to_key[future] = "申请人资格要求"
# 收集结果(按完成顺序)
for future in as_completed(future_to_key):
key, result = future.result()
detailed_res[key] = result
# 定义所需的顺序
desired_order = ["申请人资格要求", "资格性审查"]
# print(json.dumps(detailed_res,ensure_ascii=False,indent=4))
# 创建一个新的有序字典
ordered_res = {}
for key in desired_order:
if key in detailed_res:
ordered_res[key] = detailed_res[key]
# 最终处理结果,例如打印或保存
return {"资格审查": ordered_res}