zbparse/flask_app/货物标/技术参数要求提取后处理函数.py

232 lines
10 KiB
Python
Raw Normal View History

2024-11-18 14:25:03 +08:00
import json
2024-11-18 16:12:11 +08:00
import re
import string
from collections import OrderedDict
from collections import defaultdict
2024-11-18 16:12:11 +08:00
#传输技术参数需求的时候后处理
def extract_matching_keys(data, good_list, special_keys=None, parent_key=''):
    """Collect the list-valued entries of ``data`` whose keys match ``good_list``.

    ``data`` is walked recursively through dicts and through lists of
    dicts/lists. A list that is the *value* of a dict key is never descended
    into: it is either collected (when the key matches) or ignored.

    Spaces are removed from every key before matching. A key matches when the
    space-stripped key equals a space-stripped entry of ``good_list``,
    optionally followed by ``-<digits>``.

    Result keys are generated as follows:
      * a key contained in ``special_keys`` that has a non-empty parent path
        is prefixed with that path (the concatenated, space-stripped keys of
        its ancestor dicts), e.g. ``"A" -> "系统功能"`` becomes ``"A系统功能"``;
      * a non-special key that matches more than once in the whole structure
        receives a letter suffix in encounter order: ``key-a``, ``key-b``,
        ..., ``key-z``, ``key-aa``, ...;
      * every other key is kept as is (without spaces).

    Args:
        data: Nested dict/list structure to search.
        good_list: Iterable of key names to look for.
        special_keys: Keys that are disambiguated by their parent path rather
            than by a letter suffix. Defaults to ``["系统功能"]``.
        parent_key: Parent path assumed for the top level of ``data``.

    Returns:
        A flat dict mapping each generated key to the original list value.
    """
    if special_keys is None:
        special_keys = ["系统功能"]

    # One anchored pattern per wanted name: "<name>" or "<name>-<digits>".
    patterns = [
        re.compile(r'^' + re.escape(g.replace(" ", "")) + r'(?:-\d+)?$')
        for g in good_list
    ]

    def matches(clean_key):
        return any(pattern.match(clean_key) for pattern in patterns)

    def get_suffix(n):
        """Map 1 -> 'a', 2 -> 'b', ..., 26 -> 'z', 27 -> 'aa', 28 -> 'ab', ..."""
        suffix = ''
        while n > 0:
            n, r = divmod(n - 1, 26)
            suffix = chr(97 + r) + suffix
        return suffix

    def count_matching_keys(node, counter):
        """Count matching, non-special keys whose value is a list."""
        if isinstance(node, dict):
            for key, value in node.items():
                clean_key = key.replace(" ", "")
                if isinstance(value, list):
                    if clean_key not in special_keys and matches(clean_key):
                        counter[clean_key] += 1
                elif isinstance(value, dict):
                    count_matching_keys(value, counter)
        elif isinstance(node, list):
            for item in node:
                if isinstance(item, (dict, list)):
                    count_matching_keys(item, counter)
        return counter

    # First pass: find out which keys occur more than once, so the second
    # pass knows which ones need a letter suffix.
    key_counter = count_matching_keys(data, defaultdict(int))
    suffix_map = {key: 0 for key, count in key_counter.items() if count > 1}
    filtered_data = {}

    def generate_key(key, parent):
        """Build the output key for a matching ``key`` found under ``parent``."""
        if key in special_keys and parent:
            return f"{parent}{key}"
        if key_counter.get(key, 0) > 1:
            suffix_map[key] += 1
            return f"{key}-{get_suffix(suffix_map[key])}"
        return key

    def process_data(node, parent):
        """Second pass: copy every matching list value into ``filtered_data``."""
        if isinstance(node, dict):
            for key, value in node.items():
                clean_key = key.replace(" ", "")
                if isinstance(value, list):
                    if matches(clean_key):
                        filtered_data[generate_key(clean_key, parent)] = value
                elif isinstance(value, dict):
                    # The parent path grows by plain concatenation.
                    process_data(value, f"{parent}{clean_key}")
        elif isinstance(node, list):
            for item in node:
                if isinstance(item, (dict, list)):
                    process_data(item, parent)

    process_data(data, parent_key)
    return filtered_data
2024-11-18 16:12:11 +08:00
2024-11-23 15:38:52 +08:00
2024-11-18 14:25:03 +08:00
def postprocess(data):
    """Collapse placeholder-only dicts into lists of their keys.

    Each dict found among the values of ``data`` (at any depth below the top
    level, following dict values only) is inspected: when all of its values
    are placeholders - ``'/'``, ``'未知'`` or an empty dict ``{}`` - the dict
    is replaced by the list of its keys. An empty dict therefore collapses to
    ``[]``. Dicts that do not qualify are rebuilt with the same rule applied
    to their dict-valued entries; non-dict values are passed through as is.

    ``data`` itself is never collapsed; a new top-level dict is returned.
    """
    # Compared with ``==`` (tuple membership), so the unhashable ``{}`` is fine.
    placeholders = ('/', '未知', {})

    def convert_dict(value):
        if all(v in placeholders for v in value.values()):
            return list(value)
        return {
            k: convert_dict(v) if isinstance(v, dict) else v
            for k, v in value.items()
        }

    return {
        key: convert_dict(val) if isinstance(val, dict) else val
        for key, val in data.items()
    }
2024-11-20 15:44:05 +08:00
2024-11-18 14:25:03 +08:00
def all_postprocess(data):
    """Restructure ``data`` and strip shared prefixes from every list in it.

    The input is first passed through ``restructure_data``. The result is then
    rebuilt recursively: dicts are copied key by key, every list is replaced
    by ``remove_common_prefixes(list)``, and any other value is kept as is.
    """
    def strip_lists(node):
        if isinstance(node, list):
            return remove_common_prefixes(node)
        if isinstance(node, dict):
            return {name: strip_lists(child) for name, child in node.items()}
        return node

    return strip_lists(restructure_data(data))
def restructure_data(data):
    """Normalise ``data`` towards a three-level ``{key: {sub_key: [...]}}`` shape.

    * If every top-level value is a list (a two-level structure; this also
      covers an empty dict), ``data`` is returned unchanged - the very same
      object, not a copy.
    * Otherwise a new dict is built from the top-level items:
        - a list value under ``key`` is wrapped as ``{key: value}``;
        - a dict value with no dict among its own values is kept as is;
        - a dict value with at least one dict among its own values is
          flattened by one level: its own key is dropped and each child is
          promoted to the top level - dict children unchanged, list children
          wrapped as ``{sub_key: sub_value}``.

    Known limitations: promoted dict children are not flattened any further,
    so deeper input is not reduced to three levels, and a promoted key
    overwrites an earlier entry of the same name.

    Raises:
        ValueError: If a top-level value, or a child of a flattened dict, is
            neither a dict nor a list.
    """
    # Pure two-level input needs no restructuring.
    if all(isinstance(value, list) for value in data.values()):
        return data

    structured_data = {}
    for key, value in data.items():
        if isinstance(value, list):
            # Two levels -> three levels: repeat the key as the middle level.
            structured_data[key] = {key: value}
            continue
        if not isinstance(value, dict):
            raise ValueError(f"'{key}'的数据格式异常: {type(value)}")

        has_deeper = any(isinstance(sub_value, dict) for sub_value in value.values())
        if not has_deeper:
            # Already three levels deep.
            structured_data[key] = value
            continue

        # Four or more levels: promote the children and drop ``key``.
        for sub_key, sub_value in value.items():
            if isinstance(sub_value, dict):
                structured_data[sub_key] = sub_value
            elif isinstance(sub_value, list):
                structured_data[sub_key] = {sub_key: sub_value}
            else:
                raise ValueError(f"'{sub_key}'的数据格式异常: {type(sub_value)}")
    return structured_data
2024-11-22 17:41:06 +08:00
2024-11-18 14:25:03 +08:00
# Helper returning every delimiter-terminated prefix of a string.
def get_prefixes(s, delimiter='\uff1a'):
    """Return every prefix of ``s`` that ends with ``delimiter``.

    Prefixes are returned shortest first, one per occurrence of ``delimiter``
    in ``s``; each prefix includes the delimiter it ends on.

    NOTE(review): in the copy of this file under review the delimiter literal
    was empty (``s[i] == ''``), which can never equal a one-character slice,
    so the function always returned ``[]``. The full-width colon (U+FF1A) is
    assumed to be the intended character - confirm against upstream.

    Args:
        s: String to scan.
        delimiter: Single character that terminates a prefix. Defaults to the
            full-width colon. Multi-character or empty values match nothing.

    Returns:
        List of prefixes; empty when ``delimiter`` does not occur in ``s``.
    """
    return [s[:i + 1] for i, ch in enumerate(s) if ch == delimiter]
# 定义删除公共前缀的函数
def remove_common_prefixes(string_list):
    """Strip from each string the longest prefix it shares with another one.

    Candidate prefixes come from ``get_prefixes``. A prefix counts as shared
    when it is a prefix of at least two *distinct* strings of
    ``string_list``. Each string then loses the longest shared prefix it
    starts with; strings without a shared prefix are returned unchanged.

    Items that are not strings are passed through untouched instead of
    raising, so lists with mixed content can be processed.

    Args:
        string_list: List whose string items are to be shortened.

    Returns:
        A new list of the same length and order as ``string_list``.
    """
    # Compute each item's prefixes once and reuse them in both passes.
    prefixes_per_item = [
        get_prefixes(item) if isinstance(item, str) else []
        for item in string_list
    ]

    # Map every prefix to the set of distinct strings it belongs to.
    prefix_to_strings = {}
    for item, prefixes in zip(string_list, prefixes_per_item):
        for prefix in prefixes:
            prefix_to_strings.setdefault(prefix, set()).add(item)

    shared_prefixes = {
        prefix
        for prefix, strings in prefix_to_strings.items()
        if len(strings) >= 2
    }

    new_string_list = []
    for item, prefixes in zip(string_list, prefixes_per_item):
        # Every shared prefix this item starts with is one of its own
        # prefixes, so only those need checking.
        longest_prefix = max(
            (prefix for prefix in prefixes if prefix in shared_prefixes),
            key=len,
            default=None,
        )
        if longest_prefix is None:
            new_string_list.append(item)
        else:
            new_string_list.append(item[len(longest_prefix):])
    return new_string_list
2024-11-22 17:41:06 +08:00
#TODO:目前对于超过四层的数据无法平坦化为3层
2024-11-18 14:25:03 +08:00
if __name__ == "__main__":
    # Demo: requirement strings of a traffic signal controller.
    signal_controller_items = [
        "★应采用区域控制信号机,并应与广水市交通信号控制系统兼容,信号机能接入已有系统平台,实现联网优化功能。",
        "1、控制功能1区域协调控制可对单个孤立交叉口、干道多个交叉口和关联性较强的交叉口群进行综合性地信号控制。",
        "1、控制功能2线性协调控制可对干道多个相邻交叉口进行协调控制。",
        "1、控制功能3多时段控制可根据交叉口的交通状况将每天划分为多个不同的时段每个时段配置不同的控制方案能设置至少 10个时段、10种以上不同控制方案能根据不同周日类型对方案进行调整。信号机能够根据内置时钟选择各个时段的控制方案实现交叉口的合理控制。",
        "2、采集功能1信号机支持接入线圈、地磁、视频、微波、超声波检测器、RFID等多种检测方式。",
        "2、采集功能2信号机支持交通信息采集与统计,并支持交通流量共享。",
        "3、运维功能1信号机能够自动检测地磁故障若故障能够自动上传故障信息至监控中心。",
    ]
    # Demo: requirement strings of a high-definition capture camera.
    capture_camera_items = [
        "1摄像机有效像素≥900W像素",
        "1摄像机最低照度彩色≤0.001lx",
        "1摄像机传感器类型≥1英寸全局曝光 COMS/GMOS/GS COMS",
        "1摄像机电子快门至少满足 1/25s至 1/100,000s可调",
        "2视频图像视频压缩标准至少支持 H264、H265等",
        "2视频图像视频分辨率≥4096×2160向下可设置",
    ]
    sample_data = {
        "交通信号机": signal_controller_items,
        "高清视频抓拍像机": capture_camera_items,
    }
    # Run the whole post-processing pipeline and pretty-print the outcome.
    result = all_postprocess(sample_data)
    print(json.dumps(result, ensure_ascii=False, indent=4))