zbparse/flask_app/货物标/技术参数要求提取后处理函数.py
2024-11-22 16:33:00 +08:00

227 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import re
import string
from collections import OrderedDict
from collections import defaultdict
#传输技术参数需求的时候后处理
def extract_matching_keys(data, good_list, special_keys=None, parent_key=''):
    """Collect list-valued entries whose key matches one of ``good_list``.

    The nested ``data`` structure is walked twice: first to count how often
    each matching key occurs, then to copy the matching entries into a flat
    result dict under a (possibly renamed) key.

    A key matches when it equals an entry of ``good_list``, optionally
    followed by ``-<digits>``. Only keys whose value is a list are collected.
    Dict values are descended into; list values are never descended into
    (a list is only walked when it is ``data`` itself or an item of such a
    list).

    Naming of the result keys:
      * a key listed in ``special_keys`` that has a non-empty parent path is
        stored as ``<parent path><key>`` (plain concatenation);
      * otherwise, a key that was counted more than once is stored as
        ``<key>-a``, ``<key>-b``, ... in encounter order;
      * otherwise the key is stored unchanged.

    Later entries overwrite earlier ones when two generated names collide.

    Args:
        data: Nested dict/list structure to scan.
        good_list: Iterable of key names to look for.
        special_keys: Keys that are disambiguated by their parent path
            instead of a letter suffix. Defaults to no special keys.
        parent_key: Parent path to assume for the top level of ``data``.

    Returns:
        A new flat dict mapping generated key names to the original list
        values (the lists themselves are not copied).
    """
    if special_keys is None:
        special_keys = []
    patterns = [re.compile(r'^' + re.escape(g) + r'(?:-\d+)?$') for g in good_list]

    def is_match(key):
        """Return True when ``key`` matches any of the compiled patterns."""
        return any(pattern.match(key) for pattern in patterns)

    def get_suffix(n):
        """Map 1 -> 'a', 2 -> 'b', ..., 26 -> 'z', 27 -> 'aa', 28 -> 'ab', ..."""
        suffix = ''
        while n > 0:
            n, r = divmod(n - 1, 26)
            suffix = chr(97 + r) + suffix
        return suffix

    def count_matching_keys(node, counter):
        """Count matching, non-special keys whose value is a list."""
        if isinstance(node, dict):
            for key, value in node.items():
                if isinstance(value, list):
                    if key not in special_keys and is_match(key):
                        counter[key] += 1
                elif isinstance(value, dict):
                    count_matching_keys(value, counter)
        elif isinstance(node, list):
            for item in node:
                if isinstance(item, (dict, list)):
                    count_matching_keys(item, counter)
        return counter

    def generate_key(key, current_parent):
        """Build the result key for ``key`` found under ``current_parent``."""
        if key in special_keys and current_parent:
            return f"{current_parent}{key}"
        # .get() avoids inserting zero-count entries into the counter.
        if key_counter.get(key, 0) > 1:
            suffix_map[key] += 1
            return f"{key}-{get_suffix(suffix_map[key])}"
        return key

    def process_data(node, current_parent):
        """Copy matching list-valued entries of ``node`` into the result."""
        if isinstance(node, dict):
            for key, value in node.items():
                if isinstance(value, list):
                    if is_match(key):
                        filtered_data[generate_key(key, current_parent)] = value
                elif isinstance(value, dict):
                    # The parent path is the plain concatenation of ancestor keys.
                    child_parent = key if current_parent == '' else f"{current_parent}{key}"
                    process_data(value, child_parent)
        elif isinstance(node, list):
            for item in node:
                if isinstance(item, (dict, list)):
                    process_data(item, current_parent)

    # Pass 1: how many times does each matching key occur?
    key_counter = count_matching_keys(data, defaultdict(int))
    # Only keys seen more than once need a running suffix index.
    suffix_map = {key: 0 for key, count in key_counter.items() if count > 1}
    # Pass 2: build the flat result.
    filtered_data = {}
    process_data(data, parent_key)
    return filtered_data
def postprocess(data):
    """Collapse placeholder-only dicts into a list of their keys.

    For every dict found among the values of ``data`` (recursively through
    nested dicts): if each of its values is '/', '未知' or an empty dict,
    the dict is replaced by the list of its keys. Otherwise the dict is kept
    and its own dict-valued entries are processed the same way. Non-dict
    values are returned untouched. The top-level ``data`` dict itself is
    never collapsed.
    """
    placeholders = ['/', '未知', {}]

    def collapse(mapping):
        # An empty mapping trivially satisfies all() and becomes [].
        if all(entry in placeholders for entry in mapping.values()):
            return list(mapping)
        converted = {}
        for name, entry in mapping.items():
            converted[name] = collapse(entry) if isinstance(entry, dict) else entry
        return converted

    output = {}
    for name, entry in data.items():
        output[name] = collapse(entry) if isinstance(entry, dict) else entry
    return output
def all_postprocess(data):
    """Normalise the nesting of ``data`` and strip shared prefixes from lists.

    ``data`` is first passed through ``restructure_data``. The result is then
    walked recursively: dicts are rebuilt with processed values, every list
    is replaced by ``remove_common_prefixes(list)``, and any other value is
    returned as-is.
    """
    def walk(node):
        if isinstance(node, dict):
            return {name: walk(child) for name, child in node.items()}
        if isinstance(node, list):
            return remove_common_prefixes(node)
        return node

    return walk(restructure_data(data))
def detect_depth(data):
    """Return the nesting depth of ``data``.

    A list counts as one level and is not descended into. A dict counts as
    one level plus the depth of its deepest value (an empty dict is 1).
    Anything else has depth 0.
    """
    if isinstance(data, dict):
        deepest_child = 0
        for child in data.values():
            child_depth = detect_depth(child)
            if child_depth > deepest_child:
                deepest_child = child_depth
        return deepest_child + 1
    if isinstance(data, list):
        # Lists are treated as the terminal layer.
        return 1
    return 0
def restructure_data(data):
    """Normalise the top-level nesting of ``data``.

    * If every value is a list (two-level structure), ``data`` itself is
      returned unchanged.
    * Otherwise a new dict is returned in which dict values are kept as-is
      and each list value ``v`` under key ``k`` is wrapped as ``{k: v}``,
      so that every entry has the three-level shape.

    NOTE(review): the previous docstring also promised that nesting deeper
    than three levels is reduced to three; no such step exists in this
    function - dict values are passed through unchanged.

    Raises:
        ValueError: if any top-level value is neither a dict nor a list.
    """
    normalised = {}
    saw_nested_dict = False
    for key, value in data.items():
        if isinstance(value, dict):
            # Already in the three-level shape.
            saw_nested_dict = True
            normalised[key] = value
        elif isinstance(value, list):
            # Lift the two-level entry to three levels by repeating its key.
            normalised[key] = {key: value}
        else:
            raise ValueError(f"'{key}'的数据格式异常: {type(value)}")

    # Purely two-level input (at least one list, no dict) is returned untouched.
    if normalised and not saw_nested_dict:
        return data
    return normalised
# Return every prefix of a string that ends with the delimiter.
def get_prefixes(s, delimiter='\uff1a'):
    """Return all prefixes of ``s`` that end with ``delimiter``.

    The prefixes are returned in order of increasing length, one for each
    (non-overlapping, left-to-right) occurrence of ``delimiter`` in ``s``.

    NOTE(review): in the reviewed copy of this file the delimiter literal
    was empty (``s[i] == ''``), a comparison that can never be true for a
    single character, so the function always returned ``[]``. The default
    here assumes the lost character was the full-width colon U+FF1A -
    confirm against the original file.

    Args:
        s: The string to scan.
        delimiter: Separator that terminates a prefix. An empty delimiter
            yields no prefixes.

    Returns:
        A list of prefixes, each including its trailing delimiter.
    """
    if not delimiter:
        return []
    prefixes = []
    end = s.find(delimiter)
    while end != -1:
        end += len(delimiter)  # include the delimiter in the prefix
        prefixes.append(s[:end])
        end = s.find(delimiter, end)
    return prefixes
# Strip, from each string, the longest prefix it shares with another string.
def remove_common_prefixes(string_list):
    """Remove the longest shared prefix from every string in ``string_list``.

    Candidate prefixes are those produced by ``get_prefixes``. A prefix is
    "shared" when it is produced for at least two *distinct* strings of the
    list (identical duplicates count once). For each string the longest of
    its own shared prefixes is cut off; strings without a shared prefix are
    kept unchanged.

    Items that are not ``str`` are passed through untouched instead of
    raising, because callers hand arbitrary lists to this function.

    Args:
        string_list: List of strings (other item types are tolerated).

    Returns:
        A new list of the same length and order.
    """
    # Compute each item's prefixes exactly once.
    prefixes_per_item = [
        get_prefixes(item) if isinstance(item, str) else []
        for item in string_list
    ]

    # Map every prefix to the set of distinct strings it was produced for.
    prefix_owners = defaultdict(set)
    for item, prefixes in zip(string_list, prefixes_per_item):
        for prefix in prefixes:
            prefix_owners[prefix].add(item)

    shared_prefixes = {
        prefix for prefix, owners in prefix_owners.items() if len(owners) >= 2
    }

    # A string can only start with a candidate prefix that is one of its own
    # prefixes, so there is no need to test it against every shared prefix.
    stripped = []
    for item, prefixes in zip(string_list, prefixes_per_item):
        longest = max(
            (prefix for prefix in prefixes if prefix in shared_prefixes),
            key=len,
            default=None,
        )
        stripped.append(item if longest is None else item[len(longest):])
    return stripped
if __name__ == "__main__":
    # Demo input: two devices, each with a flat list of requirement strings.
    demo_input = {
        "交通信号机": [
            "★应采用区域控制信号机,并应与广水市交通信号控制系统兼容,信号机能接入已有系统平台,实现联网优化功能。",
            "1、控制功能1区域协调控制可对单个孤立交叉口、干道多个交叉口和关联性较强的交叉口群进行综合性地信号控制。",
            "1、控制功能2线性协调控制可对干道多个相邻交叉口进行协调控制。",
            "1、控制功能3多时段控制可根据交叉口的交通状况将每天划分为多个不同的时段每个时段配置不同的控制方案能设置至少 10个时段、10种以上不同控制方案能根据不同周日类型对方案进行调整。信号机能够根据内置时钟选择各个时段的控制方案实现交叉口的合理控制。",
            "2、采集功能1信号机支持接入线圈、地磁、视频、微波、超声波检测器、RFID等多种检测方式。",
            "2、采集功能2信号机支持交通信息采集与统计,并支持交通流量共享。",
            "3、运维功能1信号机能够自动检测地磁故障若故障能够自动上传故障信息至监控中心。",
        ],
        "高清视频抓拍像机": [
            "1摄像机有效像素≥900W像素",
            "1摄像机最低照度彩色≤0.001lx",
            "1摄像机传感器类型≥1英寸全局曝光 COMS/GMOS/GS COMS",
            "1摄像机电子快门至少满足 1/25s至 1/100,000s可调",
            "2视频图像视频压缩标准至少支持 H264、H265等",
            "2视频图像视频分辨率≥4096×2160向下可设置",
        ],
    }
    # Run the full post-processing pipeline on the demo input.
    normalized = all_postprocess(demo_input)
    # Pretty-print the result, keeping non-ASCII characters readable.
    print(json.dumps(normalized, ensure_ascii=False, indent=4))