# -*- encoding:utf-8 -*- import json import os from flask_app.general.多线程提问 import multi_threading from flask_app.general.通义千问long import qianwen_long, upload_file from flask_app.general.json_utils import clean_json_string, combine_json_results from flask_app.货物标.截取pdf货物标版 import truncate_pdf_main def generate_key_paths(data, parent_key=''): """ 生成嵌套字典中的键路径,并提取最内层的键名。 参数: data (dict): 输入的字典数据 parent_key (str): 上级键路径,用于递归调用 返回: tuple: 包含键路径列表和最内层键名列表的元组 (key_paths, good_list) """ key_paths = [] good_list = [] no_keys_added = True # 默认假设没有添加任何键 for key, value in data.items(): # 构建当前的键路径 current_key = f"{parent_key}.{key}" if parent_key else key if isinstance(value, dict): if value: # 递归调用,并获取子路径和子 good_list sub_key_paths, sub_good_list, sub_no_keys_added = generate_key_paths(value, current_key) key_paths.extend(sub_key_paths) good_list.extend(sub_good_list) no_keys_added = no_keys_added and sub_no_keys_added else: # 空字典视为叶子节点 key_paths.append(current_key.replace(" ","")) good_list.append(key.replace(" ", "")) # 去掉空格后添加 no_keys_added = False elif isinstance(value, list): if value: # 非空列表视为叶子节点 key_paths.append(current_key.replace(" ","")) good_list.append(key.replace(" ", "")) # 去掉空格后添加 no_keys_added = False else: # 空列表也视为叶子节点(根据需求可以调整) key_paths.append(current_key.replace(" ","")) good_list.append(key.replace(" ", "")) # 去掉空格后添加 no_keys_added = False elif value in {"未知", "", "/"}: # 特定值视为叶子节点 key_paths.append(current_key.replace(" ","")) good_list.append(key.replace(" ", "")) # 去掉空格后添加 no_keys_added = False else: # 其他情况不视为叶子节点 key_paths.append(current_key.replace(" ","")) good_list.append(key.replace(" ", "")) # 去掉空格后添加 no_keys_added = False return key_paths, good_list, no_keys_added def combine_and_update_results(original_data, updates): def recursive_update(data, key, value): # 处理点分隔的键,递归定位并更新嵌套字典 keys = key.split('.') for k in keys[:-1]: data = data.setdefault(k, {}) if isinstance(value, dict) and isinstance(data.get(keys[-1], None), dict): data[keys[-1]] = {**data.get(keys[-1], {}), **value} else: data[keys[-1]] = value for key, value in updates.items(): recursive_update(original_data, key, value) return original_data def postprocess(data): """递归地转换字典中的值为列表,如果所有键对应的值都是'/', '{}' 或 '未知'""" def convert_dict(value): # 如果所有值是'/', '{}' 或 '未知' if all(v in ['/', '未知', {}] for v in value.values()): return list(value.keys()) else: # 如果不满足条件,则递归处理嵌套的字典 return {k: convert_dict(v) if isinstance(v, dict) else v for k, v in value.items()} # 递归处理顶层数据 return {key: convert_dict(val) if isinstance(val, dict) else val for key, val in data.items()} def get_technical_requirements(file_id,invalid_path): first_query="该文档中是否说明了采购需求,即需要采购哪些货物?如果有,请回答'是',否则,回答'否'" judge_res=qianwen_long(file_id,first_query) # print(judge_res) if '否' in judge_res: file_id=upload_file(invalid_path) user_query1 = """ 请你首先定位该采购文件中的采购清单或采购需求部分,请告诉我需要采购的货物,如果有采购清单,请直接根据清单上的货物(或系统)名称给出结果,若没有采购清单,你要从表格中或文中摘取需要采购的系统(或货物),采购需求中可能包含层次关系,例如采购的某大系统中可能包含几种货物,那么你需要用嵌套键值对表示这种关系,且不要遗漏该系统中包含的货物,你的输出请以json格式返回,最外层键名为'采购需求',嵌套键名为对应的系统名称或货物名称,需与原文保持一致,无需给出采购数量和单位,如有未知内容,在对应键值处填'未知'。以下为示例输出: { "采购需求": { "门禁管理系统": {}, "交通监控视频子系统": { "高清视频抓拍像机":{}, "补光灯":{} }, "LED全彩显示屏": {}, "数字会议发言主机": {}, } } """ res = qianwen_long(file_id, user_query1) # print(res) cleaned_res = clean_json_string(res) #转字典 keys_list,good_list,no_keys_added= generate_key_paths(cleaned_res['采购需求']) # 提取需要采购的货物清单 if '采购需求' in cleaned_res: cleaned_res['技术要求'] = cleaned_res.pop('采购需求') if no_keys_added: final_res = postprocess(cleaned_res) else: # user_query_template = "请你根据该货物标中采购要求部分的内容,请你给出\"{}\"的技术参数(或采购要求),请以json格式返回结果,外层键名为\"{}\", 键值对中的键是你对该要求的总结,而值需要完全与原文保持一致,不可擅自总结删减。" user_query_template = """ 请你根据该货物标中采购要求部分的内容,请你给出\"{}\"的技术参数(或采购要求),请以json格式返回结果,键名为\"{}\", 键值为一个列表,列表中包含若干描述\"{}\"的技术参数(或采购要求)的字符串。示例输出格式如下: {{ "摄像机控制键盘": [ "支持串行 RS232/RS422 和 IP 混合控制,允许在一个控制器上使用 RS232/RS422/IP 控制单个系统中的摄像机;", "支持 2 组 RS422 串口 VISCA 协议菊花链控制 2x7 台摄像机。" ] }} """ queries = [] for key in keys_list: # 将键中的 '.' 替换为 '下的' modified_key = key.replace('.', '下的') # 使用修改后的键填充第一个占位符,原始键填充第二个占位符 new_query = user_query_template.format(modified_key, key,modified_key) queries.append(new_query) results = multi_threading(queries, "", file_id, 2) technical_requirements = [] if not results: print("errror!未获得大模型的回答!") else: # 打印结果 for question, response in results: technical_requirements.append(response) technical_requirements_combined_res = combine_json_results(technical_requirements) """根据所有键是否已添加处理技术要求""" # 更新原始采购需求字典 combine_and_update_results(cleaned_res['技术要求'], technical_requirements_combined_res) final_res = postprocess(cleaned_res) final_res['技术要求']["货物列表"] = good_list #添加需要采购的货物 # 输出最终的 JSON 字符串 return final_res def test_all_files_in_folder(input_folder, output_folder): # 确保输出文件夹存在 if not os.path.exists(output_folder): os.makedirs(output_folder) # 遍历指定文件夹中的所有文件 for filename in os.listdir(input_folder): file_path = os.path.join(input_folder, filename) # 检查是否是文件 if os.path.isfile(file_path): print(f"处理文件: {file_path}") # 调用函数处理文件 try: json_result = get_technical_requirements(file_path) # 定义输出文件的路径 output_file_path = os.path.join(output_folder, os.path.splitext(filename)[0] + '.json') # 保存JSON结果到文件 with open(output_file_path, 'w', encoding='utf-8') as json_file: json.dump(json_result, json_file, ensure_ascii=False, indent=4) print(f"结果已保存到: {output_file_path}") except Exception as e: print(f"处理文件 {file_path} 时出错: {e}") if __name__ == "__main__": # # truncate_file="D:\\flask_project\\flask_app\\static\\output\\output1\\bf225a5e-16d0-45c8-8c19-54a1a94cf3e2\\ztbfile_procurement.docx" # # truncate_file="C:\\Users\\Administrator\\Desktop\\货物标\\zbfilesdocx\\招标文件(107国道).docx" # invalid_path="D:\\flask_project\\flask_app\\static\\output\\output1\\e7dda5cb-10ba-47a8-b989-d2993d34bb89\\ztbfile.pdf" # truncate_file="D:\\flask_project\\flask_app\\static\\output\\output1\\e7dda5cb-10ba-47a8-b989-d2993d34bb89\\ztbfile_procurement.docx" # output_folder="C:\\Users\\Administrator\\Desktop\\货物标\\output1\\tmp" # file_id = upload_file(truncate_file) invalid_path="C:\\Users\\Administrator\\Desktop\\货物标\\zbfiles\\包头市公安支队机动车查验监管系统招标文201907.pdf" file_id="file-fe-FcOjv4FiOGjHRG1pKaFrIBeG" res=get_technical_requirements(file_id,invalid_path) json_string = json.dumps(res, ensure_ascii=False, indent=4) print(json_string) # # input_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\output1" # # output_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\output3" # # test_all_files_in_folder(input_folder, output_folder)