zbparse/flask_app/货物标/技术参数要求提取.py
2024-10-25 17:50:20 +08:00

187 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- encoding:utf-8 -*-
import json
import os
from flask_app.general.多线程提问 import multi_threading
from flask_app.general.通义千问long import qianwen_long, upload_file
from flask_app.general.json_utils import clean_json_string, combine_json_results
from flask_app.货物标.截取pdf货物标版 import truncate_pdf_main
def generate_key_paths(data, parent_key=''):
    """
    Collect the dotted path and the bare key name of every leaf in a nested dict.

    A non-empty dict is descended into. Every other value is recorded as a
    leaf: an empty dict, any list (empty or not), a placeholder string such
    as "未知", "" or "/", and any other value.

    Args:
        data (dict): Nested dictionary to walk.
        parent_key (str): Dot-separated path of the enclosing keys. Empty at
            the top level; filled in by the recursive calls.

    Returns:
        tuple: ``(key_paths, good_list, no_keys_added)`` where
            ``key_paths`` is the list of dot-separated leaf paths,
            ``good_list`` is the list of leaf key names in the same order,
            ``no_keys_added`` is True only when no leaf was recorded at all.
    """
    key_paths = []
    good_list = []
    for key, value in data.items():
        # Build the path of the current key relative to the root.
        current_key = f"{parent_key}.{key}" if parent_key else key
        if isinstance(value, dict) and value:
            # Non-empty dict: only its descendants can be leaves.
            sub_key_paths, sub_good_list, _ = generate_key_paths(value, current_key)
            key_paths.extend(sub_key_paths)
            good_list.extend(sub_good_list)
        else:
            # Everything else is a leaf. No membership test against a set is
            # needed, so unhashable leaf values no longer raise TypeError.
            key_paths.append(current_key)
            good_list.append(key)
    # A leaf found at any depth ends up in key_paths, so the flag is simply
    # "nothing was collected".
    return key_paths, good_list, not key_paths
def combine_and_update_results(original_data, updates):
    """
    Write each entry of ``updates`` into ``original_data`` in place.

    Keys of ``updates`` are dot-separated paths. Every segment except the
    last selects (or creates, as an empty dict) a nested dict; the last
    segment is the key that receives the value. When both the incoming value
    and the value already stored there are dicts they are shallow-merged,
    with the incoming entries winning; otherwise the stored value is replaced.

    Args:
        original_data (dict): Nested dictionary to update; it is mutated.
        updates (dict): Mapping of dot-separated key path to new value.

    Returns:
        dict: The same ``original_data`` object, after the updates.
    """
    for dotted_key, new_value in updates.items():
        *parents, leaf = dotted_key.split('.')

        # Walk down to the dict that owns the final key segment.
        node = original_data
        for segment in parents:
            node = node.setdefault(segment, {})

        if isinstance(new_value, dict) and isinstance(node.get(leaf), dict):
            # Shallow merge into a fresh dict; incoming keys take precedence.
            node[leaf] = {**node[leaf], **new_value}
        else:
            node[leaf] = new_value
    return original_data
def postprocess(data):
    """
    Collapse placeholder-only dicts below the top level into lists of their keys.

    For every dict value found under ``data`` (at any depth): if all of its
    values are '/', '未知' or an empty dict, it is replaced by the list of its
    keys (an empty dict therefore becomes an empty list); otherwise its own
    dict values are processed the same way. Non-dict values are kept as they
    are. The top-level ``data`` dict itself is never collapsed.

    Args:
        data (dict): Dictionary to process; it is not modified.

    Returns:
        dict: A new dictionary with the collapsed structure.
    """
    placeholders = ('/', '未知', {})

    def collapse(node):
        if not isinstance(node, dict):
            return node
        if all(child in placeholders for child in node.values()):
            return list(node)
        return {name: collapse(child) for name, child in node.items()}

    return {name: collapse(child) for name, child in data.items()}
def get_technical_requirements(file_id):
    """
    Build the technical-requirements structure for an uploaded procurement file.

    Steps:
      1. Ask the model (via ``qianwen_long``) for the tree of systems/goods to
         purchase, returned under the key '采购需求'.
      2. Move that tree to the key '技术要求' and collect its leaf paths with
         ``generate_key_paths``.
      3. If there are leaves, ask one question per leaf (via
         ``multi_threading``) and merge the combined answers into the tree.
      4. Run ``postprocess`` and, when '技术要求' is still a dict, store the
         list of leaf names under '货物列表'.

    Args:
        file_id: Identifier passed to ``qianwen_long`` / ``multi_threading``;
            the callers in this module obtain it from ``upload_file``.

    Returns:
        dict: The post-processed result. '技术要求' is either a dict (then it
        also carries '货物列表') or a list of names when ``postprocess``
        collapsed it because every entry was a placeholder.
    """
    user_query1 = """
这是一份货物标中采购要求部分的内容请告诉我需要采购的系统或货物如果有采购清单请直接根据清单上的货物名称给出结果若没有采购清单你要从文中摘取需要采购的系统或货物采购需求中可能包含层次关系如某大系统中包含若干货物那么需要用嵌套键值对表示这种关系请以json格式返回最外层键名为'采购需求',嵌套键名为对应的系统名称或货物名称,需与原文保持一致,无需给出采购数量和单位,如有未知内容,在对应键值处填'未知'。以下为示例输出:
{
"采购需求": {
"交通信号灯": {},
"交通监控视频子系统": {
"高清视频抓拍像机":{},
"补光灯":{}
},
"交通诱导子系统": {},
"电子警察子系统": {},
}
}
"""
    res = qianwen_long(file_id, user_query1)
    cleaned_res = clean_json_string(res)  # parsed model answer (a dict)

    # Rename '采购需求' to '技术要求'. The key is popped with a default so a
    # reply without it (or with a non-dict value) yields an empty tree instead
    # of a KeyError / AttributeError.
    procurement = cleaned_res.pop('采购需求', None)
    if not isinstance(procurement, dict):
        procurement = {}
    cleaned_res['技术要求'] = procurement

    # Leaf paths (for the follow-up questions) and leaf names (the goods list).
    keys_list, good_list, no_keys_added = generate_key_paths(procurement)

    if not no_keys_added:
        user_query_template = "这是一份货物标中采购要求部分的内容,请你给出\"{}\"的技术参数或采购要求请以json格式返回结果外层键名为\"{}\", 键值对中的键是你对该要求的总结,而值需要完全与原文保持一致,不可擅自总结删减。"
        # First placeholder: the path with '.' rendered as '下的' for the
        # prompt text; second placeholder: the raw dotted path, requested as
        # the outer key of the answer so it can be merged back by path.
        queries = [
            user_query_template.format(key.replace('.', '下的'), key)
            for key in keys_list
        ]
        results = multi_threading(queries, "", file_id, 2)
        technical_requirements = []
        if not results:
            print("errror!未获得大模型的回答!")
        else:
            for _question, response in results:
                technical_requirements.append(response)
        technical_requirements_combined_res = combine_json_results(technical_requirements)
        # Merge the answers into the procurement tree in place.
        combine_and_update_results(procurement, technical_requirements_combined_res)

    final_res = postprocess(cleaned_res)

    # postprocess replaces a dict whose values are all placeholders (or that
    # is empty) with a list of its keys. A list cannot take a '货物列表'
    # entry, so the goods list is attached only when the value is still a
    # dict; in the collapsed case the list is returned as it is.
    technical_section = final_res.get('技术要求')
    if isinstance(technical_section, dict):
        technical_section["货物列表"] = good_list
    return final_res
def test_all_files_in_folder(input_folder, output_folder):
    """
    Run the technical-requirements extraction on every file in a folder.

    Each regular file directly inside ``input_folder`` is uploaded, processed
    with ``get_technical_requirements`` and the result is written to
    ``output_folder`` as ``<original stem>.json``. A failure on one file is
    printed and does not stop the remaining files.

    Args:
        input_folder (str): Folder whose files are processed (not recursive).
        output_folder (str): Folder receiving the JSON results; created if
            it does not exist.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(output_folder, exist_ok=True)

    for filename in os.listdir(input_folder):
        file_path = os.path.join(input_folder, filename)
        if not os.path.isfile(file_path):
            continue
        print(f"处理文件: {file_path}")
        try:
            # get_technical_requirements takes a file id, not a path; the
            # other callers in this module obtain it from upload_file first.
            file_id = upload_file(file_path)
            json_result = get_technical_requirements(file_id)
            output_file_path = os.path.join(
                output_folder, os.path.splitext(filename)[0] + '.json'
            )
            with open(output_file_path, 'w', encoding='utf-8') as json_file:
                json.dump(json_result, json_file, ensure_ascii=False, indent=4)
            print(f"结果已保存到: {output_file_path}")
        except Exception as e:
            # Batch helper: report the failure and continue with the next file.
            print(f"处理文件 {file_path} 时出错: {e}")
def get_technical_requirements_main(file_path,output_folder):
    """
    Extract the technical requirements from a tender file on disk.

    Takes the first element returned by ``truncate_pdf_main(file_path,
    output_folder, 1)``, prints it, uploads it and runs
    ``get_technical_requirements`` on the resulting file id.

    Args:
        file_path: Source file handed to ``truncate_pdf_main``.
        output_folder: Output folder handed to ``truncate_pdf_main``.

    Returns:
        The value stored under '技术要求' when the result is a dict and that
        value is itself a dict; otherwise the whole result unchanged.
    """
    truncated_files = truncate_pdf_main(file_path, output_folder, 1)
    truncate_file = truncated_files[0]
    print(truncate_file)

    final_res = get_technical_requirements(upload_file(truncate_file))

    # Unwrap only when the inner section is a dict; anything else is
    # returned to the caller as it is.
    if not isinstance(final_res, dict):
        return final_res
    technical_section = final_res.get('技术要求')
    if isinstance(technical_section, dict):
        return technical_section
    return final_res
if __name__ == "__main__":
    # Manual run against one local file: upload it, extract the technical
    # requirements and print the result as indented JSON.
    truncate_file = "C:\\Users\\Administrator\\Desktop\\货物标\\output1\\广水农商行门禁控制主机及基础验证设备采购项目——磋商文件定稿三次_procurement.pdf"
    # truncate_file = "C:\\Users\\Administrator\\Desktop\\fsdownload\\217754b7-3efd-41b2-806b-0b5b1bc98904\\ztbfile.pdf"
    output_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\output1\\tmp"

    res = get_technical_requirements(upload_file(truncate_file))
    # Alternative entry point that truncates the PDF first:
    # res = get_technical_requirements_main(truncate_file, output_folder)
    print(json.dumps(res, ensure_ascii=False, indent=4))

    # Batch run over a whole folder:
    # input_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\output1"
    # output_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\output3"
    # test_all_files_in_folder(input_folder, output_folder)