446 lines
15 KiB
Python
446 lines
15 KiB
Python
# -*- encoding:utf-8 -*-
|
||
import json
|
||
import re
|
||
from collections import defaultdict
|
||
|
||
|
||
def insert_missing_commas(json_str):
|
||
"""
|
||
使用正则表达式在缺失逗号的位置插入逗号。
|
||
具体来说,寻找一个值的结束引号后紧跟着下一个键的开始引号,并在中间插入逗号。
|
||
"""
|
||
# 这个正则匹配一个字符串结尾的引号,可能有空白字符,然后是另一个键的引号
|
||
pattern = r'(":\s*"[^"]*)"\s*(")'
|
||
replacement = r'\1", \2'
|
||
|
||
previous_str = None
|
||
while previous_str != json_str:
|
||
previous_str = json_str
|
||
json_str = re.sub(pattern, replacement, json_str)
|
||
|
||
return json_str
|
||
|
||
|
||
def fix_json_escape_sequences(json_str):
|
||
"""
|
||
修复 JSON 字符串中的非法转义序列。
|
||
将所有不符合 JSON 规范的反斜杠进行转义。
|
||
"""
|
||
# JSON 中合法的转义字符
|
||
valid_escapes = ['"', '\\', '/', 'b', 'f', 'n', 'r', 't', 'u']
|
||
|
||
# 使用正则表达式找到所有反斜杠
|
||
# 如果反斜杠后面不是合法的转义字符,则进行转义
|
||
pattern = re.compile(r'\\(?!["\\/bfnrtu])')
|
||
fixed_str = pattern.sub(r'\\\\', json_str)
|
||
return fixed_str
|
||
|
||
def replace_latex_expressions_in_dict(obj):
|
||
"""
|
||
递归遍历字典或列表,替换其中的 LaTeX 表达式。
|
||
"""
|
||
if isinstance(obj, dict):
|
||
return {k: replace_latex_expressions_in_dict(v) for k, v in obj.items()}
|
||
elif isinstance(obj, list):
|
||
return [replace_latex_expressions_in_dict(item) for item in obj]
|
||
elif isinstance(obj, str):
|
||
# 仅处理被 $...$ 包围的内容
|
||
def replace_match(match):
|
||
expr = match.group(1)
|
||
return replace_latex_expressions(expr)
|
||
|
||
# 替换所有 $...$ 包围的内容
|
||
return re.sub(r'\$(.*?)\$', replace_match, obj)
|
||
else:
|
||
return obj
|
||
def replace_latex_expressions(json_str):
|
||
"""
|
||
替换 JSON 字符串中的 LaTeX 风格表达式。
|
||
例如,将 $三 \geq 2 m$ 替换为 三 ≥2米
|
||
"""
|
||
# 定义 LaTeX 符号到 Unicode 字符的映射
|
||
latex_mapping = {
|
||
r'\geq': '≥',
|
||
r'\leq': '≤',
|
||
r'\times': '×',
|
||
r'\frac': '/', # 简单处理分数
|
||
r'\neq': '≠',
|
||
r'\approx': '≈',
|
||
r'\pm': '±',
|
||
r'\alpha': 'α',
|
||
r'\beta': 'β',
|
||
r'\gamma': 'γ',
|
||
r'\delta': 'δ',
|
||
r'\pi': 'π',
|
||
r'\sqrt': '√',
|
||
r'\infty': '∞',
|
||
r'\cup': '∪',
|
||
r'\cap': '∩',
|
||
r'\subseteq': '⊆',
|
||
r'\supseteq': '⊇',
|
||
r'\forall': '∀',
|
||
r'\exists': '∃',
|
||
r'\rightarrow': '→',
|
||
r'\leftarrow': '←',
|
||
# 添加更多需要的映射
|
||
}
|
||
|
||
# 处理每一个 LaTeX 表达式
|
||
def replace_match(match):
|
||
expr = match.group(1)
|
||
for latex, char in latex_mapping.items():
|
||
expr = expr.replace(latex, char)
|
||
# 替换单位符号,例如 ' m' 到 '米', ' s' 到 '秒', 等等
|
||
expr = re.sub(r'(\d+)\s*m', r'\1米', expr)
|
||
expr = re.sub(r'(\d+)\s*s', r'\1秒', expr)
|
||
expr = re.sub(r'(\d+)\s*kg', r'\1公斤', expr)
|
||
expr = re.sub(r'(\d+)\s*A', r'\1安', expr) # 例如电流单位安培
|
||
# 继续添加更多单位
|
||
return expr
|
||
|
||
# 替换所有 $...$ 包围的内容
|
||
fixed_str = re.sub(r'\$(.*?)\$', replace_match, json_str)
|
||
return fixed_str
|
||
|
||
def load_json_with_duplicates(json_str):
|
||
"""
|
||
使用 object_pairs_hook 来加载包含重复键的 JSON 字符串,
|
||
并将重复的键存储为列表。
|
||
"""
|
||
def parse_object(pairs):
|
||
obj = defaultdict(list)
|
||
for key, value in pairs:
|
||
obj[key].append(value)
|
||
return obj
|
||
|
||
return json.loads(json_str, object_pairs_hook=parse_object)
|
||
|
||
def merge_duplicates(obj):
|
||
"""
|
||
递归合并包含重复键的字典,优先保留字典类型的值。
|
||
"""
|
||
if isinstance(obj, dict):
|
||
merged = {}
|
||
for key, values in obj.items():
|
||
if len(values) == 1:
|
||
merged[key] = merge_duplicates(values[0])
|
||
else:
|
||
# 如果有多个值,优先保留字典类型的值
|
||
dict_values = [v for v in values if isinstance(v, dict)]
|
||
if dict_values:
|
||
# 如果存在字典类型的值,递归合并它们
|
||
merged_value = {}
|
||
for dv in dict_values:
|
||
merged_value.update(merge_duplicates(dv))
|
||
merged[key] = merged_value
|
||
else:
|
||
# 否则,保留最后一个值
|
||
merged[key] = merge_duplicates(values[-1])
|
||
return merged
|
||
elif isinstance(obj, list):
|
||
return [merge_duplicates(item) for item in obj]
|
||
else:
|
||
return obj
|
||
|
||
def parse_json_with_duplicates(raw_string):
|
||
"""
|
||
解析具有重复键的 JSON 字符串,将所有重复的键值对存储为列表。
|
||
|
||
Args:
|
||
json_string (str): 需要解析的 JSON 字符串。
|
||
|
||
Returns:
|
||
dict: 解析后的字典,重复的键对应的值为列表。
|
||
|
||
eg:输入:"综合实力": {
|
||
"评分": "2分",
|
||
"要求": "投标人具备电子与智能化工程专业承包二级资质及以上证书得 2分,不能够提供不得分(开标时需提供原件)。"
|
||
},
|
||
"综合实力": {
|
||
"评分": "2分",
|
||
"要求": "投标人具有建筑机电安装工程专业承包三级资质或以上资质得 2分,否则不得分。(证书开标原件备查)。"
|
||
}
|
||
输出:"综合实力": [
|
||
{
|
||
"评分": "2分",
|
||
"要求": "投标人具备电子与智能化工程专业承包二级资质及以上证书得 2分,不能够提供不得分(开标时需提供原件)。"
|
||
},
|
||
{
|
||
"评分": "2分",
|
||
"要求": "投标人具有建筑机电安装工程专业承包三级资质或以上资质得 2分,否则不得分。(证书开标原件备查)。"
|
||
}]
|
||
"""
|
||
|
||
def custom_object_pairs_hook(pairs):
|
||
d = defaultdict(list)
|
||
for key, value in pairs:
|
||
try:
|
||
# 如果值是字典或列表,递归处理
|
||
if isinstance(value, dict):
|
||
value = process_dict(value)
|
||
elif isinstance(value, list):
|
||
value = process_list(value)
|
||
d[key].append(value)
|
||
except Exception as e:
|
||
d[key].append(value) # 根据需求决定是否跳过或保留原值
|
||
# 将有多个值的键转换为列表,单个值的键保持原样
|
||
return {key: (values if len(values) > 1 else values[0]) for key, values in d.items()}
|
||
|
||
def process_dict(d):
|
||
"""
|
||
递归处理字典,确保所有重复键的值为列表。
|
||
|
||
Args:
|
||
d (dict): 需要处理的字典。
|
||
|
||
Returns:
|
||
dict: 处理后的字典。
|
||
"""
|
||
try:
|
||
return custom_object_pairs_hook(d.items())
|
||
except Exception as e:
|
||
return {}
|
||
|
||
def process_list(l):
|
||
"""
|
||
递归处理列表,确保列表中的所有字典也被处理。
|
||
|
||
Args:
|
||
l (list): 需要处理的列表。
|
||
|
||
Returns:
|
||
list: 处理后的列表。
|
||
"""
|
||
try:
|
||
return [process_dict(item) if isinstance(item, dict) else item for item in l]
|
||
except Exception as e:
|
||
return []
|
||
|
||
"""输入字符串,提取 { 和 } 之间的内容,并将其解析为字典"""
|
||
if not raw_string or not raw_string.strip():
|
||
return {}
|
||
match = re.search(r'\{[\s\S]*\}', raw_string)
|
||
if match:
|
||
json_string = match.group(0)
|
||
return json.loads(json_string, object_pairs_hook=custom_object_pairs_hook)
|
||
else:
|
||
print("未找到有效的 JSON 内容。")
|
||
return {} # 返回空字典
|
||
|
||
def extract_content_from_json(input_string,flag=False):
|
||
"""
|
||
输入字符串,尝试解析 JSON 数据:
|
||
1. 如果成功解析,返回字典。
|
||
"""
|
||
if not input_string or not input_string.strip():
|
||
return {}
|
||
|
||
# 提取第一个匹配的 JSON 对象
|
||
match = re.search(r'\{[\s\S]*\}', input_string)
|
||
if not match:
|
||
print("未找到有效的 JSON 内容。")
|
||
return {} # 返回空字典
|
||
|
||
original_json = match.group(0)
|
||
|
||
def parse_json(json_str):
|
||
"""
|
||
根据 flag 参数解析 JSON 字符串。
|
||
|
||
Args:
|
||
json_str (str): 需要解析的 JSON 字符串。
|
||
|
||
Returns:
|
||
dict: 解析后的字典。
|
||
"""
|
||
if not flag:
|
||
data_with_duplicates = load_json_with_duplicates(json_str)
|
||
return merge_duplicates(data_with_duplicates)
|
||
else:
|
||
return parse_json_with_duplicates(json_str)
|
||
|
||
# 尝试直接解析原始 JSON 数据
|
||
try:
|
||
parsed_data = parse_json(original_json)
|
||
return parsed_data # 返回解析后的字典
|
||
except json.JSONDecodeError:
|
||
print("直接解析原始 JSON 失败。")
|
||
|
||
# 方法1:逗号修复
|
||
try:
|
||
fixed_json1 = insert_missing_commas(original_json)
|
||
parsed_data = parse_json(fixed_json1)
|
||
print("使用方法1:逗号修复并处理重复键成功。")
|
||
return parsed_data # 返回解析后的字典
|
||
except json.JSONDecodeError:
|
||
print("方法1(逗号修复)解析失败。")
|
||
|
||
# 方法2:LaTeX 表达式替换
|
||
try:
|
||
fixed_json2 = replace_latex_expressions(original_json)
|
||
parsed_data = parse_json(fixed_json2)
|
||
print("使用方法2:LaTeX 表达式替换并处理重复键成功。")
|
||
return parsed_data # 返回解析后的字典
|
||
except json.JSONDecodeError:
|
||
print("方法2(LaTeX 替换)解析失败。")
|
||
|
||
# 方法3:非法转义序列修复
|
||
try:
|
||
fixed_json3 = fix_json_escape_sequences(original_json)
|
||
parsed_data = parse_json(fixed_json3)
|
||
print("使用方法3:非法转义序列修复并处理重复键成功。")
|
||
return parsed_data # 返回解析后的字典
|
||
except json.JSONDecodeError:
|
||
print("方法3(非法转义修复)解析失败。")
|
||
|
||
# 如果所有方法都失败,检查字符串长度
|
||
print("所有修复方法均失败。传入的字符串:")
|
||
print(input_string)
|
||
return {} # 返回空字典
|
||
|
||
def clean_json_string(json_string,flag=False):
|
||
"""清理JSON字符串,移除多余的反引号并解析为字典"""
|
||
return extract_content_from_json(json_string,flag)
|
||
|
||
|
||
def combine_json_results(json_lists):
|
||
"""
|
||
将类json格式的列表整合成json数据(即大括号{}包裹)。
|
||
支持列表中的元素既是字符串又是字典。
|
||
"""
|
||
combined_result = {}
|
||
for item in json_lists:
|
||
if isinstance(item, str):
|
||
if item.strip():
|
||
json_data = clean_json_string(item)
|
||
if isinstance(json_data, dict):
|
||
combined_result.update(json_data)
|
||
else:
|
||
print(f"警告: 解析后的数据不是字典类型,跳过。内容: {item}")
|
||
elif isinstance(item, dict):
|
||
combined_result.update(item)
|
||
else:
|
||
print(f"警告: 不支持的类型 {type(item)},跳过。内容: {item}")
|
||
return combined_result
|
||
|
||
|
||
def nest_json_under_key(data, key):
|
||
"""
|
||
将给定的字典 data 嵌套在一个新的字典层级下,该层级由 key 指定,并返回 JSON 格式的字符串。
|
||
|
||
参数:
|
||
- data: dict, 要嵌套的原始字典。
|
||
- key: str, 新层级的键名。
|
||
|
||
返回:
|
||
- 嵌套后的 JSON 字符串。
|
||
"""
|
||
# 创建一个新字典,其中包含一个键,该键的值是原始字典
|
||
nested_dict = {key: data}
|
||
# 将字典转换成 JSON 字符串
|
||
nested_json = json.dumps(nested_dict, ensure_ascii=False, indent=4)
|
||
return nested_json
|
||
|
||
|
||
def add_keys_to_json(target_dict, source_dict):
|
||
"""
|
||
将 source_dict 的内容添加到 target_dict 中的唯一外层键下的字典中。
|
||
|
||
参数:
|
||
target_dict (dict): 要更新的目标字典,假定只有一个外层键。
|
||
source_dict (dict): 源字典,其内容将被添加到目标字典。
|
||
|
||
返回:
|
||
dict: 更新后的字典。
|
||
"""
|
||
if not target_dict:
|
||
print("json_utils: Error: Target dictionary is empty.")
|
||
return {}
|
||
|
||
if len(target_dict) != 1:
|
||
print("json_utils: Error: Target dictionary must contain exactly one top-level key.")
|
||
return target_dict
|
||
|
||
# 获取唯一的外层键
|
||
target_key, existing_dict = next(iter(target_dict.items()))
|
||
|
||
if not isinstance(existing_dict, dict):
|
||
print(f"json_utils: Error: The value under the key '{target_key}' is not a dictionary.")
|
||
return target_dict
|
||
|
||
# 合并字典
|
||
existing_dict.update(source_dict)
|
||
|
||
# 更新原字典
|
||
target_dict[target_key] = existing_dict
|
||
|
||
return target_dict
|
||
|
||
|
||
def add_outer_key(original_data, new_key):
|
||
"""
|
||
在传入的字典外部增加一个新的外键,原始数据保持不变。
|
||
"""
|
||
# 检查输入是否为有效的字典
|
||
if not original_data or not isinstance(original_data, dict):
|
||
print("json_utils: Error: Invalid input or input is not a dictionary.") # 输入无效或不是字典
|
||
return {}
|
||
|
||
# 在原始数据外层增加一个新键
|
||
new_data = {new_key: original_data}
|
||
|
||
return new_data
|
||
def transform_json_values(data):
|
||
if isinstance(data, dict):
|
||
return {key.replace(' ', ''): transform_json_values(value) for key, value in data.items()}
|
||
elif isinstance(data, list):
|
||
return [transform_json_values(item) for item in data]
|
||
elif isinstance(data, bool):
|
||
return '是' if data else '否'
|
||
elif isinstance(data, (int, float)):
|
||
return str(data)
|
||
elif isinstance(data, str):
|
||
return data.replace('\n', '<br>')
|
||
else:
|
||
return data
|
||
|
||
if __name__ == "__main__":
|
||
data="""{
|
||
"表头": {
|
||
"内容": "技术部分",
|
||
"范围": [(0, 0), (0, 4)]
|
||
},
|
||
"序号": {
|
||
"内容": "序号",
|
||
"位置": (1, 0),
|
||
"方向": "水平"
|
||
},
|
||
"产品": {
|
||
"内容": "",
|
||
"方向": ""
|
||
},
|
||
"重要参数要求": {
|
||
"内容": "采购文件要求 部分",
|
||
"位置": (1, 1),
|
||
"方向": "水平"
|
||
},
|
||
"对标": {
|
||
"内容": "响应文件响应 部分",
|
||
"位置": (1, 2),
|
||
"方向": "水平"
|
||
},
|
||
"偏离": {
|
||
"内容": "偏离说明",
|
||
"位置": (1, 3),
|
||
"方向": "水平"
|
||
},
|
||
"备注": {
|
||
"内容": "填写相应技术文 件/说明所在页码",
|
||
"位置": (1, 4),
|
||
"方向": "水平"
|
||
}
|
||
}
|
||
"""
|
||
res=clean_json_string(data)
|
||
print(json.dumps(res,ensure_ascii=False,indent=4))
|