zbparse/flask_app/货物标/技术服务商务要求提取.py
2024-09-13 15:03:55 +08:00

129 lines
6.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- encoding:utf-8 -*-
import json
import os
from flask_app.main.多线程提问 import multi_threading
from flask_app.main.通义千问long import upload_file, qianwen_long
from flask_app.main.json_utils import clean_json_string, combine_json_results
def generate_key_paths(data, parent_key=''):
    """Return dot-joined key paths for the entries of *data* that still need detail.

    Walks the (possibly nested) dict *data* and collects a path for:
      * every value equal to the placeholder string "未知";
      * every non-empty list value;
      * every dict value that yields no deeper path of its own (an empty dict,
        or a dict holding only plain scalar values / empty lists).

    Dict values that do yield deeper paths contribute those deeper paths
    instead of their own. Other scalar values and empty lists contribute nothing.

    Args:
        data: dict to walk.
        parent_key: path prefix of *data* within the outer structure; empty
            for the top level.

    Returns:
        List of paths in iteration order. Nested paths are joined with '.';
        top-level keys are returned as-is.
    """
    paths = []
    for name, item in data.items():
        path = f"{parent_key}.{name}" if parent_key else name
        if isinstance(item, dict):
            # Recursing into a dict of plain scalars (or an empty dict) yields
            # nothing, in which case the dict's own path is what gets reported.
            deeper = generate_key_paths(item, path)
            paths.extend(deeper or [path])
        elif isinstance(item, list):
            # Only lists that actually contain something are reported.
            if item:
                paths.append(path)
        elif item == "未知":
            paths.append(path)
    return paths
def get_technical_requirements(truncate_file):
    """Extract the procurement items and their technical requirements from a file.

    Steps, as performed below:
      1. Upload *truncate_file* with ``upload_file`` and ask ``qianwen_long`` for
         the purchased systems/goods as JSON under the outer key "采购需求".
      2. Turn that structure into key paths with ``generate_key_paths``.
      3. Build one follow-up question per path and send them all through
         ``multi_threading``.
      4. Merge the answers with ``combine_json_results``, fold them back into
         the "采购需求" structure and run ``postprocess`` on it.

    Args:
        truncate_file: path of the document to upload (presumably the
            procurement-requirements excerpt of a tender file — confirm with caller).

    Returns:
        The final structure serialised as an indented JSON string with
        non-ASCII characters preserved.
    """
    user_query1 = "这是一份货物标中采购要求部分的内容请告诉我需要采购的系统货物如果有采购清单请直接根据清单上的货物名称给出结果若没有采购清单你要从文中摘取需要采购的系统货物采购需求中可能包含层次关系如大系统中包含若干子系统你需要保留这种层次关系给出系统货物名称请以json格式返回外层键名为\"采购需求\",嵌套键名为对应的系统名称或货物名称,需与原文保持一致,无需给出采购数量和单位,如有未知内容,在对应键值处填\"未知\""
    file_id = upload_file(truncate_file)
    res = qianwen_long(file_id, user_query1)
    print(res)
    cleaned_res = clean_json_string(res)

    # Paths of the goods/systems that need a detailed follow-up question.
    keys_list = generate_key_paths(cleaned_res['采购需求'])

    user_query_template = "这是一份货物标中采购要求部分的内容,请你给出\"{}\"的技术参数或采购要求和数量请以json格式返回结果外层键名为\"{}\", 键值对中的键是你对该要求的总结,而值需要完全与原文保持一致,不可擅自总结删减。"
    # The path is substituted twice: once as the subject, once as the outer JSON key.
    queries = [user_query_template.format(path, path) for path in keys_list]
    for query in queries:
        print(query)

    # NOTE(review): the meaning of the "" and 2 arguments is defined by
    # multi_threading, which is not visible here.
    results = multi_threading(queries, "", file_id, 2)
    if results:
        # Each result is unpacked as a (question, response) pair; only the response is kept.
        technical_requirements = [answer for _question, answer in results]
    else:
        print("errror!未获得大模型的回答!")
        technical_requirements = []

    technical_requirements_combined_res = combine_json_results(technical_requirements)

    # Write the detailed answers back into the original procurement structure.
    combine_and_update_results(cleaned_res['采购需求'], technical_requirements_combined_res)
    final_res = postprocess(cleaned_res['采购需求'])
    print("更新后的采购需求处理完成.")

    # Emit the final JSON string.
    return json.dumps(final_res, ensure_ascii=False, indent=4)
def combine_and_update_results(original_data, updates):
    """Apply dot-path keyed *updates* to the nested dict *original_data* in place.

    Each key of *updates* is a path whose levels are joined with '.'. Because
    a key inside *original_data* may itself contain a '.', the path is first
    resolved against the keys that already exist; only when no existing entry
    matches is the path split naively, one level per '.', creating missing
    levels on the way.

    At the target, a dict update is shallow-merged into an existing dict
    (update values win); anything else simply replaces the old value.

    Args:
        original_data: nested dict to update (mutated).
        updates: mapping of dot-separated path -> new value.

    Returns:
        *original_data*, for convenience.
    """

    def find_existing(node, parts):
        """Return (container, key) for an existing entry matching *parts*, else None.

        Tries the shortest leading segment first and backtracks, so a path
        that already resolves with one level per '.' is always preferred.
        """
        for end in range(1, len(parts) + 1):
            segment = '.'.join(parts[:end])
            if segment not in node:
                continue
            if end == len(parts):
                return node, segment
            child = node[segment]
            if isinstance(child, dict):
                found = find_existing(child, parts[end:])
                if found is not None:
                    return found
        return None

    def recursive_update(data, key, value):
        parts = key.split('.')
        located = find_existing(data, parts)
        if located is None:
            # Nothing matches: create the path level by level.
            container = data
            for part in parts[:-1]:
                child = container.get(part)
                if not isinstance(child, dict):
                    # A missing level, or a scalar placeholder in the way,
                    # becomes a dict so the update can be stored beneath it.
                    child = {}
                    container[part] = child
                container = child
            leaf = parts[-1]
        else:
            container, leaf = located

        current = container.get(leaf)
        if isinstance(value, dict) and isinstance(current, dict):
            container[leaf] = {**current, **value}
        else:
            container[leaf] = value

    for key, value in updates.items():
        recursive_update(original_data, key, value)
    return original_data
def postprocess(data):
    """Collapse placeholder-only sub-dicts of *data* into lists of their keys.

    For every top-level entry whose value is a dict in which each value is
    '/', '未知' or an empty dict, the entry is replaced by the list of that
    dict's keys (so an empty dict becomes an empty list). Entries whose value
    is not a dict — e.g. a bare "未知" string or a list — are left untouched
    rather than raising AttributeError.

    Only the top level of *data* is examined.

    Args:
        data: dict to normalise (mutated in place).

    Returns:
        *data*, for convenience.
    """
    placeholders = ('/', '未知', {})
    # Values of existing keys are reassigned during iteration; the set of keys
    # never changes, so iterating over items() stays valid.
    for key, value in data.items():
        if not isinstance(value, dict):
            continue
        if all(v in placeholders for v in value.values()):
            data[key] = list(value)
    return data
def test_all_files_in_folder(input_folder, output_folder):
    """Run get_technical_requirements on every file in a folder and save the results.

    For each regular file directly inside *input_folder*, the JSON string
    returned by ``get_technical_requirements`` is written to *output_folder*
    under the same base name with a ``.json`` extension. A failure on one file
    is printed and does not stop the remaining files.

    Args:
        input_folder: directory whose files are processed (not recursive).
        output_folder: directory for the ``.json`` results; created if missing.
    """
    # Make sure the output folder exists.
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    for entry in os.listdir(input_folder):
        source_path = os.path.join(input_folder, entry)
        # Sub-directories and other non-file entries are skipped.
        if not os.path.isfile(source_path):
            continue
        print(f"处理文件: {source_path}")
        try:
            payload = get_technical_requirements(source_path)
            stem, _ext = os.path.splitext(entry)
            target_path = os.path.join(output_folder, stem + '.json')
            with open(target_path, 'w', encoding='utf-8') as sink:
                sink.write(payload)
            print(f"结果已保存到: {target_path}")
        except Exception as e:
            # Batch boundary: report the failure and move on to the next file.
            print(f"处理文件 {source_path} 时出错: {e}")
if __name__ == "__main__":
    # Manual run against a single local sample document; prints the resulting JSON.
    truncate_file = "C:\\Users\\Administrator\\Desktop\\货物标\\output1\\招标文件(107国道)_procurement.pdf"
    print(get_technical_requirements(truncate_file))
    # Batch alternative: process every file of a folder instead.
    # input_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\output1"
    # output_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\output3"
    # test_all_files_in_folder(input_folder, output_folder)