import json
import re
import string
from collections import OrderedDict


# Post-processing helpers applied when transmitting technical-parameter
# requirements.
def extract_matching_keys(data_dict, good_list, special_keys=None):
    """Collect every key of a nested structure that matches ``good_list``.

    A key matches when it equals an entry of ``good_list`` exactly, or equals
    an entry followed by ``-<digits>`` (for example ``"name-3"``).  The whole
    structure (dicts and lists, at any depth) is walked; matched keys are
    copied with their values into a flat ordered mapping.

    * A non-special key that occurs more than once gets a letter suffix in
      order of appearance (``-a``, ``-b``, ... ``-z``, ``-aa``) so that every
      occurrence is kept under a unique name.
    * A key listed in ``special_keys`` is instead prefixed with the key of
      its nearest enclosing dict plus ``'的'`` (e.g. ``"交通信号的系统功能"``).
      Without an enclosing dict key it is stored unchanged.  Two special
      keys that resolve to the same name overwrite each other.

    Args:
        data_dict: Nested dict/list structure to walk.
        good_list: Key names to look for.
        special_keys: Key names that receive the parent prefix instead of a
            letter suffix.  Defaults to no special keys.

    Returns:
        OrderedDict mapping the (possibly renamed) matched keys to their
        values, in traversal order.
    """
    if special_keys is None:
        special_keys = []

    result = OrderedDict()
    # Occurrence count per matched, non-special key.
    key_count = {}

    # Compile once: the same patterns are tested against every key twice.
    patterns = [
        re.compile(r'^' + re.escape(g) + r'(?:-\d+)?$') for g in good_list
    ]

    def matches(key):
        # Non-string keys can never match; guarding here avoids a TypeError
        # from ``pattern.match`` on e.g. integer keys.
        return isinstance(key, str) and any(p.match(key) for p in patterns)

    def count_keys(node):
        """First pass: count how often each ordinary matched key occurs."""
        if isinstance(node, dict):
            for key, value in node.items():
                if matches(key) and key not in special_keys:
                    key_count[key] = key_count.get(key, 0) + 1
                count_keys(value)
        elif isinstance(node, list):
            for item in node:
                count_keys(item)

    count_keys(data_dict)

    # Running suffix index for every key that occurs more than once.
    suffix_map = {key: 0 for key, count in key_count.items() if count > 1}

    def get_suffix(count):
        """Map 1 -> '-a', 2 -> '-b', ..., 26 -> '-z', 27 -> '-aa'."""
        letters = string.ascii_lowercase
        suffix = ''
        while count > 0:
            count, remainder = divmod(count - 1, 26)
            suffix = letters[remainder] + suffix
        return '-' + suffix

    def collect(node, parent_key=None):
        """Second pass: rename matched keys and store them in ``result``."""
        if isinstance(node, dict):
            for key, value in node.items():
                if matches(key):
                    if key in special_keys:
                        new_key = f"{parent_key}的{key}" if parent_key else key
                    elif key_count.get(key, 0) > 1:
                        suffix_map[key] += 1
                        new_key = f"{key}{get_suffix(suffix_map[key])}"
                    else:
                        new_key = key
                    result[new_key] = value

                # A dict value becomes the new parent; a list keeps the
                # current parent for the items it contains.
                if isinstance(value, dict):
                    collect(value, parent_key=key)
                elif isinstance(value, list):
                    collect(value, parent_key=parent_key)
        elif isinstance(node, list):
            for item in node:
                collect(item, parent_key=parent_key)

    collect(data_dict)
    return result


def postprocess(data):
    """Collapse "leaf-only" dicts into lists of their keys.

    Every dict found below the top level whose values are all one of ``'/'``,
    ``'未知'`` or ``{}`` is replaced by the list of its keys (an empty dict
    therefore becomes ``[]``).  Other dicts are processed recursively;
    non-dict values are kept unchanged.  The top-level mapping itself is
    never collapsed.
    """
    placeholders = ['/', '未知', {}]

    def convert_dict(value):
        if all(v in placeholders for v in value.values()):
            return list(value.keys())
        return {
            k: convert_dict(v) if isinstance(v, dict) else v
            for k, v in value.items()
        }

    return {
        key: convert_dict(val) if isinstance(val, dict) else val
        for key, val in data.items()
    }


def _strip_prefixes_nested(value):
    """Apply ``remove_common_prefixes`` to every list inside ``value``."""
    if isinstance(value, dict):
        return {k: _strip_prefixes_nested(v) for k, v in value.items()}
    if isinstance(value, list):
        return remove_common_prefixes(value)
    return value


def all_postprocess(data):
    """Normalise nesting, then strip shared ``':'`` prefixes from each list.

    ``restructure_data`` may return dict values (3-layer or mixed input).
    Those dicts are descended into so that each inner list is processed on
    its own; passing a dict directly to ``remove_common_prefixes`` would
    iterate its keys and discard the nested lists.

    Args:
        data: Mapping whose values are lists of strings (2-layer) and/or
            dicts of such lists (3-layer).

    Returns:
        A new dict with the same keys and nesting as the normalised data,
        with every list replaced by its prefix-stripped version.

    Raises:
        ValueError: Propagated from ``restructure_data`` when a top-level
            value is neither a dict nor a list.
    """
    normalized = restructure_data(data)
    return {
        key: _strip_prefixes_nested(value)
        for key, value in normalized.items()
    }


def detect_depth(data):
    """Return the nesting depth of ``data``.

    A dict counts one level plus the deepest of its values, a list is a
    terminal layer of depth 1, and anything else has depth 0.
    """
    if isinstance(data, dict):
        return 1 + max((detect_depth(v) for v in data.values()), default=0)
    if isinstance(data, list):
        return 1
    return 0


def restructure_data(data):
    """Normalise the top-level values of ``data`` to one nesting level.

    * Only list values (2-layer): ``data`` itself is returned unchanged.
    * Otherwise (only dicts, a mix, or empty input): a new dict is returned
      in which dict values are kept and each list value ``v`` under key ``k``
      is wrapped as ``{k: v}``.

    Raises:
        ValueError: If a top-level value is neither a dict nor a list.
    """
    has_two_layers = False
    has_three_layers = False
    for key, value in data.items():
        if isinstance(value, dict):
            has_three_layers = True
        elif isinstance(value, list):
            has_two_layers = True
        else:
            raise ValueError(
                f"Unexpected data format for key '{key}': {type(value)}"
            )

    if has_two_layers and not has_three_layers:
        return data

    return {
        key: value if isinstance(value, dict) else {key: value}
        for key, value in data.items()
    }


def get_prefixes(s):
    """Return every prefix of ``s`` that ends with ``':'``, shortest first."""
    return [s[:i + 1] for i, ch in enumerate(s) if ch == ':']


def remove_common_prefixes(string_list):
    """Strip from each string its longest ``':'``-terminated shared prefix.

    A prefix is "shared" when it occurs in at least two *distinct* strings of
    ``string_list`` (identical strings count once).  Strings without a shared
    prefix are returned unchanged.  Order and length of the list are kept.
    """
    # Map each prefix to the set of distinct strings that start with it.
    owners_by_prefix = {}
    for s in string_list:
        for prefix in get_prefixes(s):
            owners_by_prefix.setdefault(prefix, set()).add(s)

    shared = {
        prefix
        for prefix, owners in owners_by_prefix.items()
        if len(owners) >= 2
    }

    stripped = []
    for s in string_list:
        # Every shared prefix that ``s`` starts with is one of its own
        # prefixes, so walking those longest-first finds the same result as
        # scanning all shared prefixes.
        longest = next(
            (p for p in reversed(get_prefixes(s)) if p in shared), ''
        )
        stripped.append(s[len(longest):])
    return stripped


if __name__ == "__main__":
    # Sample data
    sample_data = {
        "交通信号机": [
            "★应采用区域控制信号机,并应与广水市交通信号控制系统兼容,信号机能接入已有系统平台,实现联网优化功能。",
            "1、控制功能:(1)区域协调控制:可对单个孤立交叉口、干道多个交叉口和关联性较强的交叉口群进行综合性地信号控制。",
            "1、控制功能:(2)线性协调控制:可对干道多个相邻交叉口进行协调控制。",
            "1、控制功能:(3)多时段控制:可根据交叉口的交通状况,将每天划分为多个不同的时段,每个时段配置不同的控制方案,能设置至少 10个时段、10种以上不同控制方案,能根据不同周日类型对方案进行调整。信号机能够根据内置时钟选择各个时段的控制方案,实现交叉口的合理控制。",
            "2、采集功能:(1)信号机支持接入线圈、地磁、视频、微波、超声波检测器、RFID等多种检测方式。",
            "2、采集功能:(2)信号机支持交通信息采集与统计,并支持交通流量共享。",
            "3、运维功能:(1)信号机能够自动检测地磁故障,若故障,能够自动上传故障信息至监控中心。"
        ],
        "高清视频抓拍像机": [
            "1:摄像机:有效像素:≥900W像素",
            "1:摄像机:最低照度:彩色≤0.001lx",
            "1:摄像机:传感器类型:≥1英寸全局曝光 COMS/GMOS/GS COMS",
            "1:摄像机:电子快门:至少满足 1/25s至 1/100,000s,可调",
            "2:视频图像:视频压缩标准:至少支持 H264、H265等",
            "2:视频图像:视频分辨率:≥4096×2160,向下可设置",
        ],
    }

    # Process the data
    result = all_postprocess(sample_data)

    # Print the processed result
    print(json.dumps(result, ensure_ascii=False, indent=4))