2024-09-13 15:03:55 +08:00
|
|
|
|
# -*- encoding:utf-8 -*-
|
|
|
|
|
import json
|
|
|
|
|
import os
|
|
|
|
|
|
|
|
|
|
from flask_app.main.多线程提问 import multi_threading
|
|
|
|
|
from flask_app.main.通义千问long import upload_file, qianwen_long
|
|
|
|
|
from flask_app.main.json_utils import clean_json_string, combine_json_results
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_key_paths(data, parent_key=''):
|
|
|
|
|
key_paths = []
|
2024-09-13 16:05:16 +08:00
|
|
|
|
no_keys_added = True # 默认假设没有添加任何键
|
2024-09-13 15:03:55 +08:00
|
|
|
|
|
|
|
|
|
for key, value in data.items():
|
|
|
|
|
current_key = f"{parent_key}.{key}" if parent_key else key
|
|
|
|
|
if isinstance(value, dict):
|
|
|
|
|
if value:
|
2024-09-13 16:05:16 +08:00
|
|
|
|
# 递归调用,并更新 no_keys_added 状态
|
|
|
|
|
sub_paths, sub_no_keys_added = generate_key_paths(value, current_key)
|
|
|
|
|
key_paths.extend(sub_paths)
|
|
|
|
|
no_keys_added = no_keys_added and sub_no_keys_added
|
2024-09-13 15:03:55 +08:00
|
|
|
|
else:
|
2024-09-13 16:05:16 +08:00
|
|
|
|
# 空字典也视为未添加键
|
2024-09-13 15:03:55 +08:00
|
|
|
|
key_paths.append(current_key)
|
2024-09-13 16:05:16 +08:00
|
|
|
|
no_keys_added = False
|
2024-09-13 15:03:55 +08:00
|
|
|
|
elif isinstance(value, list):
|
2024-09-13 16:05:16 +08:00
|
|
|
|
# 列表只在非空时视为添加了键
|
|
|
|
|
if value:
|
2024-09-13 15:03:55 +08:00
|
|
|
|
key_paths.append(current_key)
|
2024-09-13 16:05:16 +08:00
|
|
|
|
no_keys_added = False
|
|
|
|
|
else:
|
|
|
|
|
no_keys_added = False
|
|
|
|
|
elif value == "未知" or value == "" or value == "/":
|
|
|
|
|
# 处理为"未知"或空字符串
|
2024-09-13 15:03:55 +08:00
|
|
|
|
key_paths.append(current_key)
|
2024-09-13 16:05:16 +08:00
|
|
|
|
no_keys_added = False
|
|
|
|
|
else:
|
|
|
|
|
# 值不是字典、列表、"未知"或空字符串,也不添加到键路径
|
|
|
|
|
no_keys_added = True and no_keys_added # 只保持 True 如果之前所有键都未添加
|
2024-09-13 15:03:55 +08:00
|
|
|
|
|
2024-09-13 16:05:16 +08:00
|
|
|
|
return key_paths, no_keys_added
|
2024-09-13 15:03:55 +08:00
|
|
|
|
|
|
|
|
|
def combine_and_update_results(original_data, updates):
|
|
|
|
|
def recursive_update(data, key, value):
|
|
|
|
|
# 处理点分隔的键,递归定位并更新嵌套字典
|
|
|
|
|
keys = key.split('.')
|
|
|
|
|
for k in keys[:-1]:
|
|
|
|
|
data = data.setdefault(k, {})
|
|
|
|
|
if isinstance(value, dict) and isinstance(data.get(keys[-1], None), dict):
|
|
|
|
|
data[keys[-1]] = {**data.get(keys[-1], {}), **value}
|
|
|
|
|
else:
|
|
|
|
|
data[keys[-1]] = value
|
|
|
|
|
for key, value in updates.items():
|
|
|
|
|
recursive_update(original_data, key, value)
|
|
|
|
|
return original_data
|
|
|
|
|
def postprocess(data):
|
2024-09-13 16:05:16 +08:00
|
|
|
|
"""递归地转换字典中的值为列表,如果所有键对应的值都是'/', '{}' 或 '未知'"""
|
|
|
|
|
def convert_dict(value):
|
|
|
|
|
# 如果所有值是'/', '{}' 或 '未知'
|
2024-09-13 15:03:55 +08:00
|
|
|
|
if all(v in ['/', '未知', {}] for v in value.values()):
|
2024-09-13 16:05:16 +08:00
|
|
|
|
return list(value.keys())
|
|
|
|
|
else:
|
|
|
|
|
# 如果不满足条件,则递归处理嵌套的字典
|
|
|
|
|
return {k: convert_dict(v) if isinstance(v, dict) else v for k, v in value.items()}
|
|
|
|
|
|
|
|
|
|
# 递归处理顶层数据
|
|
|
|
|
return {key: convert_dict(val) if isinstance(val, dict) else val for key, val in data.items()}
|
|
|
|
|
def get_technical_requirements(truncate_file):
|
|
|
|
|
user_query1 = "这是一份货物标中采购要求部分的内容,请告诉我需要采购的系统(货物),如果有采购清单,请直接根据清单上的货物名称给出结果,若没有采购清单,你要从文中摘取需要采购的系统(货物),采购需求中可能包含层次关系,如大系统中包含若干子系统,你需要保留这种层次关系,给出系统(货物)名称,请以json格式返回,外层键名为\"采购需求\",嵌套键名为对应的系统名称或货物名称,需与原文保持一致,无需给出采购数量和单位,如有未知内容,在对应键值处填\"未知\"。"
|
|
|
|
|
file_id = upload_file(truncate_file)
|
|
|
|
|
res = qianwen_long(file_id, user_query1)
|
|
|
|
|
print(res)
|
|
|
|
|
cleaned_res = clean_json_string(res)
|
|
|
|
|
keys_list ,no_keys_added= generate_key_paths(cleaned_res['采购需求']) # 提取需要采购的货物清单
|
|
|
|
|
if no_keys_added:
|
|
|
|
|
final_res = postprocess(cleaned_res['采购需求'])
|
|
|
|
|
else:
|
|
|
|
|
user_query_template = "这是一份货物标中采购要求部分的内容,请你给出\"{}\"的技术参数(或采购要求)和数量,请以json格式返回结果,外层键名为\"{}\", 键值对中的键是你对该要求的总结,而值需要完全与原文保持一致,不可擅自总结删减。"
|
|
|
|
|
queries = []
|
|
|
|
|
for key in keys_list:
|
|
|
|
|
# 替换 user_query2 中的 "网络硬盘录像机" 为当前 key
|
|
|
|
|
new_query = user_query_template.format(key, key)
|
|
|
|
|
print(new_query)
|
|
|
|
|
queries.append(new_query)
|
|
|
|
|
results = multi_threading(queries, "", file_id, 2)
|
|
|
|
|
technical_requirements = []
|
|
|
|
|
if not results:
|
|
|
|
|
print("errror!未获得大模型的回答!")
|
|
|
|
|
else:
|
|
|
|
|
# 打印结果
|
|
|
|
|
for question, response in results:
|
|
|
|
|
technical_requirements.append(response)
|
|
|
|
|
technical_requirements_combined_res = combine_json_results(technical_requirements)
|
|
|
|
|
"""根据所有键是否已添加处理技术要求"""
|
|
|
|
|
# 更新原始采购需求字典
|
|
|
|
|
combine_and_update_results(cleaned_res['采购需求'], technical_requirements_combined_res)
|
|
|
|
|
final_res = postprocess(cleaned_res['采购需求'])
|
|
|
|
|
print("更新后的采购需求处理完成.")
|
|
|
|
|
# 输出最终的 JSON 字符串
|
|
|
|
|
json_string = json.dumps(final_res, ensure_ascii=False, indent=4)
|
|
|
|
|
return json_string
|
|
|
|
|
|
2024-09-13 15:03:55 +08:00
|
|
|
|
def test_all_files_in_folder(input_folder, output_folder):
|
|
|
|
|
# 确保输出文件夹存在
|
|
|
|
|
if not os.path.exists(output_folder):
|
|
|
|
|
os.makedirs(output_folder)
|
|
|
|
|
|
|
|
|
|
# 遍历指定文件夹中的所有文件
|
|
|
|
|
for filename in os.listdir(input_folder):
|
|
|
|
|
file_path = os.path.join(input_folder, filename)
|
|
|
|
|
# 检查是否是文件
|
|
|
|
|
if os.path.isfile(file_path):
|
|
|
|
|
print(f"处理文件: {file_path}")
|
|
|
|
|
# 调用函数处理文件
|
|
|
|
|
try:
|
|
|
|
|
json_result = get_technical_requirements(file_path)
|
|
|
|
|
# 定义输出文件的路径
|
|
|
|
|
output_file_path = os.path.join(output_folder, os.path.splitext(filename)[0] + '.json')
|
|
|
|
|
# 保存JSON结果到文件
|
|
|
|
|
with open(output_file_path, 'w', encoding='utf-8') as json_file:
|
|
|
|
|
json_file.write(json_result)
|
|
|
|
|
print(f"结果已保存到: {output_file_path}")
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"处理文件 {file_path} 时出错: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
2024-09-13 16:05:16 +08:00
|
|
|
|
truncate_file="C:\\Users\\Administrator\\Desktop\\货物标\\output1\\招标文件(2)_procurement.pdf"
|
2024-09-13 15:03:55 +08:00
|
|
|
|
res=get_technical_requirements(truncate_file)
|
|
|
|
|
print(res)
|
|
|
|
|
# input_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\output1"
|
|
|
|
|
# output_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\output3"
|
|
|
|
|
# test_all_files_in_folder(input_folder, output_folder)
|