# zbparse/flask_app/货物标/技术参数要求提取后处理函数.py
#
# 227 lines
# 9.5 KiB
# Python
# Raw Normal View History
#
# 2024-11-18 14:25:03 +08:00
import json
# 2024-11-18 16:12:11 +08:00
import re
import string
from collections import OrderedDict
# Post-processing helpers applied when passing on the technical parameter requirements.
def extract_matching_keys(data_dict, good_list, special_keys=None):
    """Collect every entry of a nested structure whose key matches ``good_list``.

    A key matches when it equals an entry of ``good_list`` exactly, or equals
    an entry followed by ``-<digits>`` (for example ``"name-2"``).  Dicts and
    lists are searched at any depth, including the values of keys that have
    already matched.  Non-string keys never match.

    Naming of the collected keys:

    * An ordinary key that matches only once is stored unchanged.
    * An ordinary key that matches more than once gets a letter suffix in
      traversal order: ``-a``, ``-b`` ... ``-z``, ``-aa`` ...
    * A key listed in ``special_keys`` is stored as ``"<parent>的<key>"``
      (e.g. ``"交通信号的系统功能"``).  ``<parent>`` is the key under which the
      enclosing dict is stored; dicts reached through a list keep the parent
      key that was in effect where the list appeared.  Without a parent the
      key is stored unchanged.  Special keys are not de-duplicated, so two
      special keys with the same parent overwrite each other.

    Args:
        data_dict (dict): Nested dict/list structure to search.
        good_list (list): Key names to look for.
        special_keys (list | None): Keys that get the parent-name prefix
            instead of a uniqueness suffix.  ``None`` means no special keys.

    Returns:
        OrderedDict: Matched (possibly renamed) keys mapped to their original
        values, in traversal order.
    """
    if special_keys is None:
        special_keys = []

    result = OrderedDict()
    key_count = {}  # occurrences of each matched ordinary (non-special) key

    # Compile once: exact name, optionally followed by "-<digits>".
    patterns = [re.compile(r'^' + re.escape(g) + r'(?:-\d+)?$') for g in good_list]

    def matches(key):
        # Regex matching requires a string; any other key type cannot match.
        if not isinstance(key, str):
            return False
        return any(pattern.match(key) for pattern in patterns)

    def count_keys(node):
        """First pass: count how often each ordinary matched key occurs."""
        if isinstance(node, dict):
            for key, value in node.items():
                if matches(key) and key not in special_keys:
                    key_count[key] = key_count.get(key, 0) + 1
                count_keys(value)
        elif isinstance(node, list):
            for item in node:
                count_keys(item)

    count_keys(data_dict)

    # Running counter per duplicated key; drives the letter suffix.
    suffix_map = {key: 0 for key, count in key_count.items() if count > 1}

    def get_suffix(count):
        """Map 1 -> '-a', 2 -> '-b', ..., 26 -> '-z', 27 -> '-aa', ..."""
        letters = string.ascii_lowercase
        suffix = ''
        while count > 0:
            # Bijective base-26: shift by one so that 26 maps to 'z', not 'a0'.
            count, remainder = divmod(count - 1, 26)
            suffix = letters[remainder] + suffix
        return '-' + suffix

    def collect(node, parent_key=None):
        """Second pass: rename matched keys and store them in ``result``."""
        if isinstance(node, dict):
            for key, value in node.items():
                if matches(key):
                    if key in special_keys:
                        # Special keys are qualified by their parent: "<parent>的<key>".
                        new_key = f"{parent_key}的{key}" if parent_key else key
                    elif key_count.get(key, 0) > 1:
                        # Duplicated ordinary keys get the next letter suffix.
                        suffix_map[key] += 1
                        new_key = f"{key}{get_suffix(suffix_map[key])}"
                    else:
                        new_key = key
                    result[new_key] = value
                # A dict value makes the current key the new parent; a list
                # value passes the existing parent through unchanged.
                if isinstance(value, dict):
                    collect(value, parent_key=key)
                elif isinstance(value, list):
                    collect(value, parent_key=parent_key)
        elif isinstance(node, list):
            for item in node:
                collect(item, parent_key=parent_key)

    collect(data_dict)
    return result
# 2024-11-18 14:25:03 +08:00
def postprocess(data):
    """Collapse nested dicts that only hold placeholders into lists of keys.

    A nested dict whose values are all ``'/'``, ``'未知'`` or an empty dict is
    replaced by the list of its keys.  An empty dict satisfies this condition
    vacuously and therefore becomes ``[]``.  Any other nested dict is kept as
    a dict and its own dict values are processed the same way.  Values that
    are not dicts are returned untouched, and the top-level dict itself is
    never collapsed.

    Args:
        data (dict): Mapping to post-process.

    Returns:
        dict: New top-level dict with placeholder-only sub-dicts collapsed.
    """
    placeholders = ['/', '未知', {}]

    def convert_dict(value):
        # Every value is a placeholder: only the key names carry information.
        if all(v in placeholders for v in value.values()):
            return list(value.keys())
        # Otherwise keep the dict and descend into nested dicts only.
        return {k: convert_dict(v) if isinstance(v, dict) else v for k, v in value.items()}

    return {key: convert_dict(val) if isinstance(val, dict) else val for key, val in data.items()}
def all_postprocess(data):
    """Normalise the nesting of ``data`` and strip shared prefixes from its lists.

    ``restructure_data`` may return either a dict of lists or a dict of dicts
    of lists.  Every list found at any dict depth is passed through
    ``remove_common_prefixes``; the dict structure and keys are preserved.
    A dict value is never handed to ``remove_common_prefixes`` directly,
    because that would iterate its keys and discard the lists it holds.

    Args:
        data (dict): Mapping accepted by ``restructure_data``.

    Returns:
        dict: The restructured mapping with each list replaced by its
        prefix-stripped version.
    """
    def strip_prefixes(node):
        if isinstance(node, dict):
            return {key: strip_prefixes(value) for key, value in node.items()}
        if isinstance(node, list):
            return remove_common_prefixes(node)
        return node

    return strip_prefixes(restructure_data(data))
def detect_depth(data):
    """Return the nesting depth of ``data``.

    Each dict level counts as 1 plus the deepest of its values (an empty dict
    counts as 1).  A list always counts as 1 and is not descended into; it is
    treated as the terminal layer.  Anything else counts as 0.

    Args:
        data: Value to measure.

    Returns:
        int: Depth as defined above.
    """
    if isinstance(data, dict):
        # default=0 covers the empty dict, where max() has nothing to compare.
        return 1 + max((detect_depth(v) for v in data.values()), default=0)
    if isinstance(data, list):
        return 1
    return 0
def restructure_data(data):
    """Normalise a mapping so that all of its values share one nesting level.

    Top-level values must be lists ("2-layer") or dicts ("3-layer").

    * Only lists: ``data`` is returned as-is (the same object, not a copy).
    * Any dict present, or ``data`` empty: a new dict is returned in which
      dict values are kept and every list value ``v`` under key ``k`` is
      wrapped as ``{k: v}``.

    Args:
        data (dict): Mapping whose values are lists and/or dicts.

    Returns:
        dict: ``data`` itself or a new, uniformly 3-layer dict.

    Raises:
        ValueError: If any top-level value is neither a dict nor a list.
    """
    has_two_layers = False
    has_three_layers = False
    for key, value in data.items():
        if isinstance(value, dict):
            has_three_layers = True
        elif isinstance(value, list):
            has_two_layers = True
        else:
            raise ValueError(f"Unexpected data format for key '{key}': {type(value)}")

    # Purely 2-layer input needs no change.
    if has_two_layers and not has_three_layers:
        return data

    # Mixed or purely 3-layer: lift each list into a single-entry dict keyed
    # by its own name so that every value is a dict.
    return {
        key: value if isinstance(value, dict) else {key: value}
        for key, value in data.items()
    }
# 定义获取所有以''结尾的前缀的函数
def get_prefixes(s):
prefixes = []
for i in range(len(s)):
if s[i] == '':
prefixes.append(s[:i+1])
return prefixes
def remove_common_prefixes(string_list):
    """Strip from each string the longest prefix it shares with another string.

    Candidate prefixes come from ``get_prefixes``.  A prefix counts as shared
    when it was produced by at least two distinct strings of ``string_list``
    (identical strings count once).  Each string then loses the longest
    shared prefix it starts with; strings with no shared prefix are returned
    unchanged.  Order and length of the list are preserved.

    Args:
        string_list (list[str]): Strings to process.

    Returns:
        list[str]: New list with shared prefixes removed.
    """
    # Map each prefix to the set of distinct strings that contain it.
    prefix_to_strings = {}
    for s in string_list:
        for prefix in set(get_prefixes(s)):
            prefix_to_strings.setdefault(prefix, set()).add(s)

    shared_prefixes = [
        prefix for prefix, owners in prefix_to_strings.items() if len(owners) >= 2
    ]

    stripped = []
    for s in string_list:
        applicable = [prefix for prefix in shared_prefixes if s.startswith(prefix)]
        if applicable:
            longest = max(applicable, key=len)
            stripped.append(s[len(longest):])
        else:
            stripped.append(s)
    return stripped
if __name__ == "__main__":
    # Demo: run the full post-processing pipeline on sample data and print
    # the result as indented JSON (non-ASCII characters kept readable).
    sample_data = {
        "交通信号机": [
            "★应采用区域控制信号机,并应与广水市交通信号控制系统兼容,信号机能接入已有系统平台,实现联网优化功能。",
            "1、控制功能1区域协调控制可对单个孤立交叉口、干道多个交叉口和关联性较强的交叉口群进行综合性地信号控制。",
            "1、控制功能2线性协调控制可对干道多个相邻交叉口进行协调控制。",
            "1、控制功能3多时段控制可根据交叉口的交通状况将每天划分为多个不同的时段每个时段配置不同的控制方案能设置至少 10个时段、10种以上不同控制方案能根据不同周日类型对方案进行调整。信号机能够根据内置时钟选择各个时段的控制方案实现交叉口的合理控制。",
            "2、采集功能1信号机支持接入线圈、地磁、视频、微波、超声波检测器、RFID等多种检测方式。",
            "2、采集功能2信号机支持交通信息采集与统计,并支持交通流量共享。",
            "3、运维功能1信号机能够自动检测地磁故障若故障能够自动上传故障信息至监控中心。"
        ],
        "高清视频抓拍像机": [
            "1摄像机有效像素≥900W像素",
            "1摄像机最低照度彩色≤0.001lx",
            "1摄像机传感器类型≥1英寸全局曝光 COMS/GMOS/GS COMS",
            "1摄像机电子快门至少满足 1/25s至 1/100,000s可调",
            "2视频图像视频压缩标准至少支持 H264、H265等",
            "2视频图像视频分辨率≥4096×2160向下可设置",
        ],
    }
    # Process the data.
    result = all_postprocess(sample_data)
    # Print the processed result.
    print(json.dumps(result,ensure_ascii=False,indent=4))