zbparse/flask_app/main/形式响应评审.py
2024-08-29 16:37:09 +08:00

171 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import json
import time
from 多线程提问 import multi_threading
from 根据条款号整合json import process_and_merge_entries
from 通义千问long import qianwen_long
from json_utils import extract_content_from_json
prompt = """
# 角色
你是一个文档处理专家,专门负责理解和操作基于特定内容的文档任务,这包括解析、总结、搜索或生成与给定文档相关的各类信息。
## 技能
### 技能 1文档解析与摘要
- 深入理解并分析${document1}的内容,提取关键信息。
- 根据需求生成简洁明了的摘要,保持原文核心意义不变。
### 技能 2信息检索与关联
- 在${document1}中高效检索特定信息或关键词。
- 能够识别并链接到文档内部或外部的相关内容,增强信息的连贯性和深度。
## 限制
- 所有操作均需基于${document1}的内容,不可超出此范围创造信息。
- 在处理敏感或机密信息时,需遵守严格的隐私和安全规定。
- 确保所有生成或改编的内容逻辑连贯,无误导性信息。
请注意,上述技能执行时将直接利用并参考${document1}的具体内容,以确保所有产出紧密相关且高质量。
"""
def extract_matching_keys(json_data):
# 函数首先检查输入 json_data 是否为字符串类型。如果是,它会使用 json.loads() 将字符串解析为字典。
if isinstance(json_data, str):
data = json.loads(json_data)
else:
data = json_data
# 正则表达式匹配
include_patterns = [re.compile(r"符合第"), re.compile(r"第.*?章"), re.compile(r"第.*?款"), re.compile(r"第.*?项"), re.compile(r"第.*?目")]
additional_include_patterns = [re.compile(r"规定"), re.compile(r"条目")]
exclude_patterns = ["投标文件格式", "权利义务", "技术标准","工程量清单"]
additional_exclude_patterns = ["按要求", "按规定"]
# Initialize a list to hold filtered key-value pairs
final_matching = []
# Recursive function to traverse and filter data
def recursive_search(current_data, path=[]):
if isinstance(current_data, dict):
for key, value in current_data.items():
new_path = path + [key] # Update path for nested keys
if isinstance(value, (dict, list)):
recursive_search(value, new_path)
else:
process_value(key, str(value), new_path)
elif isinstance(current_data, list):
for item in current_data:
recursive_search(item, path)
# Function to process each value against the patterns
def process_value(key, value, path):
# Check exclude patterns first
if any(ex in key or ex in value for ex in exclude_patterns):
return
# Main include patterns
if any(pattern.search(value) for pattern in include_patterns):
# Additional exclude patterns
if not any(ex in key or ex in value for ex in additional_exclude_patterns):
# Additional include patterns
if any(pattern.search(value) for pattern in additional_include_patterns):
final_matching.append({".".join(path): value}) # Use dot notation for nested keys
# Start the recursive search
recursive_search(data)
return final_matching
def reformat_questions(match_keys):
"""
根据是否包含特定序号格式如3.7.4或3.7.4(5)或3.7.45重新格式化匹配到的评审条目。
若包含序号,则提取出来;若不包含,则生成格式化的问题字符串。
"""
entries_with_numbers = []
formatted_questions = []
# 正则表达式,同时匹配全角和半角括号
pattern = re.compile(r'(\d+(?:\.\d+)+)(?:[\(\](\d+)[\)\])?')
for entry in match_keys:
key, value = next(iter(entry.items()))
match = pattern.search(value)
if match:
# 如果存在序号,保存序号与对应的键值对,包括括号内的数字(如果存在)
num = match.group(1) + (f"({match.group(2)})" if match.group(2) else "")
entries_with_numbers.append({key: num})
else:
# 如果不存在序号,删除“符合”并格式化文本
revised_standard = re.sub(r'符合', '', value)
formatted_entry = f"关于‘{key}{revised_standard}的内容是怎样的请按json格式给我提供信息键名为'{key}',如果存在未知信息,请在对应键值处填'未知'"
formatted_questions.append(formatted_entry)
return entries_with_numbers, formatted_questions
def update_json_data(original_data, updates, second_response_list):
"""
根据提供的更新字典覆盖原始JSON数据中对应的键值支持点分隔的键来表示嵌套结构。
参数:
- original_data: dict, 原始的JSON数据。
- updates: dict, 包含需要更新的键值对。
- second_response_list: list, 包含多个字典,每个字典包含需要更新的键值对。
返回:
- updated_data: dict, 更新后的JSON数据。
"""
def recursive_update(data, key, value):
# 处理点分隔的键,递归定位并更新嵌套字典
keys = key.split('.')
for k in keys[:-1]:
data = data.setdefault(k, {})
if isinstance(value, dict) and isinstance(data.get(keys[-1], None), dict):
data[keys[-1]] = {**data.get(keys[-1], {}), **value}
else:
data[keys[-1]] = value
# 合并 updates 到 original_data 中
for key, value in updates.items():
recursive_update(original_data, key, value)
# 遍历 second_response_list 中的每个字典,并合并到 original_data 中
for response_dict in second_response_list:
for key, value in response_dict.items():
recursive_update(original_data, key, value)
return original_data
def process_reviews(original_dict_data,knowledge_name, truncate0_jsonpath,clause_json_path):
matched_keys = extract_matching_keys(original_dict_data) #[{'形式评审标准.投标文件签字盖章': '符合第二章“投标人须知”第 3.7.34目 规定'}, {'形式评审标准.多标段投标': '符合第二章“投标人须知”第 10.1款规定'}]
entries_with_numbers, formatted_questions = reformat_questions(matched_keys)
results_2 = multi_threading(formatted_questions, knowledge_name, True) #无序号的直接问大模型
second_response_list = []
for _, response in results_2:
try:
if response and len(response) > 1: # 检查response存在且有至少两个元素
temp = extract_content_from_json(response[1])
second_response_list.append(temp)
else:
print(f"Warning: Missing or incomplete response data for query index {_}.")
except Exception as e:
print(f"Error processing response for query index {_}: {e}")
# Assume JSON file paths are defined or configured correctly
combined_results = process_and_merge_entries(entries_with_numbers, truncate0_jsonpath, clause_json_path) #脚本提取的要求
updated_json = update_json_data(original_dict_data, combined_results, second_response_list)
return updated_json
if __name__ == "__main__":
start_time=time.time()
knowledge_name="zbfile"
truncate_tobidders_table_json_path="C:\\Users\\Administrator\\Desktop\\招标文件\\output\\truncate_output.json"
clause_path="C:\\Users\\Administrator\\Desktop\\招标文件\\output\\clause.json"
original_dict_data={'营业执照': '具备有效的营业执照', '资质条件': '符合第二章“投标人须知”第 1.4.1项规定', '财务状况': '符合第二章“投标人须知”第 1.4.1项规定', '类似业绩': '符合第二章“投标人须知”第 1.4.1项规定', '信誉': '符合第二章“投标人须知”第 1.4.1项规定', '项目经理资格': '符合第二章“投标人须知”第 1.4.1项规定', '设计负责人资格': '符合第二章“投标人须知”第 1.4.1项规定', '施工负责人资格': '符合第二章“投标人须知”第 1.4.1项规定', '施工机械设备': '符合第二章“投标人须知”第 1.4.1项规定', '项目管理机构及人员': '符合第二章“投标人须知”第 1.4.1项规定', '其他要求': '符合第二章“投标人须知”第 1.4.1项规定', '联合体投投人 (如有)': '符合第二章“投标人须知”第 1.4.2项规定', '不存在禁止投标的情形': '不存在第二章“投标人须知”第 1.4.3项规 定的任何一种情形'}
formal_json = process_reviews(original_dict_data,knowledge_name, truncate_tobidders_table_json_path, clause_path)
data = json.loads(formal_json)
end_time=time.time()
elapsed_time = end_time - start_time
print(f"Function execution took {elapsed_time} seconds.")