zbparse/flask_app/test_case/test_generate_key_paths.py
2024-12-03 17:37:02 +08:00

352 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import re
from collections import defaultdict
from copy import deepcopy
def generate_key_paths(data):
"""
处理输入的字典,生成 key_paths, grouped_paths 和 good_list并根据条件修改原始字典。
参数:
data (dict): 输入的嵌套字典。
返回:
tuple: 包含 key_paths, grouped_paths 和 good_list 的元组。
"""
# 编译用于匹配后缀的正则表达式模式
pattern = re.compile(r'(.+)-\d+$')
# 初始化结果列表和字典
key_paths = []
grouped_counts = defaultdict(int) # 用于记录每个 grouped_path 的数量
good_list = []
def recurse(current_dict, path):
"""
递归遍历字典,处理 key_paths 和 grouped_paths并收集 good_list 和 grouped_counts。
参数:
current_dict (dict): 当前遍历的字典。
path (list): 当前路径的键列表。
"""
# 第一遍遍历,统计每个基名的出现次数
base_name_count = {}
base_names = {}
for key in current_dict.keys():
match = pattern.match(key)
if match:
base = match.group(1)
else:
base = key
base_names[key] = base
base_name_count[base] = base_name_count.get(base, 0) + 1
# 第二遍遍历,根据基名的出现次数分类
keys_to_rename = {}
for key, base in base_names.items():
if base_name_count[base] == 1:
# 检查是否是最内层(值为列表)
value = current_dict[key]
if isinstance(value, list):
current_path = '.'.join(path + [base])
key_paths.append(current_path)
# 收集 good_list保持顺序且不重复
if base not in good_list:
good_list.append(base)
# 如果原键名有后缀,需要记录以便后续重命名
if key != base:
keys_to_rename[key] = base
elif isinstance(value, dict):
# 继续递归处理
recurse(value, path + [base])
else:
# 记录分组路径,并统计数量
grouped_path = '.'.join(path + [base])
grouped_counts[grouped_path] += 1
# 执行键名的重命名,同时保持原有顺序
if keys_to_rename:
new_ordered_dict = {}
for key in current_dict.keys():
if key in keys_to_rename:
new_key = keys_to_rename[key]
new_ordered_dict[new_key] = current_dict[key]
else:
new_ordered_dict[key] = current_dict[key]
current_dict.clear()
current_dict.update(new_ordered_dict)
# 对于基名重复的键,继续递归(如果值是字典)
for key, base in base_names.items():
if base_name_count[base] > 1:
value = current_dict[key]
if isinstance(value, dict):
recurse(value, path + [base])
elif isinstance(value, list):
# 如果值是列表,仍需收集基名到 good_list
if base not in good_list:
good_list.append(base)
# 深拷贝数据以避免修改原始输入
data_copy = deepcopy(data)
# 开始递归遍历
recurse(data_copy, [])
def collect_grouped_paths(current_dict, path, collected):
for key in current_dict.keys():
match = pattern.match(key)
if match:
base = match.group(1)
else:
base = key
current_path = '.'.join(path + [base])
if current_path in grouped_counts and current_path not in collected:
collected.append(current_path)
value = current_dict[key]
if isinstance(value, dict):
collect_grouped_paths(value, path + [base], collected)
collected_grouped_paths = []
collect_grouped_paths(data_copy, [], collected_grouped_paths)
# 将 grouped_paths 转换为包含数量的字典列表
grouped_paths = [{path: grouped_counts[path]} for path in collected_grouped_paths]
return key_paths, grouped_paths, good_list, data_copy
def rename_keys(data):
"""
对整个数据结构进行重命名处理。
"""
def rename_keys_recursive(current_dict):
"""
递归地重名字典中的键,确保同一层级下具有相同基名的键被正确编号。
"""
if not isinstance(current_dict, dict):
return current_dict
key_order = list(current_dict.keys())
base_name_dict = defaultdict(list)
# 辅助函数:提取基名(去除可能的 -数字 后缀)
def get_base_name(key):
if '-' in key:
parts = key.rsplit('-', 1)
if parts[1].isdigit():
return parts[0]
return key
# 将键按基名分组
for key in key_order:
base = get_base_name(key)
base_name_dict[base].append(key)
new_dict = {}
for key in key_order:
base = get_base_name(key)
keys = base_name_dict[base]
if len(keys) > 1:
# 如果存在同基名的多个键,则进行重命名
if base not in new_dict:
# 按原始顺序对需要重命名的键进行排序
sorted_keys = sorted(keys, key=lambda x: key_order.index(x))
for idx, original_key in enumerate(sorted_keys, start=1):
new_key = f"{base}-{idx}"
# 如果值是字典,递归处理
if isinstance(current_dict[original_key], dict):
new_dict[new_key] = rename_keys_recursive(current_dict[original_key])
else:
new_dict[new_key] = current_dict[original_key]
else:
# 如果没有重复的基名,保持原名
if isinstance(current_dict[key], dict):
new_dict[key] = rename_keys_recursive(current_dict[key])
else:
new_dict[key] = current_dict[key]
return new_dict
# 对整个数据结构进行递归重命名
return rename_keys_recursive(data)
def test_generate_key_paths():
data0={
"显示系统": {
"LED全彩显示屏": [],
"控制盒及电源": [],
"大屏播控系统": [],
"配电柜(含PLC)": [],
"钢结构底座及铝型材支架": [],
"电缆及信号线缆": [],
"控制终端": [],
"50寸液晶电视机": [],
"50寸电视机地面推车": [],
"高清监视器": []
},
"摄像系统": {
"高清摄像机": [],
"摄像机三脚架": [],
"摄像机壁装架": [],
"摄像机控制键盘": []
},
"视频处理系统": {
"高清视频拼控矩阵(16*16)": [],
"分量信号接口器": [],
"高清四画面分割器": []
},
"发言系统": {
"数字会议发言主机": [],
"方形短杆代表话筒": [],
"专用连接线缆": [],
"手持无线话筒": []
},
"视频会议系统": {
"多点控制器": [],
"多串口控制服务器": [],
"综合会议管理调度平台": [],
"高清会议终端(主会场)": [],
"高清会议终端(分会场)": [],
"65寸电视机移动推车(9楼)": [],
"65寸液晶电视机(分会场)": [],
"控制平板及软件": [],
"鹅颈话筒": []
},
"辅助系统": {
"时序电源": [],
"多媒体地插盒": [],
"线材辅料": [],
"墙体拆除及修复": []
}
}
# 定义测试数据
data = {
"fruits": [],
"fruits-1": [],
"vegetables": {
"root": [],
"root-1": [],
"leafy": []
},
"grains": {
"交换机":[],
"whole grains": [], # 键名包含空格
"whole-grains-1": [] # 同一基名,带后缀
},
"misc": {
"system functions": [], # 特殊键,包含空格
"system functions-1": [], # 特殊键,带后缀
"fruits-1": [],
"fruits-2": [],
"fruits-3": []
},
"beverages": []
}
# 运行函数
key_paths, grouped_paths, good_list, data_copy = generate_key_paths(data0)
# 打印结果
print("实际 data_copy:")
print(json.dumps(data_copy,ensure_ascii=False,indent=4))
modi=rename_keys(data_copy)
print(json.dumps(modi, ensure_ascii=False, indent=4))
print(key_paths)
print(grouped_paths)
print(good_list)
user_query_template = """请根据货物标中采购要求部分的内容,告诉我\"{}\"的技术参数或采购要求是什么。请以 JSON 格式返回结果,键名为\"{}\",键值为一个列表,列表中包含若干描述\"{}\"的技术参数或采购要求的字符串,请按原文内容回答,保留三角▲、五角★和序号,不可擅自增删内容,尤其是不可擅自添加序号。
要求与指南:
1. 如果该货物没有相关采购要求或技术参数要求,键值应为空列表[]。
2. 如果存在嵌套结构且原文为Markdown 的表格语法,如'摄像机|有效像素|≥900W像素' 请不要返回该Markdown语法而是使用冒号':'将相关信息拼接在一起,生成一条完整且清晰的技术参数(或采购要求)描述,作为列表中的一个字符串。如"摄像机有效像素≥900W像素"
3. 字符串中的内容为具体的技术参数要求或采购要求,请不要返回诸如'1高清录像功能'这种标题性质且不能体现要求的内容。
4. 你的键值应该全面,对于同一个单元格内的数据,尽量全面,不要遗漏,对于单元格内以序号分隔的各条参数要求,请分别添加进键值(即字符串列表)中。
### 示例输出1如下
{{
"摄像机控制键盘": [
"1、▲支持串行 RS232/RS422 和 IP 混合控制,允许在一个控制器上使用 RS232/RS422/IP 控制单个系统中的摄像机;",
"2、支持 2 组 RS422 串口 VISCA 协议菊花链控制 2x7 台摄像机。",
"★能够自动对焦,提供检测报告"
]
}}
### 示例输出2如下包含嵌套结构
{{
"摄像机": [
"摄像机有效像素≥900W像素",
"摄像机最低照度彩色≤0.001lx",
"协议routes 接口开放:具备;▲支持标准 ONVIF 协议与第三方厂家设备进行互联;支持 GB/T28181应提供 SDK"
]
}}
"""
user_query_template_two="""请根据货物标中采购要求部分的内容,告诉我\"{}\"的技术参数或采购要求是什么。由于该货物存在 {} 种不同的采购要求或技术参数,请逐一列出,并以 JSON 格式返回结果。请以'货物名-编号'区分多种型号,编号为从 1 开始的自然数,依次递增,即第一个键名为\"{}-1\", 键值为一个列表,列表中包含若干描述\"{}\"的技术参数(或采购要求)的字符串,请按原文内容回答,保留三角▲、五角★和序号(若有),不可擅自增添内容。
请注意以下特殊情况:
要求与指南:
1. 如果该货物没有相关采购要求或技术参数要求,键值应为空列表。
2. 如果存在嵌套结构且原文为Markdown 的表格语法,如'摄像机|有效像素|≥900W像素' 请不要返回该Markdown语法而是使用冒号':'将相关信息拼接在一起,生成一条完整且清晰的技术参数(或采购要求)描述,作为列表中的一个字符串。如"摄像机有效像素≥900W像素"
3. 字符串中的内容为具体的技术参数要求或采购要求,请不要返回诸如'1高清录像功能'这种标题性质且不能体现要求的内容。
4. 你的键值应该全面,对于同一个单元格内的数据,尽量全面,不要遗漏,对于单元格内以序号分隔的各条参数要求,请分别添加进键值(即字符串列表)中。
### 示例输出1如下
{{
"交换机-1": [
"★1、支持固化千兆电口≥8 个固化千兆光口≥2 个,桌面型设备;",
"2、支持静态链路聚合"
],
"交换机-2": [
"1、交换容量≥52Gbps包转发率≥38.69Mpps",
"2、提供国家强制性产品认证证书及测试报告3C",
"★能实现信号控制独立传输"
]
}}
### 示例输出2如下包含嵌套结构
{{
"摄像机-1": [
"摄像机有效像素≥900W像素",
"摄像机最低照度彩色≤0.001lx",
"协议routes 接口开放:具备;▲支持标准 ONVIF 协议与第三方厂家设备进行互联;支持 GB/T28181应提供 SDK"
],
"摄像机-2": [
"支持夜视", "支持云存储"
]
}}
"""
def test_generate_ques_two(grouped_paths):
queries=[]
for grouped_dict in grouped_paths:
for grouped_key, grouped_key_cnt in grouped_dict.items():
# 将键中的 '.' 替换为 '下的'
modified_grouped_key = grouped_key.replace('.', '下的')
# 使用修改后的键填充第一个占位符,原始键填充第二个占位符
# 如果需要使用 full_text可以取消注释并提供相应的实现
# full_text = read_txt_to_string(processed_filepath)
# new_query = user_query_template_two.format(modified_grouped_key, grouped_key, modified_grouped_key, full_text)
# 根据您的需求,生成新的查询字符串
new_query = user_query_template_two.format(modified_grouped_key, grouped_key_cnt,grouped_key, modified_grouped_key)
queries.append(new_query)
print(new_query)
def test_generate_ques_one(key_paths):
queries=[]
for key in key_paths:
# 将键中的 '.' 替换为 '下的'
modified_key = key.replace('.', '下的')
# 使用修改后的键填充第一个占位符,原始键填充第二个占位符
# full_text = read_txt_to_string(processed_filepath)
# new_query = user_query_template.format(modified_key, key, modified_key,full_text) #转豆包后取消注释
new_query = user_query_template.format(modified_key, key, modified_key)
queries.append(new_query)
print(new_query)
if __name__ == "__main__":
# test_generate_key_paths()
# grouped_paths=[{'fruits': 2}, {'vegetables.root': 2}, {'misc.system functions': 2}, {'misc.fruits': 3}]
# key_paths=['显示系统.LED全彩显示屏', '显示系统.控制盒及电源', '显示系统.大屏播控系统', '显示系统.配电柜(含PLC)', '显示系统.钢结构底座及铝型材支架', '显示系统.电缆及信号线缆', '显示系统.控制终端', '显示系统.50寸液晶电视机', '显示系统.50寸电视机地面推车', '显示系统.高清监视器', '摄像系统.高清摄像机', '摄像系统.摄像机三脚架', '摄像系统.摄像机壁装架', '摄像系统.摄像机控制键盘', '视频处理系统.高清视频拼控矩阵(16*16)', '视频处理系统.分量信号接口器', '视频处理系统.高清四画面分割器', '发言系统.数字会议发言主机', '发言系统.方形短杆代表话筒', '发言系统.专用连接线缆', '发言系统.手持无线话筒', '视频会议系统.多点控制器', '视频会议系统.多串口控制服务器', '视频会议系统.综合会议管理调度平台', '视频会议系统.高清会议终端(主会场)', '视频会议系统.高清会议终端(分会场)', '视频会议系统.65寸电视机移动推车(9楼)', '视频会议系统.65寸液晶电视机(分会场)', '视频会议系统.控制平板及软件', '视频会议系统.鹅颈话筒', '辅助系统.时序电源', '辅助系统.多媒体地插盒', '辅助系统.线材辅料', '辅助系统.墙体拆除及修复']
key_paths=['显示系统.LED全彩显示屏', '显示系统.控制盒及电源', '显示系统.大屏播控系统']
# test_generate_ques(grouped_paths)
test_generate_ques_one(key_paths)