zbparse/flask_app/货物标/技术参数要求提取.py
2024-12-06 17:34:31 +08:00

530 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- encoding:utf-8 -*-
import json
import os
import re
import time
from collections import defaultdict
from copy import deepcopy
from flask_app.general.file2markdown import convert_pdf_to_markdown
from flask_app.general.format_change import pdf2docx
from flask_app.general.多线程提问 import multi_threading
from flask_app.general.通义千问long import qianwen_long, upload_file
from flask_app.general.json_utils import clean_json_string, combine_json_results
from flask_app.货物标.截取pdf货物标版 import truncate_pdf_main
from flask_app.general.doubao import doubao_model, generate_full_user_query, pdf2txt, read_txt_to_string
from flask_app.货物标.技术参数要求提取后处理函数 import postprocess, all_postprocess
def truncate_system_keys(data):
"""
遍历输入的字典,若键名中包含'系统''软件',且其子键数量 >= 3则将其子键的值设置为[],限制深度为一层。
Args:
data (dict or list): 输入的数据,可以是嵌套的字典或列表。
Returns:
dict or list: 处理后的数据。
"""
if isinstance(data, dict):
new_data = {}
for key, value in data.items():
# 检查键名是否包含'系统'或'软件'
if '系统' in key or '软件' in key:
if isinstance(value, dict):
child_count = len(value)
# 判断子键数量是否 >= 3
if child_count >= 3:
# 截断子键,将其值设置为 []
new_data[key] = {subkey: [] for subkey in value.keys()}
else:
# 子键数量少于3递归处理子键
new_data[key] = truncate_system_keys(value)
else:
# 如果值不是字典(例如列表),直接设置为 []
new_data[key] = []
else:
# 不包含'系统'或'软件',递归处理子字典或列表
new_data[key] = truncate_system_keys(value)
return new_data
elif isinstance(data, list):
# 如果是列表,递归处理列表中的每个元素
return [truncate_system_keys(item) for item in data]
else:
# 对于其他类型的数据,保持不变
return data
def generate_key_paths(data):
"""
处理输入的字典,生成 key_paths, grouped_paths 和 good_list并根据条件修改原始字典。
参数:
data (dict): 输入的嵌套字典。
返回:
tuple: 包含 key_paths, grouped_paths 和 good_list 的元组。
"""
# 编译用于匹配后缀的正则表达式模式
pattern = re.compile(r'(.+)-\d+$')
# 初始化结果列表和字典
key_paths = []
grouped_counts = defaultdict(int) # 用于记录每个 grouped_path 的数量
good_list = []
def recurse(current_dict, path):
"""
递归遍历字典,处理 key_paths 和 grouped_paths并收集 good_list 和 grouped_counts。
参数:
current_dict (dict): 当前遍历的字典。
path (list): 当前路径的键列表。
"""
# 第一遍遍历,统计每个基名的出现次数
base_name_count = {}
base_names = {}
for key in current_dict.keys():
match = pattern.match(key)
if match:
base = match.group(1)
else:
base = key
base_names[key] = base
base_name_count[base] = base_name_count.get(base, 0) + 1
# 第二遍遍历,根据基名的出现次数分类
keys_to_rename = {}
for key, base in base_names.items():
if base_name_count[base] == 1:
# 检查是否是最内层(值为列表)
value = current_dict[key]
if isinstance(value, list):
current_path = '.'.join(path + [base])
key_paths.append(current_path)
# 收集 good_list保持顺序且不重复
if base not in good_list:
good_list.append(base)
# 如果原键名有后缀,需要记录以便后续重命名
if key != base:
keys_to_rename[key] = base
elif isinstance(value, dict):
# 继续递归处理
recurse(value, path + [base])
else:
# 记录分组路径,并统计数量
grouped_path = '.'.join(path + [base])
grouped_counts[grouped_path] += 1
# 执行键名的重命名,同时保持原有顺序
if keys_to_rename:
new_ordered_dict = {}
for key in current_dict.keys():
if key in keys_to_rename:
new_key = keys_to_rename[key]
new_ordered_dict[new_key] = current_dict[key]
else:
new_ordered_dict[key] = current_dict[key]
current_dict.clear()
current_dict.update(new_ordered_dict)
# 对于基名重复的键,继续递归(如果值是字典)
for key, base in base_names.items():
if base_name_count[base] > 1:
value = current_dict[key]
if isinstance(value, dict):
recurse(value, path + [base])
elif isinstance(value, list):
# 如果值是列表,仍需收集基名到 good_list
if base not in good_list:
good_list.append(base)
# 深拷贝数据以避免修改原始输入
data_copy = deepcopy(data)
# 开始递归遍历
recurse(data_copy, [])
def collect_grouped_paths(current_dict, path, collected):
for key in current_dict.keys():
match = pattern.match(key)
if match:
base = match.group(1)
else:
base = key
current_path = '.'.join(path + [base])
if current_path in grouped_counts and current_path not in collected:
collected.append(current_path)
value = current_dict[key]
if isinstance(value, dict):
collect_grouped_paths(value, path + [base], collected)
collected_grouped_paths = []
collect_grouped_paths(data_copy, [], collected_grouped_paths)
# 将 grouped_paths 转换为包含数量的字典列表
grouped_paths = [{path: grouped_counts[path]} for path in collected_grouped_paths]
return key_paths, grouped_paths, good_list, data_copy
def rename_keys(data):
"""
对整个数据结构进行重命名处理。
"""
def rename_keys_recursive(current_dict):
"""
递归地重名字典中的键,确保同一层级下具有相同基名的键被正确编号。
"""
if not isinstance(current_dict, dict):
return current_dict
key_order = list(current_dict.keys())
base_name_dict = defaultdict(list)
# 辅助函数:提取基名(去除可能的 -数字 后缀)
def get_base_name(key):
if '-' in key:
parts = key.rsplit('-', 1)
if parts[1].isdigit():
return parts[0]
return key
# 将键按基名分组
for key in key_order:
base = get_base_name(key)
base_name_dict[base].append(key)
new_dict = {}
for key in key_order:
base = get_base_name(key)
keys = base_name_dict[base]
if len(keys) > 1:
# 如果存在同基名的多个键,则进行重命名
if base not in new_dict:
# 按原始顺序对需要重命名的键进行排序
sorted_keys = sorted(keys, key=lambda x: key_order.index(x))
for idx, original_key in enumerate(sorted_keys, start=1):
new_key = f"{base}-{idx}"
# 如果值是字典,递归处理
if isinstance(current_dict[original_key], dict):
new_dict[new_key] = rename_keys_recursive(current_dict[original_key])
else:
new_dict[new_key] = current_dict[original_key]
else:
# 如果没有重复的基名,保持原名
if isinstance(current_dict[key], dict):
new_dict[key] = rename_keys_recursive(current_dict[key])
else:
new_dict[key] = current_dict[key]
return new_dict
# 对整个数据结构进行递归重命名
return rename_keys_recursive(data)
def combine_and_update_results(original_data, updates):
"""
先规范化original和updates中的字典防止空格的情况导致匹配不上无法更新
"""
def normalize_key(key):
"""
规范化键名:
- 替换全角点号为半角点号。
- 删除所有空格(包括半角和全角)。
"""
# 替换全角点号(.、。)为半角点号(.
key = key.replace('', '.').replace('', '.')
# 删除所有空格(半角空格和全角空格)
key = key.replace(' ', '').replace('\u3000', '')
return key
def normalize_original_data(d):
"""
递归规范化原始数据字典的键。
"""
if not isinstance(d, dict):
return d
normalized = {}
for k, v in d.items():
nk = normalize_key(k)
normalized[nk] = normalize_original_data(v)
return normalized
def normalize_update_value(value):
"""
递归规范化更新字典中嵌套的字典的键。
"""
if isinstance(value, dict):
return {normalize_key(k): normalize_update_value(v) for k, v in value.items()}
else:
return value
def recursive_update(data, key, value):
"""
递归更新嵌套字典。
"""
keys = key.split('.')
for k in keys[:-1]:
data = data.setdefault(k, {})
if isinstance(value, dict) and isinstance(data.get(keys[-1], None), dict):
data[keys[-1]] = {**data.get(keys[-1], {}), **value}
else:
data[keys[-1]] = value
# 1. 规范化原始数据字典的键
original_data = normalize_original_data(original_data)
# 2. 规范化更新字典的键
normalized_updates = {}
for key, value in updates.items():
nk = normalize_key(key)
nv = normalize_update_value(value)
normalized_updates[nk] = nv
# 3. 执行递归更新
for key, value in normalized_updates.items():
recursive_update(original_data, key, value)
return original_data
def generate_prompt(judge_res, full_text=None):
"""
获取需要采购的货物名称
根据 `judge_res` 和 `full_text` 动态生成 prompt。
如果 `judge_res` 包含 '',则不添加文件内容部分。
如果 `judge_res` 不包含 '' 且有 `full_text`,则添加文件内容部分。
"""
base_prompt = '''
任务你负责解析采购文件提取采购需求并以JSON格式返回不要遗漏该项目需要采购的货物或系统
要求与指南:
1. 精准定位:请运用文档理解能力,定位文件中的采购需求部分。若有采购清单,请直接根据采购清单上的货物(或系统)名称给出结果,注意你无需提取诸如'说明''规格''技术参数''描述'等列的内容,即你不需要给出详细的采购要求,仅返回采购的货物或系统或模块名称;若没有采购清单,则从表格或文本中摘取采购信息。
2. 采购目标采购种类通常有硬件如设备、货物和软件如系统软件、应用APP一次采购活动可以同时包含这两种类型。
3. 系统归属:一些采购活动可能将采购目标划分为若干系统和货物,每个系统可能包含若干货物,则将这些货物名称作为该系统的二级键,注意这种包含关系是通过表格结构或文档标题层次来得出的;系统可以只包含总体'系统功能'而无货物。
5. 软件需求:对于软件应用或系统软件采购,若有多个系统且序号分明,请不要遗漏,最多仅需列出系统模块构成(若有),并作为该系统键值的一部分,无需在模块下再细分功能。
5. 系统功能:若采购的某系统提及总体系统功能,则在系统值中添加'系统功能'二级键,不展开具体内容。
6. 完整性:确保不遗漏系统内的货物,也不添加未提及的内容。若'采购清单'中未提取的货物(或系统)名称在形如'主要设备功能指标'的标题下有详细参数指标要求,请将该货物名也添加至返回中。
特殊情况:
1.若采购的货物或系统或模块名称前存在三角▲,△、五角★,☆,注意是名称前而非具体的技术参数或采购要求前,在返回名称时请保留前面的▲,△或★,☆符号,如'★高清摄像机'
2.若同一层级(如同一系统中)下存在同名但采购要求不同的货物,请以'货物名-编号'区分编号从1递增例如若同层级下存在两种型号的交换机那么命名分别是'交换机-1''交换机-2',以规避重复键名;否则无需在名称后添加编号。
输出格式:
1.JSON格式最外层键名为'采购需求'
2.层次关系用嵌套键值对表示。
3.嵌套键名为系统或货物或模块名称,与原文保持一致。
4.最内层键值应为空列表[]。
示例输出1普通系统、货物类采购
{{
"采购需求": {{
"交换机-1": [],
"交换机-2": [],
"门禁管理系统": {{
"系统功能":[]
}},
"交通监控视频子系统": {{
"系统功能": [],
"交换机":[],
"高清视频抓拍像机": [],
"补光灯": []
}},
"LED全彩显示屏": []
// 其他系统和货物
}}
}}
示例输出2软件系统类采购
{{
"采购需求": {{
"信息管理系统": {{
"通用模块":[],
"用户管理":[]
}},
"信息检索系统": {{
"系统功能":[],
"权限管理模块":[]
}},
"XX管理系统":[],
//其他系统
}}
}}
'''
if '' not in judge_res and full_text:
# 添加文件内容部分
base_prompt += f"\n文件内容:\n{full_text}\n"
base_prompt += "\n注意事项:\n1.严格按照上述要求执行,确保输出准确性和规范性。\n"
return base_prompt
#文件内容以markdown格式组织其中表格部分若有以html语法组织
def get_technical_requirements(file_path,invalid_path,processed_filepath):
# docx_file_path=pdf2docx(file_path)
file_id=upload_file(file_path) #目前传入的为docx文档
first_query_template="该文件是否说明了采购需求,即需要采购哪些货物?如果有,请回答'',否则,回答''" #防止截取失败
judge_res=qianwen_long(file_id,first_query_template)
if '' in judge_res:
print("no!调用invalid_path")
file_id=upload_file(invalid_path)
user_query = generate_prompt(judge_res)
model_res=qianwen_long(file_id,user_query)
print(model_res)
else:
# processed_filepath = convert_pdf_to_markdown(file_path) # 转markdown格式
# processed_filepath=r"C:\Users\Administrator\Desktop\货物标\extract_files\107国道.txt"
full_text = read_txt_to_string(processed_filepath)
user_query=generate_prompt(judge_res,full_text)
model_res=doubao_model(user_query)
print(model_res)
cleaned_res = clean_json_string(model_res) #转字典
processed_data=truncate_system_keys(cleaned_res['采购需求'])
key_paths, grouped_paths, good_list, data_copy= generate_key_paths(processed_data) # 提取需要采购的货物清单 key_list交通监控视频子系统.高清视频抓拍像机 ... grouped_paths是同一系统下同时有'交换机-1'和'交换机-2',提取'交换机' 输出eg:{'交通标志.标志牌铝板', '交通信号灯.交换机'}
modified_data=rename_keys(data_copy)
user_query_template = """请根据货物标中采购要求部分的内容,告诉我\"{}\"的技术参数或采购要求是什么。请以 JSON 格式返回结果,键名为\"{}\",键值为一个列表,列表中包含若干描述\"{}\"的技术参数或采购要求或功能说明的字符串,请按原文内容回答,保留三角▲、五角★和序号,不可擅自增删内容,尤其是不可擅自添加序号。
要求与指南:
1. 你的键值应该全面,不要遗漏。
-a.若技术参数或采购要求在表格中,那么单元格内的内容基本都要涵盖
-对于单元格内以序号分隔的各条参数要求,应逐条提取,并分别作为键值中的字符串列表项。
-对于无序号标明且在同一单元格内的参数要求或功能说明,也要根据语义分别添加进键值中。
-b.若技术参数或采购要求在正文部分,应准确定位到与目标货物(设备、系统、功能模块)相关的内容,将其后的技术参数或采购要求或功能说明完整提取,逐一添加到键值的字符串列表中,不得擅自添加或修改序号。
2. 如果存在嵌套结构且原文为Markdown 的表格语法,如'摄像机|有效像素|≥900W像素' 请不要返回该Markdown语法而是使用冒号':'将相关信息拼接在一起,生成一条完整且清晰的技术参数(或采购要求)描述,作为列表中的一个字符串。如"摄像机有效像素≥900W像素"
3. 字符串中的内容为具体的技术参数要求或采购要求,请不要返回诸如'1高清录像功能'这种标题性质且不能体现要求的内容。
4. 如果该货物没有相关采购要求或技术参数要求,键值应为空列表[]。
### 示例输出1如下
{{
"摄像机控制键盘": [
"1、▲支持串行 RS232/RS422 和 IP 混合控制,允许在一个控制器上使用 RS232/RS422/IP 控制单个系统中的摄像机;",
"2、支持 2 组 RS422 串口 VISCA 协议菊花链控制 2x7 台摄像机。",
"★能够自动对焦,提供检测报告"
]
}}
### 示例输出2如下包含嵌套结构
{{
"摄像机": [
"摄像机有效像素≥900W像素",
"摄像机最低照度彩色≤0.001lx",
"协议routes 接口开放:具备;▲支持标准 ONVIF 协议与第三方厂家设备进行互联;支持 GB/T28181应提供 SDK"
]
}}
"""
user_query_template_two="""请根据货物标中采购要求部分的内容,告诉我\"{}\"的技术参数或采购要求是什么。由于该货物存在 {} 种不同的采购要求或技术参数,请逐一列出,并以 JSON 格式返回结果。请以'货物名-编号'区分多种型号,编号为从 1 开始的自然数,依次递增,即第一个键名为\"{}-1\";键值为一个列表,列表中包含若干描述\"{}\"的技术参数或采购要求或功能说明的字符串,请按原文内容回答,保留三角▲、五角★和序号(若有),不可擅自增删内容,尤其是不可擅自添加序号。
要求与指南:
1. 你的键值应该全面,不要遗漏。
-a.若技术参数或采购要求在表格中,那么单元格内的内容基本都要涵盖
-对于单元格内以序号分隔的各条参数要求,应逐条提取,并分别作为键值中的字符串列表项。
-对于无序号标明且在同一单元格内的参数要求或功能说明,也要根据语义分别添加进键值中。
-b.若技术参数或采购要求在正文部分,应准确定位到与目标货物(设备、系统、功能模块)相关的内容,将其后的技术参数或采购要求或功能说明完整提取,逐一添加到键值的字符串列表中,不得擅自添加或修改序号。
2. 如果存在嵌套结构且原文为Markdown 的表格语法,如'摄像机|有效像素|≥900W像素' 请不要返回该Markdown语法而是使用冒号':'将相关信息拼接在一起,生成一条完整且清晰的技术参数(或采购要求)描述,作为列表中的一个字符串。如"摄像机有效像素≥900W像素"
3. 字符串中的内容为具体的技术参数要求或采购要求,请不要返回诸如'1高清录像功能'这种标题性质且不能体现要求的内容。
4. 如果该货物没有相关采购要求或技术参数要求,键值应为空列表[]。
### 示例输出1如下
{{
"交换机-1": [
"★1、支持固化千兆电口≥8 个固化千兆光口≥2 个,桌面型设备;",
"2、支持静态链路聚合"
],
"交换机-2": [
"1、交换容量≥52Gbps包转发率≥38.69Mpps",
"2、提供国家强制性产品认证证书及测试报告3C",
"★能实现信号控制独立传输"
]
}}
### 示例输出2如下包含嵌套结构
{{
"摄像机-1": [
"摄像机有效像素≥900W像素",
"摄像机最低照度彩色≤0.001lx",
"协议routes 接口开放:具备;▲支持标准 ONVIF 协议与第三方厂家设备进行互联;支持 GB/T28181应提供 SDK"
],
"摄像机-2": [
"支持夜视", "支持云存储"
]
}}
"""
queries = []
for key in key_paths:
# 将键中的 '.' 替换为 '下的'
modified_key = key.replace('.', '下的')
# 使用修改后的键填充第一个占位符,原始键填充第二个占位符
# full_text = read_txt_to_string(processed_filepath)
# new_query = user_query_template.format(modified_key, key, modified_key,full_text) #转豆包后取消注释
new_query = user_query_template.format(modified_key, key, modified_key)
queries.append(new_query)
# 处理 grouped_paths 中的项,应用 user_query_template_two
for grouped_dict in grouped_paths:
for grouped_key, grouped_key_cnt in grouped_dict.items():
# 将键中的 '.' 替换为 '下的'
modified_grouped_key = grouped_key.replace('.', '下的')
# 使用修改后的键填充第一个占位符,原始键填充第二个占位符
# 如果需要使用 full_text可以取消注释并提供相应的实现
# full_text = read_txt_to_string(processed_filepath)
# new_query = user_query_template_two.format(modified_grouped_key, grouped_key, modified_grouped_key, full_text)
# 根据您的需求,生成新的查询字符串
new_query = user_query_template_two.format(modified_grouped_key, grouped_key_cnt,grouped_key, modified_grouped_key)
queries.append(new_query)
results = multi_threading(queries, "", file_id, 2) #通义
# results = multi_threading(queries, "", "", 3) #豆包
technical_requirements = []
if not results:
print("errror!未获得大模型的回答!")
else:
# 打印结果
for question, response in results:
technical_requirements.append(response)
# print(response)
technical_requirements_combined_res = combine_json_results(technical_requirements)
"""根据所有键是否已添加处理技术要求"""
# 更新原始采购需求字典
final_res=combine_and_update_results(modified_data, technical_requirements_combined_res)
ffinal_res=all_postprocess(final_res)
ffinal_res["货物列表"] = good_list
# 输出最终的 JSON 字符串
return {"采购需求":ffinal_res}
def test_all_files_in_folder(input_folder, output_folder):
# 确保输出文件夹存在
if not os.path.exists(output_folder):
os.makedirs(output_folder)
# 遍历指定文件夹中的所有文件
for filename in os.listdir(input_folder):
file_path = os.path.join(input_folder, filename)
# 检查是否是文件
if os.path.isfile(file_path):
print(f"处理文件: {file_path}")
# 调用函数处理文件
try:
json_result = get_technical_requirements(file_path)
# 定义输出文件的路径
output_file_path = os.path.join(output_folder, os.path.splitext(filename)[0] + '.json')
# 保存JSON结果到文件
with open(output_file_path, 'w', encoding='utf-8') as json_file:
json.dump(json_result, json_file, ensure_ascii=False, indent=4)
print(f"结果已保存到: {output_file_path}")
except Exception as e:
print(f"处理文件 {file_path} 时出错: {e}")
if __name__ == "__main__":
start_time=time.time()
# truncate_file="C:\\Users\\Administrator\\Desktop\\fsdownload\\469d2aee-9024-4993-896e-2ac7322d41b7\\ztbfile_procurement.docx"
truncate_docfile=r"C:\Users\Administrator\Desktop\货物标\output1\6_2定版视频会议磋商文件_procurement.docx"
truncate_file=r'C:\Users\Administrator\Desktop\货物标\output1\6.2定版视频会议磋商文件_procurement.pdf'
# invalid_path="D:\\flask_project\\flask_app\\static\\output\\output1\\e7dda5cb-10ba-47a8-b989-d2993d34bb89\\ztbfile.pdf"
# truncate_file="D:\\flask_project\\flask_app\\static\\output\\output1\\e7dda5cb-10ba-47a8-b989-d2993d34bb89\\ztbfile_procurement.docx"
# output_folder="C:\\Users\\Administrator\\Desktop\\货物标\\output1\\tmp"
# file_id = upload_file(truncate_file)
invalid_path=r"C:\Users\Administrator\Desktop\fsdownload\a110ed59-00e8-47ec-873a-bd4579a6e628\ztbfile.pdf"
# file_id=upload_file(truncate_file)
processed_filepath = pdf2txt(truncate_file)
# processed_filepath=r"C:\Users\Administrator\Desktop\fsdownload\e702f1e6-095d-443d-bb7d-ef2e42037cb1\金水河沿线排涝泵站提档升级项目.txt"
res=get_technical_requirements(truncate_docfile,invalid_path,processed_filepath)
json_string = json.dumps(res, ensure_ascii=False, indent=4)
print(json_string)
# # input_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\output1"
# # output_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\output3"
# # test_all_files_in_folder(input_folder, output_folder)
end_time=time.time()
print("耗时:"+str(end_time-start_time))