zbparse/flask_app/general/投标人须知正文提取指定内容.py
2024-10-30 18:08:46 +08:00

192 lines
7.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# common.py
import re
from functools import cmp_to_key
def compare_headings(a, b):
"""
比较两个标题,用于排序。
"""
a_nums = [int(num) for num in a[0].rstrip('.').split('.') if num.isdigit()]
b_nums = [int(num) for num in b[0].rstrip('.').split('.') if num.isdigit()]
return (a_nums > b_nums) - (a_nums < b_nums)
def preprocess_data(data):
"""
预处理数据,自动添加缺失的父层级键,并按数字顺序排序。
"""
keys_to_add = set()
for key in data.keys():
parts = key.split('.')
if len(parts) > 1:
parent_key = parts[0] + '.'
if parent_key not in data:
keys_to_add.add(parent_key)
# 添加缺失的父层级键
for parent_key in keys_to_add:
data[parent_key] = parent_key.rstrip('.')
# 对键进行排序
sorted_data = dict(sorted(data.items(), key=cmp_to_key(compare_headings)))
return sorted_data
def transform_json(data):
"""
转换结构化的JSON数据。
"""
result = {}
temp = {0: result} # 初始化根字典
data = preprocess_data(data)
# 首先,创建一个临时字典用于检查是否存在三级标题
has_subkey = {}
for key in data.keys():
parts = key.split('.')
if len(parts) > 2 and parts[1]:
parent_key = parts[0] + '.' + parts[1]
has_subkey[parent_key] = True
for key, value in data.items():
match = re.match(r'(\d+)(?:\.(\d+))?(?:\.(\d+))?', key)
if match:
levels = [int(l) for l in match.groups() if l is not None]
if (len(levels) - 1) in temp:
parent = temp[len(levels) - 1]
else:
print(f"No parent found at level {len(levels) - 1} for key '{key}'. Check the data structure.")
continue
if len(levels) == 1: # 一级标题
# 优先按 '\n' 拆分
if '\n' in value:
new_key, *new_value = value.split('\n', 1)
new_key = new_key.strip()
new_value = new_value[0].strip() if new_value else ""
# 如果没有 '\n',再检查 ':' 或 '',并进行拆分
elif ':' in value or '' in value:
delimiter = ':' if ':' in value else ''
new_key, new_value = value.split(delimiter, 1)
new_key = new_key.strip()
new_value = new_value.strip()
else:
new_key = value.strip()
new_value = ""
parent[new_key] = {}
if new_value:
parent[new_key][new_key] = new_value # 使用 new_key 作为键名,而不是固定的 "content"
temp[len(levels)] = parent[new_key]
elif len(levels) == 2: # 二级标题
new_key, *new_value = value.split('\n', 1)
new_key = new_key.strip()
new_value = new_value[0].strip() if new_value else ""
if f"{levels[0]}.{levels[1]}" in has_subkey:
parent[new_key] = [new_value] if new_value else []
else:
parent[new_key] = new_value
temp[len(levels)] = parent[new_key]
else: # 三级标题
if isinstance(parent, dict):
parent_key = list(parent.keys())[-1]
if isinstance(parent[parent_key], list):
parent[parent_key].append(value)
elif parent[parent_key]:
parent[parent_key] = [parent[parent_key], value]
else:
parent[parent_key] = [value]
elif isinstance(parent, list):
parent.append(value)
def remove_single_item_lists(node):
if isinstance(node, dict):
for key in list(node.keys()):
node[key] = remove_single_item_lists(node[key])
if isinstance(node[key], list) and len(node[key]) == 1:
node[key] = node[key][0]
return node
return remove_single_item_lists(result)
def post_process(value):
"""
处理字符串将其根据特定的序号模式分割成列表每个块至少包含50个字符。
"""
# 如果传入的是非字符串值,直接返回原值
if not isinstance(value, str):
return value
# 定义可能的分割模式及其正则表达式
patterns = [
(r'\d+、', r'(?=\d+、)'), # 匹配 '1、'
(r'[(]\d+[)]', r'(?=[(]\d+[)])'), # 匹配 '(1)' 或 '1'
(r'\d+\.', r'(?=\d+\.)'), # 匹配 '1.'
(r'[一二三四五六七八九十]、', r'(?=[一二三四五六七八九十]、)'), # 匹配 '一、'、'二、' 等
(r'[一二三四五六七八九十]\.', r'(?=[一二三四五六七八九十]\.)') # 匹配 '一.'、'二.' 等
]
# 初始化用于保存最早匹配到的模式及其位置
first_match = None
first_match_position = len(value) # 初始值设为文本长度,确保任何匹配都会更新它
# 遍历所有模式,找到第一个出现的位置
for search_pattern, split_pattern_candidate in patterns:
match = re.search(search_pattern, value)
if match:
# 如果这个匹配的位置比当前记录的更靠前,更新匹配信息
if match.start() < first_match_position:
first_match = split_pattern_candidate
first_match_position = match.start()
# 如果找到了最早出现的匹配模式,使用它来分割文本
if first_match:
blocks = re.split(first_match, value)
else:
# 如果没有匹配的模式,保留原文本
blocks = [value]
processed_blocks = []
for block in blocks:
if not block:
continue
# 计算中英文字符总数如果大于50则加入列表
if block and len(re.findall(r'[\u4e00-\u9fff\w]', block)) >= 50:
processed_blocks.append(block.strip())
else:
# 如果发现有块长度小于50返回原数据
return value
# 如果所有的块都符合条件,返回分割后的列表
return processed_blocks
def process_nested_data(data):
"""
递归处理嵌套的数据结构(字典和列表)。
对最内层的字符串值应用 post_process 函数。
post_process 函数尝试将长字符串按特定模式分割成块每块至少包含50个中英文字符。
如果字典中所有值都是 """/" 或空列表,则返回''的列表。
"""
# 先检查是否所有值都是 ""、"/" 或空列表
if isinstance(data, dict) and all(v == "" or v == "/" or (isinstance(v, list) and not v) for v in data.values()):
return list(data.keys())
# 递归遍历字典,处理最内层的字符串
if isinstance(data, dict):
# 如果当前项是字典,继续递归遍历其键值对
result = {}
for key, value in data.items():
processed_value = process_nested_data(value)
# 如果处理后的值是只有一个元素的列表,就直接使用该元素
if isinstance(processed_value, list) and len(processed_value) == 1:
result[key] = processed_value[0]
else:
result[key] = processed_value
return result
elif isinstance(data, list):
# 如果是列表,直接返回列表,保持原样
return data
else:
# 到达最内层,处理非字典和非列表的元素(字符串)
return post_process(data)