476 lines
17 KiB
Python
476 lines
17 KiB
Python
# -*- encoding:utf-8 -*-
|
||
import json
|
||
import re
|
||
from collections import defaultdict
|
||
|
||
|
||
def insert_missing_commas(json_str):
|
||
"""
|
||
使用正则表达式在缺失逗号的位置插入逗号。
|
||
具体来说,寻找一个值的结束引号后紧跟着下一个键的开始引号,并在中间插入逗号。
|
||
"""
|
||
# 这个正则匹配一个字符串结尾的引号,可能有空白字符,然后是另一个键的引号
|
||
pattern = r'(":\s*"[^"]*)"\s*(")'
|
||
replacement = r'\1", \2'
|
||
|
||
previous_str = None
|
||
while previous_str != json_str:
|
||
previous_str = json_str
|
||
json_str = re.sub(pattern, replacement, json_str)
|
||
|
||
return json_str
|
||
|
||
|
||
def fix_json_escape_sequences(json_str):
|
||
"""
|
||
修复 JSON 字符串中的非法转义序列。
|
||
将所有不符合 JSON 规范的反斜杠进行转义。
|
||
"""
|
||
# JSON 中合法的转义字符
|
||
valid_escapes = ['"', '\\', '/', 'b', 'f', 'n', 'r', 't', 'u']
|
||
|
||
# 使用正则表达式找到所有反斜杠
|
||
# 如果反斜杠后面不是合法的转义字符,则进行转义
|
||
pattern = re.compile(r'\\(?!["\\/bfnrtu])')
|
||
fixed_str = pattern.sub(r'\\\\', json_str)
|
||
return fixed_str
|
||
|
||
def replace_latex_expressions_in_dict(obj):
|
||
"""
|
||
递归遍历字典或列表,替换其中的 LaTeX 表达式。
|
||
"""
|
||
if isinstance(obj, dict):
|
||
return {k: replace_latex_expressions_in_dict(v) for k, v in obj.items()}
|
||
elif isinstance(obj, list):
|
||
return [replace_latex_expressions_in_dict(item) for item in obj]
|
||
elif isinstance(obj, str):
|
||
# 仅处理被 $...$ 包围的内容
|
||
def replace_match(match):
|
||
expr = match.group(1)
|
||
return replace_latex_expressions(expr)
|
||
|
||
# 替换所有 $...$ 包围的内容
|
||
return re.sub(r'\$(.*?)\$', replace_match, obj)
|
||
else:
|
||
return obj
|
||
def replace_latex_expressions(json_str):
|
||
"""
|
||
替换 JSON 字符串中的 LaTeX 风格表达式。
|
||
例如,将 $三 \geq 2 m$ 替换为 三 ≥2米
|
||
"""
|
||
# 定义 LaTeX 符号到 Unicode 字符的映射
|
||
latex_mapping = {
|
||
r'\geq': '≥',
|
||
r'\leq': '≤',
|
||
r'\times': '×',
|
||
r'\frac': '/', # 简单处理分数
|
||
r'\neq': '≠',
|
||
r'\approx': '≈',
|
||
r'\pm': '±',
|
||
r'\alpha': 'α',
|
||
r'\beta': 'β',
|
||
r'\gamma': 'γ',
|
||
r'\delta': 'δ',
|
||
r'\pi': 'π',
|
||
r'\sqrt': '√',
|
||
r'\infty': '∞',
|
||
r'\cup': '∪',
|
||
r'\cap': '∩',
|
||
r'\subseteq': '⊆',
|
||
r'\supseteq': '⊇',
|
||
r'\forall': '∀',
|
||
r'\exists': '∃',
|
||
r'\rightarrow': '→',
|
||
r'\leftarrow': '←',
|
||
# 添加更多需要的映射
|
||
}
|
||
|
||
# 处理每一个 LaTeX 表达式
|
||
def replace_match(match):
|
||
expr = match.group(1)
|
||
for latex, char in latex_mapping.items():
|
||
expr = expr.replace(latex, char)
|
||
# 替换单位符号,例如 ' m' 到 '米', ' s' 到 '秒', 等等
|
||
expr = re.sub(r'(\d+)\s*m', r'\1米', expr)
|
||
expr = re.sub(r'(\d+)\s*s', r'\1秒', expr)
|
||
expr = re.sub(r'(\d+)\s*kg', r'\1公斤', expr)
|
||
expr = re.sub(r'(\d+)\s*A', r'\1安', expr) # 例如电流单位安培
|
||
# 继续添加更多单位
|
||
return expr
|
||
|
||
# 替换所有 $...$ 包围的内容
|
||
fixed_str = re.sub(r'\$(.*?)\$', replace_match, json_str)
|
||
return fixed_str
|
||
|
||
def load_json_with_duplicates(json_str):
|
||
"""
|
||
使用 object_pairs_hook 来加载包含重复键的 JSON 字符串,
|
||
并将重复的键存储为列表。
|
||
"""
|
||
def parse_object(pairs):
|
||
obj = defaultdict(list)
|
||
for key, value in pairs:
|
||
obj[key].append(value)
|
||
return obj
|
||
|
||
return json.loads(json_str, object_pairs_hook=parse_object)
|
||
|
||
def merge_duplicates(obj):
|
||
"""
|
||
递归合并包含重复键的字典,优先保留字典类型的值。
|
||
"""
|
||
if isinstance(obj, dict):
|
||
merged = {}
|
||
for key, values in obj.items():
|
||
if len(values) == 1:
|
||
merged[key] = merge_duplicates(values[0])
|
||
else:
|
||
# 如果有多个值,优先保留字典类型的值
|
||
dict_values = [v for v in values if isinstance(v, dict)]
|
||
if dict_values:
|
||
# 如果存在字典类型的值,递归合并它们
|
||
merged_value = {}
|
||
for dv in dict_values:
|
||
merged_value.update(merge_duplicates(dv))
|
||
merged[key] = merged_value
|
||
else:
|
||
# 否则,保留最后一个值
|
||
merged[key] = merge_duplicates(values[-1])
|
||
return merged
|
||
elif isinstance(obj, list):
|
||
return [merge_duplicates(item) for item in obj]
|
||
else:
|
||
return obj
|
||
|
||
def parse_json_with_duplicates(raw_string):
|
||
"""
|
||
解析具有重复键的 JSON 字符串,将所有重复的键值对存储为列表。
|
||
|
||
Args:
|
||
json_string (str): 需要解析的 JSON 字符串。
|
||
|
||
Returns:
|
||
dict: 解析后的字典,重复的键对应的值为列表。
|
||
|
||
eg:输入:"综合实力": {
|
||
"评分": "2分",
|
||
"要求": "投标人具备电子与智能化工程专业承包二级资质及以上证书得 2分,不能够提供不得分(开标时需提供原件)。"
|
||
},
|
||
"综合实力": {
|
||
"评分": "2分",
|
||
"要求": "投标人具有建筑机电安装工程专业承包三级资质或以上资质得 2分,否则不得分。(证书开标原件备查)。"
|
||
}
|
||
输出:"综合实力": [
|
||
{
|
||
"评分": "2分",
|
||
"要求": "投标人具备电子与智能化工程专业承包二级资质及以上证书得 2分,不能够提供不得分(开标时需提供原件)。"
|
||
},
|
||
{
|
||
"评分": "2分",
|
||
"要求": "投标人具有建筑机电安装工程专业承包三级资质或以上资质得 2分,否则不得分。(证书开标原件备查)。"
|
||
}]
|
||
"""
|
||
|
||
def custom_object_pairs_hook(pairs):
|
||
d = defaultdict(list)
|
||
for key, value in pairs:
|
||
try:
|
||
# 如果值是字典或列表,递归处理
|
||
if isinstance(value, dict):
|
||
value = process_dict(value)
|
||
elif isinstance(value, list):
|
||
value = process_list(value)
|
||
d[key].append(value)
|
||
except Exception as e:
|
||
d[key].append(value) # 根据需求决定是否跳过或保留原值
|
||
# 将有多个值的键转换为列表,单个值的键保持原样
|
||
return {key: (values if len(values) > 1 else values[0]) for key, values in d.items()}
|
||
|
||
def process_dict(d):
|
||
"""
|
||
递归处理字典,确保所有重复键的值为列表。
|
||
|
||
Args:
|
||
d (dict): 需要处理的字典。
|
||
|
||
Returns:
|
||
dict: 处理后的字典。
|
||
"""
|
||
try:
|
||
return custom_object_pairs_hook(d.items())
|
||
except Exception as e:
|
||
return {}
|
||
|
||
def process_list(l):
|
||
"""
|
||
递归处理列表,确保列表中的所有字典也被处理。
|
||
|
||
Args:
|
||
l (list): 需要处理的列表。
|
||
|
||
Returns:
|
||
list: 处理后的列表。
|
||
"""
|
||
try:
|
||
return [process_dict(item) if isinstance(item, dict) else item for item in l]
|
||
except Exception as e:
|
||
return []
|
||
|
||
"""输入字符串,提取 { 和 } 之间的内容,并将其解析为字典"""
|
||
if not raw_string or not raw_string.strip():
|
||
return {}
|
||
match = re.search(r'\{[\s\S]*\}', raw_string)
|
||
if match:
|
||
json_string = match.group(0)
|
||
return json.loads(json_string, object_pairs_hook=custom_object_pairs_hook)
|
||
else:
|
||
print("未找到有效的 JSON 内容。")
|
||
return {} # 返回空字典
|
||
|
||
def extract_first_json(s):
|
||
"""
|
||
从字符串中提取第一个完整的 JSON 对象。如果 JSON 对象不完整,尝试补全缺失的括号。
|
||
|
||
Args:
|
||
s (str): 输入字符串。
|
||
|
||
Returns:
|
||
str or None: 提取到的 JSON 字符串,或 None 如果未找到。
|
||
"""
|
||
start = s.find('{')
|
||
if start == -1:
|
||
return None
|
||
|
||
stack = []
|
||
for i in range(start, len(s)):
|
||
char = s[i]
|
||
if char == '{':
|
||
stack.append('{')
|
||
elif char == '}':
|
||
if stack:
|
||
stack.pop()
|
||
if not stack:
|
||
# 找到一个完整的 JSON 对象
|
||
return s[start:i+1]
|
||
else:
|
||
# 多余的右括号,忽略
|
||
pass
|
||
# 如果遍历完后栈不为空,补全缺失的右括号
|
||
if stack:
|
||
missing = '}' * len(stack)
|
||
return s[start:] + missing
|
||
return None
|
||
|
||
def extract_content_from_json(input_string,flag=False):
|
||
"""
|
||
输入字符串,尝试解析 JSON 数据:
|
||
1. 如果成功解析,返回字典。
|
||
"""
|
||
if not input_string or not input_string.strip():
|
||
return {}
|
||
|
||
# 提取第一个匹配的 JSON 对象
|
||
match = re.search(r'\{[\s\S]*\}', input_string)
|
||
if not match:
|
||
print("未找到有效的 JSON 内容。")
|
||
return {} # 返回空字典
|
||
|
||
original_json = match.group(0)
|
||
|
||
def parse_json(json_str):
|
||
"""
|
||
根据 flag 参数解析 JSON 字符串。
|
||
|
||
Args:
|
||
json_str (str): 需要解析的 JSON 字符串。
|
||
|
||
Returns:
|
||
dict: 解析后的字典。
|
||
"""
|
||
if not flag:
|
||
data_with_duplicates = load_json_with_duplicates(json_str)
|
||
return merge_duplicates(data_with_duplicates)
|
||
else:
|
||
return parse_json_with_duplicates(json_str)
|
||
|
||
# 尝试直接解析原始 JSON 数据
|
||
try:
|
||
parsed_data = parse_json(original_json)
|
||
return parsed_data # 返回解析后的字典
|
||
except json.JSONDecodeError:
|
||
print("直接解析原始 JSON 失败。")
|
||
|
||
# 方法1:逗号修复
|
||
try:
|
||
fixed_json1 = insert_missing_commas(original_json)
|
||
parsed_data = parse_json(fixed_json1)
|
||
print("使用方法1:逗号修复并处理重复键成功。")
|
||
return parsed_data # 返回解析后的字典
|
||
except json.JSONDecodeError:
|
||
print("方法1(逗号修复)解析失败。")
|
||
|
||
# 方法2:LaTeX 表达式替换
|
||
try:
|
||
fixed_json2 = replace_latex_expressions(original_json)
|
||
parsed_data = parse_json(fixed_json2)
|
||
print("使用方法2:LaTeX 表达式替换并处理重复键成功。")
|
||
return parsed_data # 返回解析后的字典
|
||
except json.JSONDecodeError:
|
||
print("方法2(LaTeX 替换)解析失败。")
|
||
|
||
# 方法3:非法转义序列修复
|
||
try:
|
||
fixed_json3 = fix_json_escape_sequences(original_json)
|
||
parsed_data = parse_json(fixed_json3)
|
||
print("使用方法3:非法转义序列修复并处理重复键成功。")
|
||
return parsed_data # 返回解析后的字典
|
||
except json.JSONDecodeError:
|
||
print("方法3(非法转义修复)解析失败。")
|
||
# 所有方法均失败后,尝试使用 extract_first_json 作为最后手段
|
||
print("尝试使用 extract_first_json 作为最后手段。")
|
||
fixed_json_final = extract_first_json(input_string)
|
||
if fixed_json_final:
|
||
try:
|
||
parsed_data = parse_json(fixed_json_final)
|
||
print("使用 extract_first_json 后解析成功。")
|
||
return parsed_data
|
||
except json.JSONDecodeError:
|
||
print("使用 extract_first_json 后解析失败。")
|
||
else:
|
||
print("extract_first_json 未能提取到有效的 JSON。")
|
||
|
||
print("所有修复方法均失败。传入的字符串:")
|
||
print(input_string)
|
||
print("-------------------")
|
||
return {} # 返回空字典
|
||
|
||
def clean_json_string(json_string,flag=False):
|
||
"""清理JSON字符串,移除多余的反引号并解析为字典"""
|
||
return extract_content_from_json(json_string,flag)
|
||
|
||
|
||
def combine_json_results(json_lists):
|
||
"""
|
||
将类json格式的列表整合成json数据(即大括号{}包裹)。
|
||
支持列表中的元素既是字符串又是字典。
|
||
"""
|
||
combined_result = {}
|
||
for item in json_lists:
|
||
if isinstance(item, str):
|
||
if item.strip():
|
||
json_data = clean_json_string(item)
|
||
if isinstance(json_data, dict):
|
||
combined_result.update(json_data)
|
||
else:
|
||
print(f"警告: 解析后的数据不是字典类型,跳过。内容: {item}")
|
||
elif isinstance(item, dict):
|
||
combined_result.update(item)
|
||
else:
|
||
print(f"警告: 不支持的类型 {type(item)},跳过。内容: {item}")
|
||
return combined_result
|
||
|
||
|
||
def nest_json_under_key(data, key):
|
||
"""
|
||
将给定的字典 data 嵌套在一个新的字典层级下,该层级由 key 指定,并返回 JSON 格式的字符串。
|
||
|
||
参数:
|
||
- data: dict, 要嵌套的原始字典。
|
||
- key: str, 新层级的键名。
|
||
|
||
返回:
|
||
- 嵌套后的 JSON 字符串。
|
||
"""
|
||
# 创建一个新字典,其中包含一个键,该键的值是原始字典
|
||
nested_dict = {key: data}
|
||
# 将字典转换成 JSON 字符串
|
||
nested_json = json.dumps(nested_dict, ensure_ascii=False, indent=4)
|
||
return nested_json
|
||
|
||
|
||
def add_keys_to_json(target_dict, source_dict):
|
||
"""
|
||
将 source_dict 的内容添加到 target_dict 中的唯一外层键下的字典中。
|
||
|
||
参数:
|
||
target_dict (dict): 要更新的目标字典,假定只有一个外层键。
|
||
source_dict (dict): 源字典,其内容将被添加到目标字典。
|
||
|
||
返回:
|
||
dict: 更新后的字典。
|
||
"""
|
||
if not target_dict:
|
||
print("json_utils: Error: Target dictionary is empty.")
|
||
return {}
|
||
|
||
if len(target_dict) != 1:
|
||
print("json_utils: Error: Target dictionary must contain exactly one top-level key.")
|
||
return target_dict
|
||
|
||
# 获取唯一的外层键
|
||
target_key, existing_dict = next(iter(target_dict.items()))
|
||
|
||
if not isinstance(existing_dict, dict):
|
||
print(f"json_utils: Error: The value under the key '{target_key}' is not a dictionary.")
|
||
return target_dict
|
||
|
||
# 合并字典
|
||
existing_dict.update(source_dict)
|
||
|
||
# 更新原字典
|
||
target_dict[target_key] = existing_dict
|
||
|
||
return target_dict
|
||
|
||
|
||
def add_outer_key(original_data, new_key):
|
||
"""
|
||
在传入的字典外部增加一个新的外键,原始数据保持不变。
|
||
"""
|
||
# 检查输入是否为有效的字典
|
||
if not original_data or not isinstance(original_data, dict):
|
||
print("json_utils: Error: Invalid input or input is not a dictionary.") # 输入无效或不是字典
|
||
return {}
|
||
|
||
# 在原始数据外层增加一个新键
|
||
new_data = {new_key: original_data}
|
||
|
||
return new_data
|
||
def transform_json_values(data):
|
||
if isinstance(data, dict):
|
||
return {key.replace(' ', ''): transform_json_values(value) for key, value in data.items()}
|
||
elif isinstance(data, list):
|
||
return [transform_json_values(item) for item in data]
|
||
elif isinstance(data, bool):
|
||
return '是' if data else '否'
|
||
elif isinstance(data, (int, float)):
|
||
return str(data)
|
||
elif isinstance(data, str):
|
||
return data.replace('\n', '<br>')
|
||
else:
|
||
return data
|
||
|
||
if __name__ == "__main__":
|
||
data="""{
|
||
"新内容": "六、残疾人福利性单位声明函(适用于货物类项目
|
||
|
||
广水市政府采购中心:
|
||
本单位郑重声明,根据《财政部 民政部 中国残疾人联合会关于促进残疾人就业政府采购政策的通知》(财库〔2017〕141号)的规定,本单位为符合条件的残疾人福利性单位(详见“残疾人福利性单位应当满足的条件”)。
|
||
1.本单位授权[$$blank_mark_1$$](投标人)参加[$$blank_mark_2$$](采购人)的项目(项目编号:[$$blank_mark_3$$])采购活动提供本单位制造的货物。本条所称货物不包括使用非残疾人福利性单位注册商标的货物。
|
||
2.由本单位制造的货物清单见下表:
|
||
|
||
|
||
本单位对上述声明的真实性负责。如有虚假,将依法承担相应责任。
|
||
|
||
说明:1、投标人所投货物为自己制造的,也应按本声明函格式填写。
|
||
2、组成联合体的大中型企业和其他自然人、法人或者其他组织,与残疾人福利性单位之间不得存在投资关系。
|
||
3、以联合体方式参与项目投标的供应商,应由联合体双方签字盖章。
|
||
|
||
制造商(公章):
|
||
制造商法定代表人(签字或盖章):
|
||
日期:"
|
||
}
|
||
"""
|
||
res=clean_json_string(data)
|
||
print(json.dumps(res,ensure_ascii=False,indent=4))
|