zbparse/flask_app/general/通用功能函数.py
2025-01-03 17:36:23 +08:00

220 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- encoding:utf-8 -*-
import ast
import logging
import re
from flask_app.general.json_utils import clean_json_string
from flask_app.general.多线程提问 import multi_threading
from flask_app.general.通义千问long import upload_file, qianwen_long
from flask_app.工程标.判断是否分包等 import read_questions_from_judge
def get_deviation_requirements(invalid_path):
file_id=upload_file(invalid_path)
user_query="""该招标文件对响应文件投标文件偏离项的要求或内容是怎样的请不要回答具体的技术参数也不要回答具体的评分要求。请以json格式给我提供信息外层键名为'偏离',若存在嵌套信息,嵌套内容键名为文件中对应字段或是你的总结,而嵌套键值必须与原文保持一致,若文中未涉及相关内容,在键值中填'未知'
注意:不使用任何预设的示例作为回答,示例仅作为格式参考。
禁止内容:
确保所有输出内容均基于提供的实际招标文件内容,不使用任何预设的示例作为回答。
示例1嵌套键值对情况
{
"偏离":{
"技术要求":"以★标示的内容不允许负偏离",
"商务要求":"以★标示的内容不允许负偏离"
}
}
示例2无嵌套键值对情况
{
"偏离":"所有参数需在技术响应偏离表内响应,如应答有缺项,且无有效证明材料的,评标委员会有权不予认可,视同负偏离处理"
}
"""
model_res=qianwen_long(file_id,user_query)
return clean_json_string(model_res)
def process_judge_questions(judge_file_path, chosen_numbers, invalid_path, baseinfo_list1):
file_id=upload_file(invalid_path)
judge_questions = read_questions_from_judge(judge_file_path, chosen_numbers)
judge_consortium = judge_consortium_bidding(baseinfo_list1)
if judge_consortium:
judge_consortium_question = (
"该招标文件对于联合体投标的要求是怎样的请按json格式给我提供信息"
"外层键名为'联合体投标要求',其中有一个嵌套键值对为:\"是否接受联合体投标\":\"\""
)
judge_questions.append(judge_consortium_question)
if not judge_questions:
print("process_judge_questions:没有需要处理的 judge_questions跳过 multi_threading 调用。")
return # 或者根据需要返回其他值
# file_id3 = upload_file(merged_baseinfo_path)
res2 = multi_threading(judge_questions, "", file_id, 2)
for question, response in res2:
baseinfo_list1.append(clean_json_string(response))
def aggregate_basic_info(baseinfo_list,mode="engineering"):
"""
将基础信息列表中的数据进行合并和分类。
参数:
- baseinfo_list (list): 包含多个基础信息的列表。
返回:
- dict: 合并和分类后的基础信息字典。
"""
key_groups = {
"招标人/代理信息": ["招标人", "招标人联系方式", "招标代理机构", "招标代理机构联系方式","项目联系方式"],
"项目信息": ["项目名称", "项目编号", "项目概况", "项目基本情况", "招标控制价", "投标竞争下浮率"],
"关键时间/内容": [
"投标文件递交截止日期",
"开标时间",
"开标地点",
"澄清招标文件的截止时间",
"投标有效期",
"信息公示媒介"
],
"保证金相关": [],
"其他信息": [
"重新招标、不再招标和终止招标",
"投标费用承担",
"是否退还投标文件",
]
}
# 如果模式是 'goods',则添加 "采购要求" 组
if mode == 'goods':
key_groups["采购要求"] = [
"采购需求",
"技术要求",
"服务要求",
"商务要求",
"其他要求"
]
combined_data = {}
relevant_keys_detected = set()
# 合并所有基础信息并收集相关键
for baseinfo in baseinfo_list:
combined_data.update(baseinfo)
relevant_keys_detected.update(baseinfo.keys())
# 动态调整键组
dynamic_key_handling(key_groups, relevant_keys_detected)
# 创建一个副本以存储未分类的项目
unclassified_items = {k: v for k, v in combined_data.items() if
k not in [item for sublist in key_groups.values() for item in sublist]}
# 按键组分类并嵌套
for group_name, keys in key_groups.items():
group_data = {key: combined_data.get(key, "未提供") for key in keys}
# 仅对 "项目联系方式" 进行特殊处理
if group_name == "招标人/代理信息" and "项目联系方式" in group_data:
project_contact = group_data["项目联系方式"]
if isinstance(project_contact, dict):
if all(inner_value == "未知" for inner_value in project_contact.values()):
# 移除 "项目联系方式"
del group_data["项目联系方式"]
combined_data[group_name] = group_data
# 从 unclassified_items 中移除已分类的键
for key in keys:
unclassified_items.pop(key, None)
# 将剩余未分类的键值对添加到 "其他信息" 组
combined_data["其他信息"].update(unclassified_items)
# 移除顶层的未分类键值对
for key in list(combined_data.keys()):
if key not in key_groups:
del combined_data[key]
return combined_data
def dynamic_key_handling(key_groups, detected_keys):
# 检查和调整键组配置
for key in detected_keys:
if "技术、服务要求" in detected_keys:
# 如果检测到“技术、服务要求”,则移除“技术要求”和“服务要求”
if "技术要求" in key_groups["采购要求"]:
key_groups["采购要求"].remove("技术要求")
if "服务要求" in key_groups["采购要求"]:
key_groups["采购要求"].remove("服务要求")
# 确保"技术、服务要求"存在于"采购要求"组中
if "技术、服务要求" not in key_groups["采购要求"]:
key_groups["采购要求"].insert(1, "技术、服务要求")
# 处理“保证金相关”组
elif "保证金" in key:
if "保证金相关" not in key_groups:
key_groups["保证金相关"] = []
# 直接追加到 "保证金相关" 组的末尾
if key not in key_groups["保证金相关"]:
key_groups["保证金相关"].append(key)
elif "联合体" in key:
key_groups["项目信息"].append(key)
elif "分包" in key:
key_groups["项目信息"].append(key)
elif "踏勘现场" in key:
key_groups["其他信息"].append(key)
elif "投标预备会" in key:
key_groups["其他信息"].append(key)
elif "偏离" in key:
key_groups["其他信息"].append(key)
elif "递交方式" in key or "递交地点" in key:
group = key_groups["关键时间/内容"]
insert_after = "投标文件递交截止日期"
if insert_after in group:
index = group.index(insert_after)
# 确保新键不重复
if key not in group:
group.insert(index + 1, key)
else:
# 如果“投标文件递交截止日期”不存在,则追加到末尾
if key not in group:
group.append(key)
def judge_consortium_bidding(baseinfo_list):
updated_list = []
accept_bidding = False
for baseinfo in baseinfo_list:
# 检查 "是否接受联合体投标" 键是否存在且其值为 "是"
if "是否接受联合体投标" in baseinfo and baseinfo["是否接受联合体投标"] == "":
accept_bidding = True
# 从字典中移除特定键值对
baseinfo.pop("是否接受联合体投标", None)
# # 将修改后的 json 数据转换回 JSON 字符串(如果需要)
# updated_info = json.dumps(json_data)
updated_list.append(baseinfo)
# 更新原始列表,如果你想保留修改
baseinfo_list[:] = updated_list
return accept_bidding
#字符串列表转为普通列表从qianwen回答中提取
def process_string_list(string_list):
# 使用正则表达式匹配方括号内的内容
match = re.search(r'\[(.*?)\]', string_list)
if match:
# 获取匹配的内容,即方括号内的部分
content_inside_brackets = match.group(1)
if content_inside_brackets: # 检查内容是否为空
# 检查内容是否是数字列表
if all(item.strip().isdigit() for item in content_inside_brackets.split(',')):
# 如果是数字,不用加引号,直接保留数字
formatted_list = '[' + ', '.join(item.strip() for item in content_inside_brackets.split(',') if item.strip()) + ']'
else:
# 如果不全是数字,按字符串处理
formatted_list = '[' + ', '.join(f"'{item.strip()}'" for item in content_inside_brackets.split(',') if item.strip()) + ']'
else:
return [] # 直接返回空列表如果内容为空
# 使用 ast.literal_eval 来解析格式化后的字符串
try:
actual_list = ast.literal_eval(formatted_list)
return actual_list
except SyntaxError as e:
print(f"禁止投标情形: Error parsing list: {e}")
return []
else:
# 如果没有匹配到内容,返回空列表
return []
def get_global_logger(unique_id):
if unique_id is None:
return logging.getLogger() # 获取默认的日志器
logger = logging.getLogger(unique_id)
return logger