# -*- encoding:utf-8 -*- import json import os import re import time from collections import defaultdict from copy import deepcopy from flask_app.general.file2markdown import convert_pdf_to_markdown from flask_app.general.format_change import pdf2docx from flask_app.general.多线程提问 import multi_threading from flask_app.general.通义千问long import qianwen_long, upload_file from flask_app.general.json_utils import clean_json_string, combine_json_results from flask_app.货物标.截取pdf货物标版 import truncate_pdf_main from flask_app.general.doubao import doubao_model, generate_full_user_query, pdf2txt, read_txt_to_string from flask_app.货物标.技术参数要求提取后处理函数 import postprocess, all_postprocess def truncate_system_keys(data): """ 遍历输入的字典,若键名中包含'系统'或'软件',且其子键数量 >= 3,则将其子键的值设置为[],限制深度为一层。 Args: data (dict or list): 输入的数据,可以是嵌套的字典或列表。 Returns: dict or list: 处理后的数据。 """ if isinstance(data, dict): new_data = {} for key, value in data.items(): # 检查键名是否包含'系统'或'软件' if '系统' in key or '软件' in key: if isinstance(value, dict): child_count = len(value) # 判断子键数量是否 >= 3 if child_count >= 3: # 截断子键,将其值设置为 [] new_data[key] = {subkey: [] for subkey in value.keys()} else: # 子键数量少于3,递归处理子键 new_data[key] = truncate_system_keys(value) else: # 如果值不是字典(例如列表),直接设置为 [] new_data[key] = [] else: # 不包含'系统'或'软件',递归处理子字典或列表 new_data[key] = truncate_system_keys(value) return new_data elif isinstance(data, list): # 如果是列表,递归处理列表中的每个元素 return [truncate_system_keys(item) for item in data] else: # 对于其他类型的数据,保持不变 return data def generate_key_paths(data): """ 处理输入的字典,生成 key_paths, grouped_paths 和 good_list,并根据条件修改原始字典。 参数: data (dict): 输入的嵌套字典。 返回: tuple: 包含 key_paths, grouped_paths 和 good_list 的元组。 """ # 编译用于匹配后缀的正则表达式模式 pattern = re.compile(r'(.+)-\d+$') # 初始化结果列表和字典 key_paths = [] grouped_counts = defaultdict(int) # 用于记录每个 grouped_path 的数量 good_list = [] def recurse(current_dict, path): """ 递归遍历字典,处理 key_paths 和 grouped_paths,并收集 good_list 和 grouped_counts。 参数: current_dict (dict): 当前遍历的字典。 path (list): 当前路径的键列表。 """ # 第一遍遍历,统计每个基名的出现次数 base_name_count = {} base_names = {} for key in current_dict.keys(): match = pattern.match(key) if match: base = match.group(1) else: base = key base_names[key] = base base_name_count[base] = base_name_count.get(base, 0) + 1 # 第二遍遍历,根据基名的出现次数分类 keys_to_rename = {} for key, base in base_names.items(): if base_name_count[base] == 1: # 检查是否是最内层(值为列表) value = current_dict[key] if isinstance(value, list): current_path = '.'.join(path + [base]) key_paths.append(current_path) # 收集 good_list,保持顺序且不重复 if base not in good_list: good_list.append(base) # 如果原键名有后缀,需要记录以便后续重命名 if key != base: keys_to_rename[key] = base elif isinstance(value, dict): # 继续递归处理 recurse(value, path + [base]) else: # 记录分组路径,并统计数量 grouped_path = '.'.join(path + [base]) grouped_counts[grouped_path] += 1 # 执行键名的重命名,同时保持原有顺序 if keys_to_rename: new_ordered_dict = {} for key in current_dict.keys(): if key in keys_to_rename: new_key = keys_to_rename[key] new_ordered_dict[new_key] = current_dict[key] else: new_ordered_dict[key] = current_dict[key] current_dict.clear() current_dict.update(new_ordered_dict) # 对于基名重复的键,继续递归(如果值是字典) for key, base in base_names.items(): if base_name_count[base] > 1: value = current_dict[key] if isinstance(value, dict): recurse(value, path + [base]) elif isinstance(value, list): # 如果值是列表,仍需收集基名到 good_list if base not in good_list: good_list.append(base) # 深拷贝数据以避免修改原始输入 data_copy = deepcopy(data) # 开始递归遍历 recurse(data_copy, []) def collect_grouped_paths(current_dict, path, collected): for key in current_dict.keys(): match = pattern.match(key) if match: base = match.group(1) else: base = key current_path = '.'.join(path + [base]) if current_path in grouped_counts and current_path not in collected: collected.append(current_path) value = current_dict[key] if isinstance(value, dict): collect_grouped_paths(value, path + [base], collected) collected_grouped_paths = [] collect_grouped_paths(data_copy, [], collected_grouped_paths) # 将 grouped_paths 转换为包含数量的字典列表 grouped_paths = [{path: grouped_counts[path]} for path in collected_grouped_paths] return key_paths, grouped_paths, good_list, data_copy def rename_keys(data): """ 对整个数据结构进行重命名处理。 """ def rename_keys_recursive(current_dict): """ 递归地重名字典中的键,确保同一层级下具有相同基名的键被正确编号。 """ if not isinstance(current_dict, dict): return current_dict key_order = list(current_dict.keys()) base_name_dict = defaultdict(list) # 辅助函数:提取基名(去除可能的 -数字 后缀) def get_base_name(key): if '-' in key: parts = key.rsplit('-', 1) if parts[1].isdigit(): return parts[0] return key # 将键按基名分组 for key in key_order: base = get_base_name(key) base_name_dict[base].append(key) new_dict = {} for key in key_order: base = get_base_name(key) keys = base_name_dict[base] if len(keys) > 1: # 如果存在同基名的多个键,则进行重命名 if base not in new_dict: # 按原始顺序对需要重命名的键进行排序 sorted_keys = sorted(keys, key=lambda x: key_order.index(x)) for idx, original_key in enumerate(sorted_keys, start=1): new_key = f"{base}-{idx}" # 如果值是字典,递归处理 if isinstance(current_dict[original_key], dict): new_dict[new_key] = rename_keys_recursive(current_dict[original_key]) else: new_dict[new_key] = current_dict[original_key] else: # 如果没有重复的基名,保持原名 if isinstance(current_dict[key], dict): new_dict[key] = rename_keys_recursive(current_dict[key]) else: new_dict[key] = current_dict[key] return new_dict # 对整个数据结构进行递归重命名 return rename_keys_recursive(data) def combine_and_update_results(original_data, updates): """ 先规范化original和updates中的字典,防止空格的情况导致匹配不上无法更新 """ def normalize_key(key): """ 规范化键名: - 替换全角点号为半角点号。 - 删除所有空格(包括半角和全角)。 """ # 替换全角点号(.、。)为半角点号(.) key = key.replace('.', '.').replace('。', '.') # 删除所有空格(半角空格和全角空格) key = key.replace(' ', '').replace('\u3000', '') return key def normalize_original_data(d): """ 递归规范化原始数据字典的键。 """ if not isinstance(d, dict): return d normalized = {} for k, v in d.items(): nk = normalize_key(k) normalized[nk] = normalize_original_data(v) return normalized def normalize_update_value(value): """ 递归规范化更新字典中嵌套的字典的键。 """ if isinstance(value, dict): return {normalize_key(k): normalize_update_value(v) for k, v in value.items()} else: return value def recursive_update(data, key, value): """ 递归更新嵌套字典。 """ keys = key.split('.') for k in keys[:-1]: data = data.setdefault(k, {}) if isinstance(value, dict) and isinstance(data.get(keys[-1], None), dict): data[keys[-1]] = {**data.get(keys[-1], {}), **value} else: data[keys[-1]] = value # 1. 规范化原始数据字典的键 original_data = normalize_original_data(original_data) # 2. 规范化更新字典的键 normalized_updates = {} for key, value in updates.items(): nk = normalize_key(key) nv = normalize_update_value(value) normalized_updates[nk] = nv # 3. 执行递归更新 for key, value in normalized_updates.items(): recursive_update(original_data, key, value) return original_data def generate_prompt(judge_res, full_text=None): """ 获取需要采购的货物名称 根据 `judge_res` 和 `full_text` 动态生成 prompt。 如果 `judge_res` 包含 '否',则不添加文件内容部分。 如果 `judge_res` 不包含 '否' 且有 `full_text`,则添加文件内容部分。 """ base_prompt = ''' 任务:你负责解析采购文件,提取采购需求,并以JSON格式返回,不要遗漏该项目需要采购的货物(或系统)。 要求与指南: 1. 精准定位:请运用文档理解能力,定位文件中的采购需求部分。若有采购清单,请直接根据采购清单上的货物(或系统)名称给出结果,注意你无需提取诸如'说明'、'规格'、'技术参数'、'描述'等列的内容,即你不需要给出详细的采购要求,仅返回采购的货物或系统或模块名称;若没有采购清单,则从表格或文本中摘取采购信息。 2. 采购目标:采购种类通常有硬件(如设备、货物)和软件(如系统软件、应用APP),一次采购活动可以同时包含这两种类型。 3. 系统归属:一些采购活动可能将采购目标划分为若干系统和货物,每个系统可能包含若干货物,则将这些货物名称作为该系统的二级键,注意这种包含关系是通过表格结构或文档标题层次来得出的;系统可以只包含总体'系统功能'而无货物。 5. 软件需求:对于软件应用或系统软件采购,若有多个系统且序号分明,请不要遗漏,最多仅需列出系统模块构成(若有),并作为该系统键值的一部分,无需在模块下再细分功能。 5. 系统功能:若采购的某系统提及总体系统功能,则在系统值中添加'系统功能'二级键,不展开具体内容。 6. 完整性:确保不遗漏系统内的货物,也不添加未提及的内容。若'采购清单'中未提取的货物(或系统)名称在形如'主要设备功能指标'的标题下有详细参数指标要求,请将该货物名也添加至返回中。 特殊情况: 1.若采购的货物或系统或模块名称前存在三角▲,△、五角★,☆,注意是名称前而非具体的技术参数或采购要求前,在返回名称时请保留前面的▲,△或★,☆符号,如'★高清摄像机'。 2.若同一层级(如同一系统中)下存在同名但采购要求不同的货物,请以'货物名-编号'区分,编号从1递增,例如若同层级下存在两种型号的交换机,那么命名分别是'交换机-1'和'交换机-2',以规避重复键名;否则无需在名称后添加编号。 输出格式: 1.JSON格式,最外层键名为'采购需求'。 2.层次关系用嵌套键值对表示。 3.嵌套键名为系统或货物或模块名称,与原文保持一致。 4.最内层键值应为空列表[]。 示例输出1,普通系统、货物类采购: {{ "采购需求": {{ "交换机-1": [], "交换机-2": [], "门禁管理系统": {{ "系统功能":[] }}, "交通监控视频子系统": {{ "系统功能": [], "交换机":[], "高清视频抓拍像机": [], "补光灯": [] }}, "LED全彩显示屏": [] // 其他系统和货物 }} }} 示例输出2,软件系统类采购: {{ "采购需求": {{ "信息管理系统": {{ "通用模块":[], "用户管理":[] }}, "信息检索系统": {{ "系统功能":[], "权限管理模块":[] }}, "XX管理系统":[], //其他系统 }} }} ''' if '否' not in judge_res and full_text: # 添加文件内容部分 base_prompt += f"\n文件内容:\n{full_text}\n" base_prompt += "\n注意事项:\n1.严格按照上述要求执行,确保输出准确性和规范性。\n" return base_prompt #文件内容以markdown格式组织,其中表格部分(若有)以html语法组织, def get_technical_requirements(file_path,invalid_path,processed_filepath): # docx_file_path=pdf2docx(file_path) file_id=upload_file(file_path) #目前传入的为docx文档 first_query_template="该文件是否说明了采购需求,即需要采购哪些货物?如果有,请回答'是',否则,回答'否'" #防止截取失败 judge_res=qianwen_long(file_id,first_query_template) if '否' in judge_res: print("no!调用invalid_path") file_id=upload_file(invalid_path) user_query = generate_prompt(judge_res) model_res=qianwen_long(file_id,user_query) print(model_res) else: # processed_filepath = convert_pdf_to_markdown(file_path) # 转markdown格式 # processed_filepath=r"C:\Users\Administrator\Desktop\货物标\extract_files\107国道.txt" full_text = read_txt_to_string(processed_filepath) user_query=generate_prompt(judge_res,full_text) model_res=doubao_model(user_query) print(model_res) cleaned_res = clean_json_string(model_res) #转字典 processed_data=truncate_system_keys(cleaned_res['采购需求']) key_paths, grouped_paths, good_list, data_copy= generate_key_paths(processed_data) # 提取需要采购的货物清单 key_list:交通监控视频子系统.高清视频抓拍像机 ... grouped_paths是同一系统下同时有'交换机-1'和'交换机-2',提取'交换机' ,输出eg:{'交通标志.标志牌铝板', '交通信号灯.交换机'} modified_data=rename_keys(data_copy) user_query_template = """请根据货物标中采购要求部分的内容,告诉我\"{}\"的技术参数或采购要求是什么。请以 JSON 格式返回结果,键名为\"{}\",键值为一个列表,列表中包含若干描述\"{}\"的技术参数或采购要求的字符串,请按原文内容回答,保留三角▲、五角★和序号,不可擅自增删内容,尤其是不可擅自添加序号。 要求与指南: 1. 如果该货物没有相关采购要求或技术参数要求,键值应为空列表[]。 2. 如果存在嵌套结构,且原文为Markdown 的表格语法,如'摄像机|有效像素|≥900W像素', 请不要返回该Markdown语法,而是使用冒号':'将相关信息拼接在一起,生成一条完整且清晰的技术参数(或采购要求)描述,作为列表中的一个字符串。如"摄像机:有效像素:≥900W像素"。 3. 字符串中的内容为具体的技术参数要求或采购要求,请不要返回诸如'(1)高清录像功能'这种标题性质且不能体现要求的内容。 4. 你的键值应该全面,对于同一个单元格内的数据,尽量全面,不要遗漏,对于单元格内以序号分隔的各条参数要求,请分别添加进键值(即字符串列表)中。 ### 示例输出1如下: {{ "摄像机控制键盘": [ "1、▲支持串行 RS232/RS422 和 IP 混合控制,允许在一个控制器上使用 RS232/RS422/IP 控制单个系统中的摄像机;", "2、支持 2 组 RS422 串口 VISCA 协议菊花链控制 2x7 台摄像机。", "★能够自动对焦,提供检测报告" ] }} ### 示例输出2如下(包含嵌套结构): {{ "摄像机": [ "摄像机:有效像素:≥900W像素", "摄像机:最低照度:彩色≤0.001lx", "协议:routes 接口开放:具备;▲支持标准 ONVIF 协议与第三方厂家设备进行互联;支持 GB/T28181;应提供 SDK" ] }} """ user_query_template_two="""请根据货物标中采购要求部分的内容,告诉我\"{}\"的技术参数或采购要求是什么。由于该货物存在 {} 种不同的采购要求或技术参数,请逐一列出,并以 JSON 格式返回结果。请以'货物名-编号'区分多种型号,编号为从 1 开始的自然数,依次递增,即第一个键名为\"{}-1\";键值为一个列表,列表中包含若干描述\"{}\"的技术参数(或采购要求)的字符串,请按原文内容回答,保留三角▲、五角★和序号(若有),不可擅自增删内容,尤其是不可擅自添加序号。 要求与指南: 1. 如果该货物没有相关采购要求或技术参数要求,键值应为空列表。 2. 如果存在嵌套结构,且原文为Markdown 的表格语法,如'摄像机|有效像素|≥900W像素', 请不要返回该Markdown语法,而是使用冒号':'将相关信息拼接在一起,生成一条完整且清晰的技术参数(或采购要求)描述,作为列表中的一个字符串。如"摄像机:有效像素:≥900W像素"。 3. 字符串中的内容为具体的技术参数要求或采购要求,请不要返回诸如'(1)高清录像功能'这种标题性质且不能体现要求的内容。 4. 你的键值应该全面,对于同一个单元格内的数据,尽量全面,不要遗漏,对于单元格内以序号分隔的各条参数要求,请分别添加进键值(即字符串列表)中。 ### 示例输出1如下: {{ "交换机-1": [ "★1、支持固化千兆电口≥8 个,固化千兆光口≥2 个,桌面型设备;", "2、支持静态链路聚合" ], "交换机-2": [ "1、交换容量≥52Gbps,包转发率≥38.69Mpps,", "2、提供国家强制性产品认证证书及测试报告(3C)", "★能实现信号控制独立传输" ] }} ### 示例输出2如下(包含嵌套结构): {{ "摄像机-1": [ "摄像机:有效像素:≥900W像素", "摄像机:最低照度:彩色≤0.001lx", "协议:routes 接口开放:具备;▲支持标准 ONVIF 协议与第三方厂家设备进行互联;支持 GB/T28181;应提供 SDK" ], "摄像机-2": [ "支持夜视", "支持云存储" ] }} """ queries = [] for key in key_paths: # 将键中的 '.' 替换为 '下的' modified_key = key.replace('.', '下的') # 使用修改后的键填充第一个占位符,原始键填充第二个占位符 # full_text = read_txt_to_string(processed_filepath) # new_query = user_query_template.format(modified_key, key, modified_key,full_text) #转豆包后取消注释 new_query = user_query_template.format(modified_key, key, modified_key) queries.append(new_query) # 处理 grouped_paths 中的项,应用 user_query_template_two for grouped_dict in grouped_paths: for grouped_key, grouped_key_cnt in grouped_dict.items(): # 将键中的 '.' 替换为 '下的' modified_grouped_key = grouped_key.replace('.', '下的') # 使用修改后的键填充第一个占位符,原始键填充第二个占位符 # 如果需要使用 full_text,可以取消注释并提供相应的实现 # full_text = read_txt_to_string(processed_filepath) # new_query = user_query_template_two.format(modified_grouped_key, grouped_key, modified_grouped_key, full_text) # 根据您的需求,生成新的查询字符串 new_query = user_query_template_two.format(modified_grouped_key, grouped_key_cnt,grouped_key, modified_grouped_key) queries.append(new_query) results = multi_threading(queries, "", file_id, 2) #通义 # results = multi_threading(queries, "", "", 3) #豆包 technical_requirements = [] if not results: print("errror!未获得大模型的回答!") else: # 打印结果 for question, response in results: technical_requirements.append(response) # print(response) technical_requirements_combined_res = combine_json_results(technical_requirements) """根据所有键是否已添加处理技术要求""" # 更新原始采购需求字典 final_res=combine_and_update_results(modified_data, technical_requirements_combined_res) ffinal_res=all_postprocess(final_res) ffinal_res["货物列表"] = good_list # 输出最终的 JSON 字符串 return {"采购需求":ffinal_res} def test_all_files_in_folder(input_folder, output_folder): # 确保输出文件夹存在 if not os.path.exists(output_folder): os.makedirs(output_folder) # 遍历指定文件夹中的所有文件 for filename in os.listdir(input_folder): file_path = os.path.join(input_folder, filename) # 检查是否是文件 if os.path.isfile(file_path): print(f"处理文件: {file_path}") # 调用函数处理文件 try: json_result = get_technical_requirements(file_path) # 定义输出文件的路径 output_file_path = os.path.join(output_folder, os.path.splitext(filename)[0] + '.json') # 保存JSON结果到文件 with open(output_file_path, 'w', encoding='utf-8') as json_file: json.dump(json_result, json_file, ensure_ascii=False, indent=4) print(f"结果已保存到: {output_file_path}") except Exception as e: print(f"处理文件 {file_path} 时出错: {e}") if __name__ == "__main__": start_time=time.time() # truncate_file="C:\\Users\\Administrator\\Desktop\\fsdownload\\469d2aee-9024-4993-896e-2ac7322d41b7\\ztbfile_procurement.docx" truncate_docfile=r"C:\Users\Administrator\Desktop\货物标\output1\6_2定版视频会议磋商文件_procurement.docx" truncate_file=r'C:\Users\Administrator\Desktop\货物标\output1\6.2定版视频会议磋商文件_procurement.pdf' # invalid_path="D:\\flask_project\\flask_app\\static\\output\\output1\\e7dda5cb-10ba-47a8-b989-d2993d34bb89\\ztbfile.pdf" # truncate_file="D:\\flask_project\\flask_app\\static\\output\\output1\\e7dda5cb-10ba-47a8-b989-d2993d34bb89\\ztbfile_procurement.docx" # output_folder="C:\\Users\\Administrator\\Desktop\\货物标\\output1\\tmp" # file_id = upload_file(truncate_file) invalid_path=r"C:\Users\Administrator\Desktop\fsdownload\a110ed59-00e8-47ec-873a-bd4579a6e628\ztbfile.pdf" # file_id=upload_file(truncate_file) processed_filepath = pdf2txt(truncate_file) # processed_filepath=r"C:\Users\Administrator\Desktop\fsdownload\e702f1e6-095d-443d-bb7d-ef2e42037cb1\金水河沿线排涝泵站提档升级项目.txt" res=get_technical_requirements(truncate_docfile,invalid_path,processed_filepath) json_string = json.dumps(res, ensure_ascii=False, indent=4) print(json_string) # # input_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\output1" # # output_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\output3" # # test_all_files_in_folder(input_folder, output_folder) end_time=time.time() print("耗时:"+str(end_time-start_time))