zbparse/flask_app/general/json_utils.py

501 lines
17 KiB
Python
Raw Permalink Normal View History

# -*- encoding:utf-8 -*-
2024-08-29 16:37:09 +08:00
import json
import re
2024-12-24 15:13:11 +08:00
from collections import defaultdict
2024-08-29 16:37:09 +08:00
2024-10-18 18:06:23 +08:00
def insert_missing_commas(json_str):
"""
使用正则表达式在缺失逗号的位置插入逗号
具体来说寻找一个值的结束引号后紧跟着下一个键的开始引号并在中间插入逗号
"""
# 这个正则匹配一个字符串结尾的引号,可能有空白字符,然后是另一个键的引号
pattern = r'(":\s*"[^"]*)"\s*(")'
replacement = r'\1", \2'
previous_str = None
while previous_str != json_str:
previous_str = json_str
json_str = re.sub(pattern, replacement, json_str)
return json_str
2024-11-26 15:06:57 +08:00
def fix_json_escape_sequences(json_str):
"""
修复 JSON 字符串中的非法转义序列
将所有不符合 JSON 规范的反斜杠进行转义
"""
# JSON 中合法的转义字符
valid_escapes = ['"', '\\', '/', 'b', 'f', 'n', 'r', 't', 'u']
# 使用正则表达式找到所有反斜杠
# 如果反斜杠后面不是合法的转义字符,则进行转义
pattern = re.compile(r'\\(?!["\\/bfnrtu])')
fixed_str = pattern.sub(r'\\\\', json_str)
return fixed_str
2024-12-19 12:18:50 +08:00
def replace_latex_expressions_in_dict(obj):
"""
递归遍历字典或列表替换其中的 LaTeX 表达式
"""
if isinstance(obj, dict):
return {k: replace_latex_expressions_in_dict(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [replace_latex_expressions_in_dict(item) for item in obj]
elif isinstance(obj, str):
# 仅处理被 $...$ 包围的内容
def replace_match(match):
expr = match.group(1)
return replace_latex_expressions(expr)
2024-11-26 15:06:57 +08:00
2024-12-19 12:18:50 +08:00
# 替换所有 $...$ 包围的内容
return re.sub(r'\$(.*?)\$', replace_match, obj)
else:
return obj
2024-11-26 15:06:57 +08:00
def replace_latex_expressions(json_str):
"""
替换 JSON 字符串中的 LaTeX 风格表达式
例如 $ \geq 2 m$ 替换为 2
"""
# 定义 LaTeX 符号到 Unicode 字符的映射
latex_mapping = {
r'\geq': '',
r'\leq': '',
r'\times': '×',
r'\frac': '/', # 简单处理分数
r'\neq': '',
r'\approx': '',
r'\pm': '±',
r'\alpha': 'α',
r'\beta': 'β',
r'\gamma': 'γ',
r'\delta': 'δ',
r'\pi': 'π',
r'\sqrt': '',
r'\infty': '',
r'\cup': '',
r'\cap': '',
r'\subseteq': '',
r'\supseteq': '',
r'\forall': '',
r'\exists': '',
r'\rightarrow': '',
r'\leftarrow': '',
# 添加更多需要的映射
}
# 处理每一个 LaTeX 表达式
def replace_match(match):
expr = match.group(1)
for latex, char in latex_mapping.items():
expr = expr.replace(latex, char)
# 替换单位符号,例如 ' m' 到 '米', ' s' 到 '秒', 等等
expr = re.sub(r'(\d+)\s*m', r'\1米', expr)
expr = re.sub(r'(\d+)\s*s', r'\1秒', expr)
expr = re.sub(r'(\d+)\s*kg', r'\1公斤', expr)
expr = re.sub(r'(\d+)\s*A', r'\1安', expr) # 例如电流单位安培
# 继续添加更多单位
return expr
# 替换所有 $...$ 包围的内容
fixed_str = re.sub(r'\$(.*?)\$', replace_match, json_str)
return fixed_str
2024-12-24 15:13:11 +08:00
def load_json_with_duplicates(json_str):
"""
使用 object_pairs_hook 来加载包含重复键的 JSON 字符串
并将重复的键存储为列表
"""
def parse_object(pairs):
obj = defaultdict(list)
for key, value in pairs:
obj[key].append(value)
return obj
return json.loads(json_str, object_pairs_hook=parse_object)
def merge_duplicates(obj):
"""
递归合并包含重复键的字典优先保留字典类型的值
"""
if isinstance(obj, dict):
merged = {}
for key, values in obj.items():
if len(values) == 1:
merged[key] = merge_duplicates(values[0])
else:
# 如果有多个值,优先保留字典类型的值
dict_values = [v for v in values if isinstance(v, dict)]
if dict_values:
# 如果存在字典类型的值,递归合并它们
merged_value = {}
for dv in dict_values:
merged_value.update(merge_duplicates(dv))
merged[key] = merged_value
else:
# 否则,保留最后一个值
merged[key] = merge_duplicates(values[-1])
return merged
elif isinstance(obj, list):
return [merge_duplicates(item) for item in obj]
else:
return obj
def parse_json_with_duplicates(raw_string):
"""
解析具有重复键的 JSON 字符串将所有重复的键值对存储为列表
Args:
json_string (str): 需要解析的 JSON 字符串
Returns:
dict: 解析后的字典重复的键对应的值为列表
2024-11-26 15:06:57 +08:00
2024-12-24 15:13:11 +08:00
eg:输入"综合实力": {
"评分": "2分",
"要求": "投标人具备电子与智能化工程专业承包二级资质及以上证书得 2分不能够提供不得分开标时需提供原件"
},
"综合实力": {
"评分": "2分",
"要求": "投标人具有建筑机电安装工程专业承包三级资质或以上资质得 2分否则不得分。证书开标原件备查"
}
输出"综合实力": [
{
"评分": "2分",
"要求": "投标人具备电子与智能化工程专业承包二级资质及以上证书得 2分不能够提供不得分开标时需提供原件"
},
{
"评分": "2分",
"要求": "投标人具有建筑机电安装工程专业承包三级资质或以上资质得 2分否则不得分。证书开标原件备查"
}]
"""
def custom_object_pairs_hook(pairs):
d = defaultdict(list)
for key, value in pairs:
try:
# 如果值是字典或列表,递归处理
if isinstance(value, dict):
value = process_dict(value)
elif isinstance(value, list):
value = process_list(value)
d[key].append(value)
except Exception as e:
d[key].append(value) # 根据需求决定是否跳过或保留原值
# 将有多个值的键转换为列表,单个值的键保持原样
return {key: (values if len(values) > 1 else values[0]) for key, values in d.items()}
def process_dict(d):
"""
递归处理字典确保所有重复键的值为列表
Args:
d (dict): 需要处理的字典
Returns:
dict: 处理后的字典
"""
try:
return custom_object_pairs_hook(d.items())
except Exception as e:
return {}
def process_list(l):
"""
递归处理列表确保列表中的所有字典也被处理
Args:
l (list): 需要处理的列表
Returns:
list: 处理后的列表
"""
try:
return [process_dict(item) if isinstance(item, dict) else item for item in l]
except Exception as e:
return []
"""输入字符串,提取 { 和 } 之间的内容,并将其解析为字典"""
if not raw_string or not raw_string.strip():
return {}
match = re.search(r'\{[\s\S]*\}', raw_string)
if match:
json_string = match.group(0)
return json.loads(json_string, object_pairs_hook=custom_object_pairs_hook)
2024-12-24 15:13:11 +08:00
else:
print("未找到有效的 JSON 内容。")
return {} # 返回空字典
2025-01-03 17:36:23 +08:00
#作为最后手段,因为它是统计{ }的数量,存在缺陷:如果字符串中包含'{ }' 会出错!
2025-01-03 10:45:32 +08:00
def extract_first_json(s):
"""
从字符串中提取第一个完整的 JSON 对象如果 JSON 对象不完整尝试补全缺失的括号
Args:
s (str): 输入字符串
Returns:
str or None: 提取到的 JSON 字符串 None 如果未找到
"""
start = s.find('{')
if start == -1:
return None
stack = []
for i in range(start, len(s)):
char = s[i]
if char == '{':
stack.append('{')
elif char == '}':
if stack:
stack.pop()
if not stack:
# 找到一个完整的 JSON 对象
return s[start:i+1]
else:
# 多余的右括号,忽略
pass
# 如果遍历完后栈不为空,补全缺失的右括号
if stack:
missing = '}' * len(stack)
return s[start:] + missing
return None
def extract_content_from_json(input_string,flag=False):
2024-11-26 15:06:57 +08:00
"""
输入字符串尝试解析 JSON 数据
2024-12-19 12:18:50 +08:00
1. 如果成功解析返回字典
2024-11-26 15:06:57 +08:00
"""
if not input_string or not input_string.strip():
2024-11-26 15:06:57 +08:00
return {}
# 提取第一个匹配的 JSON 对象
match = re.search(r'\{[\s\S]*\}', input_string)
2024-11-26 15:06:57 +08:00
if not match:
print("未找到有效的 JSON 内容。")
2024-12-19 12:18:50 +08:00
return {} # 返回空字典
2024-11-26 15:06:57 +08:00
original_json = match.group(0)
2024-12-24 15:13:11 +08:00
def parse_json(json_str):
"""
根据 flag 参数解析 JSON 字符串
Args:
json_str (str): 需要解析的 JSON 字符串
Returns:
dict: 解析后的字典
"""
if not flag:
data_with_duplicates = load_json_with_duplicates(json_str)
return merge_duplicates(data_with_duplicates)
else:
return parse_json_with_duplicates(json_str)
2024-11-26 15:06:57 +08:00
# 尝试直接解析原始 JSON 数据
try:
2024-12-24 15:13:11 +08:00
parsed_data = parse_json(original_json)
return parsed_data # 返回解析后的字典
2024-12-19 12:18:50 +08:00
except json.JSONDecodeError:
print("直接解析原始 JSON 失败。")
2024-11-26 15:06:57 +08:00
# 方法1逗号修复
try:
fixed_json1 = insert_missing_commas(original_json)
2024-12-24 15:13:11 +08:00
parsed_data = parse_json(fixed_json1)
print("使用方法1逗号修复并处理重复键成功。")
return parsed_data # 返回解析后的字典
2024-12-19 12:18:50 +08:00
except json.JSONDecodeError:
print("方法1逗号修复解析失败。")
2024-11-26 15:06:57 +08:00
# 方法2LaTeX 表达式替换
try:
fixed_json2 = replace_latex_expressions(original_json)
2024-12-24 15:13:11 +08:00
parsed_data = parse_json(fixed_json2)
print("使用方法2LaTeX 表达式替换并处理重复键成功。")
return parsed_data # 返回解析后的字典
2024-12-19 12:18:50 +08:00
except json.JSONDecodeError:
print("方法2LaTeX 替换)解析失败。")
2024-11-26 15:06:57 +08:00
# 方法3非法转义序列修复
try:
fixed_json3 = fix_json_escape_sequences(original_json)
2024-12-24 15:13:11 +08:00
parsed_data = parse_json(fixed_json3)
print("使用方法3非法转义序列修复并处理重复键成功。")
return parsed_data # 返回解析后的字典
2024-12-19 12:18:50 +08:00
except json.JSONDecodeError:
print("方法3非法转义修复解析失败。")
2025-01-03 10:45:32 +08:00
# 所有方法均失败后,尝试使用 extract_first_json 作为最后手段
2025-01-03 17:36:23 +08:00
print("尝试使用 extract_first_json 作为最后手段:")
print(input_string)
2025-01-03 10:45:32 +08:00
fixed_json_final = extract_first_json(input_string)
if fixed_json_final:
try:
parsed_data = parse_json(fixed_json_final)
print("使用 extract_first_json 后解析成功。")
return parsed_data
except json.JSONDecodeError:
print("使用 extract_first_json 后解析失败。")
else:
print("extract_first_json 未能提取到有效的 JSON。")
2024-12-19 12:18:50 +08:00
print("所有修复方法均失败。传入的字符串:")
print(input_string)
print("-------------------")
2024-12-20 17:24:49 +08:00
return {} # 返回空字典
2024-12-19 12:18:50 +08:00
2024-12-24 15:13:11 +08:00
def clean_json_string(json_string,flag=False):
2024-08-29 16:37:09 +08:00
"""清理JSON字符串移除多余的反引号并解析为字典"""
2024-12-24 15:13:11 +08:00
return extract_content_from_json(json_string,flag)
2024-08-29 16:37:09 +08:00
2024-10-19 17:25:56 +08:00
2024-08-29 16:37:09 +08:00
def combine_json_results(json_lists):
"""
2024-10-19 17:25:56 +08:00
将类json格式的列表整合成json数据即大括号{}包裹
支持列表中的元素既是字符串又是字典
2024-08-29 16:37:09 +08:00
"""
combined_result = {}
2024-10-19 17:25:56 +08:00
for item in json_lists:
if isinstance(item, str):
if item.strip():
json_data = clean_json_string(item)
if isinstance(json_data, dict):
combined_result.update(json_data)
else:
print(f"警告: 解析后的数据不是字典类型,跳过。内容: {item}")
elif isinstance(item, dict):
combined_result.update(item)
else:
print(f"警告: 不支持的类型 {type(item)},跳过。内容: {item}")
2024-08-29 16:37:09 +08:00
return combined_result
def nest_json_under_key(data, key):
"""
将给定的字典 data 嵌套在一个新的字典层级下该层级由 key 指定并返回 JSON 格式的字符串
参数:
- data: dict, 要嵌套的原始字典
- key: str, 新层级的键名
返回:
- 嵌套后的 JSON 字符串
"""
# 创建一个新字典,其中包含一个键,该键的值是原始字典
nested_dict = {key: data}
# 将字典转换成 JSON 字符串
nested_json = json.dumps(nested_dict, ensure_ascii=False, indent=4)
return nested_json
def add_keys_to_json(target_dict, source_dict):
"""
source_dict 的内容添加到 target_dict 中的唯一外层键下的字典中
参数:
target_dict (dict): 要更新的目标字典假定只有一个外层键
source_dict (dict): 源字典其内容将被添加到目标字典
返回:
dict: 更新后的字典
"""
if not target_dict:
2024-09-13 15:03:55 +08:00
print("json_utils: Error: Target dictionary is empty.")
2024-08-29 16:37:09 +08:00
return {}
if len(target_dict) != 1:
2024-09-13 15:03:55 +08:00
print("json_utils: Error: Target dictionary must contain exactly one top-level key.")
2024-08-29 16:37:09 +08:00
return target_dict
# 获取唯一的外层键
target_key, existing_dict = next(iter(target_dict.items()))
if not isinstance(existing_dict, dict):
2024-09-13 15:03:55 +08:00
print(f"json_utils: Error: The value under the key '{target_key}' is not a dictionary.")
2024-08-29 16:37:09 +08:00
return target_dict
# 合并字典
existing_dict.update(source_dict)
# 更新原字典
target_dict[target_key] = existing_dict
return target_dict
2024-12-12 16:06:20 +08:00
def add_outer_key(original_data, new_key):
"""
在传入的字典外部增加一个新的外键原始数据保持不变
"""
# 检查输入是否为有效的字典
2024-08-29 16:37:09 +08:00
if not original_data or not isinstance(original_data, dict):
2024-12-12 16:06:20 +08:00
print("json_utils: Error: Invalid input or input is not a dictionary.") # 输入无效或不是字典
return {}
2024-08-29 16:37:09 +08:00
2024-12-12 16:06:20 +08:00
# 在原始数据外层增加一个新键
new_data = {new_key: original_data}
2024-08-29 16:37:09 +08:00
2024-10-15 20:57:58 +08:00
return new_data
def rename_outer_key(data, new_key):
"""
将字典的最外层唯一键重命名为 new_key
参数:
data (dict): 只有一个外层键的字典
new_key (str): 新的键名
返回:
dict: 修改后的字典
"""
if not isinstance(data, dict) or len(data) != 1:
raise ValueError("输入数据必须是一个只有一个外层键的字典。")
# 获取原始键和值
old_key, value = next(iter(data.items()))
# 构造新的字典
return {new_key: value}
2024-08-29 16:37:09 +08:00
def transform_json_values(data):
if isinstance(data, dict):
2024-10-16 20:18:55 +08:00
return {key.replace(' ', ''): transform_json_values(value) for key, value in data.items()}
2024-08-29 16:37:09 +08:00
elif isinstance(data, list):
return [transform_json_values(item) for item in data]
elif isinstance(data, bool):
return '' if data else ''
elif isinstance(data, (int, float)):
return str(data)
elif isinstance(data, str):
return data.replace('\n', '<br>')
else:
return data
if __name__ == "__main__":
2024-12-19 12:18:50 +08:00
data="""{
2025-01-03 09:59:53 +08:00
"新内容": "六、残疾人福利性单位声明函(适用于货物类项目
广水市政府采购中心
本单位郑重声明根据财政部 民政部 中国残疾人联合会关于促进残疾人就业政府采购政策的通知财库2017141的规定本单位为符合条件的残疾人福利性单位详见残疾人福利性单位应当满足的条件
1本单位授权[$$blank_mark_1$$]投标人参加[$$blank_mark_2$$]采购人的项目项目编号[$$blank_mark_3$$]采购活动提供本单位制造的货物本条所称货物不包括使用非残疾人福利性单位注册商标的货物
2由本单位制造的货物清单见下表
本单位对上述声明的真实性负责如有虚假将依法承担相应责任
说明1投标人所投货物为自己制造的也应按本声明函格式填写
2组成联合体的大中型企业和其他自然人法人或者其他组织与残疾人福利性单位之间不得存在投资关系
3以联合体方式参与项目投标的供应商应由联合体双方签字盖章
制造商公章
制造商法定代表人签字或盖章
日期"
2024-12-19 12:18:50 +08:00
}
"""
res=clean_json_string(data)
print(json.dumps(res,ensure_ascii=False,indent=4))