2024-09-30 16:23:39 +08:00
|
|
|
|
import json
|
|
|
|
|
import re
|
|
|
|
|
|
|
|
|
|
#提取两个大标题之间的内容
|
|
|
|
|
def extract_between_sections(data, target_values):
|
|
|
|
|
target_found = False
|
|
|
|
|
extracted_data = {}
|
|
|
|
|
current_section_title = ""
|
|
|
|
|
section_pattern = re.compile(r'^[一二三四五六七八九十]+$') # 匹配 "一", "二", "三" 等大标题
|
|
|
|
|
current_block = {}
|
|
|
|
|
|
|
|
|
|
# 遍历所有键值对
|
|
|
|
|
for key, value in data.items():
|
|
|
|
|
# 只匹配形如 "一": "竞争性磋商响应文件" 的章节标题
|
|
|
|
|
if section_pattern.match(key):
|
|
|
|
|
if target_found:
|
|
|
|
|
# 如果已经找到了符合的章节,并且遇到了另一个章节
|
|
|
|
|
# 保存当前块并重置
|
|
|
|
|
if current_block:
|
|
|
|
|
extracted_data[current_section_title] = current_block
|
|
|
|
|
current_block = {}
|
|
|
|
|
target_found = False
|
|
|
|
|
|
|
|
|
|
# 检查当前标题是否包含 target_values 中的任意关键词
|
|
|
|
|
if any(tv in value for tv in target_values):
|
|
|
|
|
target_found = True # 找到了目标章节,开始捕获后续内容
|
|
|
|
|
current_section_title = value # 保存章节标题内容
|
|
|
|
|
|
|
|
|
|
elif target_found: # 只捕获目标值之后的内容
|
|
|
|
|
current_block[key] = value
|
|
|
|
|
|
|
|
|
|
# 保存最后一个块(如果有的话)
|
|
|
|
|
if current_block:
|
|
|
|
|
extracted_data[current_section_title] = current_block
|
|
|
|
|
|
|
|
|
|
return extracted_data
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_with_outer_key(data):
|
|
|
|
|
processed_data = {}
|
|
|
|
|
|
|
|
|
|
# 遍历外层的键值对
|
|
|
|
|
for outer_key, inner_data in data.items():
|
|
|
|
|
# 调用 transform_json 函数对内层数据进行处理
|
|
|
|
|
processed_inner_data = transform_json(inner_data)
|
|
|
|
|
|
|
|
|
|
# 将处理后的数据保留在外层键下
|
|
|
|
|
processed_data[outer_key] = processed_inner_data
|
|
|
|
|
|
|
|
|
|
return processed_data
|
|
|
|
|
|
|
|
|
|
# 转换结构化的JSON数据
|
|
|
|
|
def transform_json(data):
|
|
|
|
|
result = {}
|
|
|
|
|
temp = {0: result} # 初始化根字典
|
|
|
|
|
|
|
|
|
|
# 首先,创建一个临时字典用于检查是否存在三级标题
|
|
|
|
|
has_subkey = {}
|
|
|
|
|
for key in data.keys():
|
|
|
|
|
parts = key.split('.')
|
|
|
|
|
if len(parts) > 2 and parts[1]:
|
|
|
|
|
parent_key = parts[0] + '.' + parts[1]
|
|
|
|
|
has_subkey[parent_key] = True
|
|
|
|
|
|
|
|
|
|
for key, value in data.items():
|
|
|
|
|
match = re.match(r'(\d+)(?:\.(\d+))?(?:\.(\d+))?', key)
|
|
|
|
|
if match:
|
|
|
|
|
levels = [int(l) for l in match.groups() if l is not None]
|
|
|
|
|
if (len(levels) - 1) in temp:
|
|
|
|
|
parent = temp[len(levels) - 1]
|
|
|
|
|
else:
|
|
|
|
|
print(f"No parent found at level {len(levels) - 1} for key '{key}'. Check the data structure.")
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
if len(levels) == 1: # 一级标题
|
|
|
|
|
# 新增逻辑:判断值中是否有 ':' 或 ':',并进行拆分
|
|
|
|
|
# 优先按 '\n' 拆分
|
|
|
|
|
if '\n' in value:
|
|
|
|
|
new_key, *new_value = value.split('\n', 1)
|
|
|
|
|
new_key = new_key.strip()
|
|
|
|
|
new_value = new_value[0].strip() if new_value else ""
|
|
|
|
|
# 如果没有 '\n',再检查 ':' 或 ':',并进行拆分
|
|
|
|
|
elif ':' in value or ':' in value:
|
|
|
|
|
delimiter = ':' if ':' in value else ':'
|
|
|
|
|
new_key, new_value = value.split(delimiter, 1)
|
|
|
|
|
new_key = new_key.strip()
|
|
|
|
|
new_value = new_value.strip()
|
|
|
|
|
else:
|
|
|
|
|
new_key = value.strip()
|
|
|
|
|
new_value = ""
|
|
|
|
|
|
|
|
|
|
parent[new_key] = {}
|
|
|
|
|
if new_value:
|
|
|
|
|
parent[new_key][new_key] = new_value # 使用 new_key 作为键名,而不是固定的 "content"
|
|
|
|
|
temp[len(levels)] = parent[new_key]
|
|
|
|
|
elif len(levels) == 2: # 二级标题
|
|
|
|
|
new_key, *new_value = value.split('\n', 1)
|
|
|
|
|
new_key = new_key.strip()
|
|
|
|
|
new_value = new_value[0].strip() if new_value else ""
|
|
|
|
|
|
|
|
|
|
if f"{levels[0]}.{levels[1]}" in has_subkey:
|
|
|
|
|
parent[new_key] = [new_value] if new_value else []
|
|
|
|
|
else:
|
|
|
|
|
parent[new_key] = new_value
|
|
|
|
|
|
|
|
|
|
temp[len(levels)] = parent[new_key]
|
|
|
|
|
else: # 三级标题
|
|
|
|
|
if isinstance(parent, dict):
|
|
|
|
|
parent_key = list(parent.keys())[-1]
|
|
|
|
|
if isinstance(parent[parent_key], list):
|
|
|
|
|
parent[parent_key].append(value)
|
|
|
|
|
elif parent[parent_key]:
|
|
|
|
|
parent[parent_key] = [parent[parent_key], value]
|
|
|
|
|
else:
|
|
|
|
|
parent[parent_key] = [value]
|
|
|
|
|
elif isinstance(parent, list):
|
|
|
|
|
parent.append(value)
|
|
|
|
|
|
|
|
|
|
def remove_single_item_lists(node):
|
|
|
|
|
if isinstance(node, dict):
|
|
|
|
|
for key in list(node.keys()):
|
|
|
|
|
node[key] = remove_single_item_lists(node[key])
|
|
|
|
|
if isinstance(node[key], list) and len(node[key]) == 1:
|
|
|
|
|
node[key] = node[key][0]
|
|
|
|
|
return node
|
|
|
|
|
|
|
|
|
|
return remove_single_item_lists(result)
|
|
|
|
|
|
|
|
|
|
#主要是处理键值中若存在若干序号且每个序号块的内容>=50字符的时候,用列表表示。
|
|
|
|
|
def post_process(value):
|
|
|
|
|
# 如果传入的是非字符串值,直接返回原值
|
|
|
|
|
if not isinstance(value, str):
|
|
|
|
|
return value
|
|
|
|
|
|
|
|
|
|
# 定义可能的分割模式及其正则表达式
|
|
|
|
|
patterns = [
|
|
|
|
|
(r'\d+、', r'(?=\d+、)'), # 匹配 '1、'
|
|
|
|
|
(r'[((]\d+[))]', r'(?=[((]\d+[))])'), # 匹配 '(1)' 或 '(1)'
|
|
|
|
|
(r'\d+\.', r'(?=\d+\.)'), # 匹配 '1.'
|
|
|
|
|
(r'[一二三四五六七八九十]、', r'(?=[一二三四五六七八九十]、)'), # 匹配 '一、'、'二、' 等
|
|
|
|
|
(r'[一二三四五六七八九十]\.', r'(?=[一二三四五六七八九十]\.)') # 匹配 '一.'、'二.' 等
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
# 初始化用于保存最早匹配到的模式及其位置
|
|
|
|
|
first_match = None
|
|
|
|
|
first_match_position = len(value) # 初始值设为文本长度,确保任何匹配都会更新它
|
|
|
|
|
|
|
|
|
|
# 遍历所有模式,找到第一个出现的位置
|
|
|
|
|
for search_pattern, split_pattern_candidate in patterns:
|
|
|
|
|
match = re.search(search_pattern, value)
|
|
|
|
|
if match:
|
|
|
|
|
# 如果这个匹配的位置比当前记录的更靠前,更新匹配信息
|
|
|
|
|
if match.start() < first_match_position:
|
|
|
|
|
first_match = split_pattern_candidate
|
|
|
|
|
first_match_position = match.start()
|
|
|
|
|
|
|
|
|
|
# 如果找到了最早出现的匹配模式,使用它来分割文本
|
|
|
|
|
if first_match:
|
|
|
|
|
blocks = re.split(first_match, value)
|
|
|
|
|
else:
|
|
|
|
|
# 如果没有匹配的模式,保留原文本
|
|
|
|
|
blocks = [value]
|
|
|
|
|
|
|
|
|
|
processed_blocks = []
|
|
|
|
|
for block in blocks:
|
|
|
|
|
if not block:
|
|
|
|
|
continue
|
|
|
|
|
# 计算中英文字符总数,如果大于50,则加入列表
|
|
|
|
|
if block and len(re.findall(r'[\u4e00-\u9fff\w]', block)) >= 50:
|
|
|
|
|
processed_blocks.append(block.strip())
|
|
|
|
|
else:
|
|
|
|
|
# 如果发现有块长度小于50,返回原数据
|
|
|
|
|
return value
|
|
|
|
|
|
|
|
|
|
# 如果所有的块都符合条件,返回分割后的列表
|
|
|
|
|
return processed_blocks
|
|
|
|
|
|
|
|
|
|
# 递归地处理嵌套结构
|
|
|
|
|
def process_nested_data(data):
|
|
|
|
|
# 先检查是否所有值都是 ""、"/" 或空列表
|
|
|
|
|
if isinstance(data, dict) and all(v == "" or v == "/" or (isinstance(v, list) and not v) for v in data.values()):
|
|
|
|
|
return list(data.keys())
|
|
|
|
|
# 递归遍历字典,处理最内层的字符串
|
|
|
|
|
if isinstance(data, dict):
|
|
|
|
|
# 如果当前项是字典,继续递归遍历其键值对
|
|
|
|
|
result = {}
|
|
|
|
|
for key, value in data.items():
|
|
|
|
|
result[key] = process_nested_data(value) # 递归处理子项
|
|
|
|
|
return result
|
|
|
|
|
elif isinstance(data, list):
|
|
|
|
|
# 如果是列表,直接返回列表,保持原样
|
|
|
|
|
return data
|
|
|
|
|
else:
|
|
|
|
|
# 到达最内层,处理非字典和非列表的元素(字符串)
|
|
|
|
|
return post_process(data)
|
|
|
|
|
|
|
|
|
|
# 读取JSON数据,提取内容,转换结构,并打印结果
|
|
|
|
|
def extract_from_notice(clause_path, type):
|
2024-10-17 15:33:58 +08:00
|
|
|
|
"""
|
|
|
|
|
从公告中提取特定类型的内容。
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
clause_path (str): 包含条款的JSON文件路径。
|
|
|
|
|
type (int): 提取的类型。
|
|
|
|
|
1 - ["投标文件","响应文件","响应性文件"]
|
|
|
|
|
2 - ["开标", "评标", "定标","磋商程序","中标","程序","步骤"]
|
|
|
|
|
3 - ["重新招标、不再招标和终止招标", "重新招标", "不再招标", "终止招标"]
|
|
|
|
|
4 - ["评标"] # 测试
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
dict 或 str: 提取并处理后的数据,或在 `clause_path` 为空或发生错误时返回空字符串 `""`。
|
|
|
|
|
"""
|
|
|
|
|
# 定义默认的返回结果
|
|
|
|
|
DEFAULT_RESULT = ""
|
|
|
|
|
|
|
|
|
|
# 判断 clause_path 是否为空
|
|
|
|
|
if not clause_path:
|
|
|
|
|
print("Warning: clause_path is empty.")
|
|
|
|
|
return DEFAULT_RESULT
|
|
|
|
|
|
|
|
|
|
# 根据 type 设置 target_values
|
2024-09-30 16:23:39 +08:00
|
|
|
|
if type == 1:
|
2024-10-17 15:33:58 +08:00
|
|
|
|
target_values = ["投标文件", "响应文件", "响应性文件"]
|
2024-09-30 16:23:39 +08:00
|
|
|
|
elif type == 2:
|
2024-10-17 15:33:58 +08:00
|
|
|
|
target_values = ["开标", "评标", "定标", "磋商程序", "中标", "程序", "步骤"]
|
2024-09-30 16:23:39 +08:00
|
|
|
|
elif type == 3:
|
|
|
|
|
target_values = ["重新招标、不再招标和终止招标", "重新招标", "不再招标", "终止招标"]
|
|
|
|
|
elif type == 4:
|
2024-10-17 15:33:58 +08:00
|
|
|
|
target_values = ["评标"] # 测试
|
2024-09-30 16:23:39 +08:00
|
|
|
|
else:
|
2024-10-17 15:33:58 +08:00
|
|
|
|
print(f"Error: Invalid type specified: {type}. Use 1, 2, 3, or 4.")
|
|
|
|
|
return DEFAULT_RESULT
|
|
|
|
|
try:
|
|
|
|
|
# 打开并读取JSON文件
|
|
|
|
|
with open(clause_path, 'r', encoding='utf-8') as file:
|
|
|
|
|
data = json.load(file)
|
|
|
|
|
|
|
|
|
|
# 提取目标部分
|
2024-09-30 16:23:39 +08:00
|
|
|
|
extracted_data = extract_between_sections(data, target_values) # 读取json
|
2024-10-17 15:33:58 +08:00
|
|
|
|
transformed_data = process_with_outer_key(extracted_data)
|
|
|
|
|
final_result = process_nested_data(transformed_data)
|
2024-09-30 16:23:39 +08:00
|
|
|
|
return final_result
|
|
|
|
|
|
2024-10-17 15:33:58 +08:00
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"Error in extract_from_notice: {e}")
|
|
|
|
|
return DEFAULT_RESULT
|
|
|
|
|
|
2024-09-30 16:23:39 +08:00
|
|
|
|
if __name__ == "__main__":
|
2024-10-16 20:18:55 +08:00
|
|
|
|
file_path = 'D:\\flask_project\\flask_app\\static\\output\\87f48f9c-e6ee-4dc1-a981-5a10085c4635\\tmp\\clause1.json'
|
|
|
|
|
# file_path = 'D:\\flask_project\\flask_app\\static\\output\\87f48f9c-e6ee-4dc1-a981-5a10085c4635\\clause1.json'
|
2024-09-30 16:23:39 +08:00
|
|
|
|
try:
|
2024-10-16 20:18:55 +08:00
|
|
|
|
res = extract_from_notice(file_path, 2) # 可以改变此处的 type 参数测试不同的场景
|
2024-09-30 16:23:39 +08:00
|
|
|
|
res2=json.dumps(res,ensure_ascii=False,indent=4)
|
|
|
|
|
print(res2)
|
|
|
|
|
except ValueError as e:
|
|
|
|
|
print(e)
|