Merge branch 'develop-test' into develop

# Conflicts:
#	flask_app/general/无效标和废标公共代码.py
zy123 2024-12-30 10:32:35 +08:00
commit fd4882e593
14 changed files with 420 additions and 364 deletions


@@ -296,6 +296,7 @@ def extract_content_from_json(input_string,flag=False):
     # 如果所有方法都失败,检查字符串长度
     print("所有修复方法均失败。传入的字符串:")
     print(input_string)
+    print("-------------------")
     return {} # 返回空字典
 def clean_json_string(json_string,flag=False):


@@ -215,3 +215,12 @@ def merge_selected_pdfs(output_folder, truncate_files, output_path, base_file_na
     else:
         print(f"合并失败,没有生成 '{output_path}'")
         return ""
+
+if __name__ == "__main__":
+    path1=r'D:\flask_project\flask_app\static\output\output1\2779a32c-1d74-44d7-8381-87fb2a88d80a\tmp\gcHBDL-2024-0017-001-招标文件_evaluation_method.pdf'
+    path2=r'D:\flask_project\flask_app\static\output\output1\2779a32c-1d74-44d7-8381-87fb2a88d80a\tmp\gcHBDL-2024-0017-001-招标文件_qualification.pdf'
+    output_folder=r'D:\flask_project\flask_app\static\output\output1\2779a32c-1d74-44d7-8381-87fb2a88d80a\tmp'
+    output=os.path.join(output_folder,"merged_qualification.pdf")
+    path=[path1,path2]
+    output_path=merge_pdfs(path,output)
+    print(output_path)


@@ -3,8 +3,7 @@ import json
 import re
 from flask_app.general.format_date import format_chinese_date
 from flask_app.general.format_amout import format_amount
-from flask_app.routes.偏离表main import extract_matching_keys, prepare_for_zige_info, \
-    process_functions_in_parallel
+from flask_app.routes.偏离表main import extract_matching_keys, prepare_for_zige_info, process_functions_in_parallel

 # 定义一个辅助函数用于获取嵌套字典中的值
@@ -235,17 +234,21 @@ def outer_post_processing(combined_data, includes, good_list):
         tuple: (processed_data, extracted_info, procurement_reqs)
     """
     # 初始化结果字典,预设'其他'分类为空字典
     processed_data = {"其他": {}}
     # 初始化提取的信息字典
     extracted_info = {}
+    tech_eval_info=""
+    busi_eval_info=""
     zige_info = ""
     fuhe_info = ""
     zigefuhe_info = ""
     # 初始化采购要求的技术要求
     tech_deviation_info = ""
-    procurement_reqs={}
+    busi_requirements_info=""
     tech_deviation={}
+    busi_requirements={}
     # 检查 '基础信息' 是否在 includes 中
     if "基础信息" in includes:
         base_info = combined_data.get("基础信息", {})
@@ -256,7 +259,8 @@ def outer_post_processing(combined_data, includes, good_list):
         # 提取 '采购要求' 下的 '采购需求'
         procurement_reqs=get_nested(base_info, ["采购要求"], {})
         tech_requirements = get_nested(procurement_reqs, ["采购需求"], {})
+        busi_requirements = {k: v for k, v in procurement_reqs.items() if k != "采购需求"}
+        busi_requirements_info = json.dumps(busi_requirements, ensure_ascii=False, indent=4)
         if tech_requirements:
             tech_deviation = extract_matching_keys(tech_requirements, good_list)
             tech_deviation_info=json.dumps(tech_deviation,ensure_ascii=False,indent=4)
@@ -267,12 +271,20 @@ def outer_post_processing(combined_data, includes, good_list):
     if "资格审查" in includes:
         zige_review = combined_data.get("资格审查", {})
         zige_info,fuhe_info,zigefuhe_info=prepare_for_zige_info(zige_review)
-    tech_star_deviation, business_deviation, business_star_deviation, zigefuhe_deviation = process_functions_in_parallel(
+    if "技术评分" in includes:
+        tech_eval=combined_data.get("技术评分",{})
+        tech_eval_info=json.dumps(tech_eval,ensure_ascii=False,indent=4)
+    if "商务评分" in includes:
+        busi_eval=combined_data.get("商务评分",{})
+        busi_eval_info=json.dumps(busi_eval,ensure_ascii=False,indent=4)
+    all_data_info = '\n'.join([zige_info, fuhe_info, zigefuhe_info, tech_deviation_info,busi_requirements_info, tech_eval_info,busi_eval_info])
+    tech_star_deviation, business_deviation, business_star_deviation, zigefuhe_deviation,proof_materials = process_functions_in_parallel(
         tech_deviation_info=tech_deviation_info,
-        procurement_reqs=procurement_reqs,
+        busi_requirements_dict=busi_requirements,
         zige_info=zige_info,
         fuhe_info=fuhe_info,
-        zigefuhe_info=zigefuhe_info
+        zigefuhe_info=zigefuhe_info,
+        all_data_info=all_data_info
     )
     # 遍历原始字典的每一个键值对
@@ -292,7 +304,7 @@ def outer_post_processing(combined_data, includes, good_list):
     if not processed_data["其他"]:
         del processed_data["其他"]
-    return processed_data, extracted_info, tech_deviation,tech_star_deviation,business_deviation,business_star_deviation,zigefuhe_deviation
+    return processed_data, extracted_info, tech_deviation,tech_star_deviation,business_deviation,business_star_deviation,zigefuhe_deviation,proof_materials

 if __name__ == "__main__":
     combined_data = {


@@ -434,7 +434,7 @@ def combine_evaluation_standards(evaluation_method_path,invalid_path,zb_type):
 if __name__ == "__main__":
     start_time=time.time()
     # truncate_file=r"C:\Users\Administrator\Desktop\招标文件-采购类\tmp2\2024-新疆-塔城地区公安局食药环分局快检实验室项目_evaluation_method.pdf"
-    evaluation_method_path = 'C:\\Users\\Administrator\\Desktop\\文件解析问题\\文件解析问题\\1414cb9c-7bf4-401c-8761-2acde151b9c2\\tmp\\政法委视频_evaluation_method.pdf'
+    evaluation_method_path = r'C:\Users\Administrator\Desktop\文件解析问题\文件解析问题\82a6f11d-cfcd-4cb4-93e9-940fa24abb21\ztbfile_evaluation_method.pdf'
     invalid_path=r'C:\Users\Administrator\Desktop\文件解析问题\文件解析问题\1414cb9c-7bf4-401c-8761-2acde151b9c2\ztbfile.docx'
     # truncate_file = "C:\\Users\\Administrator\\Desktop\\货物标\\output2\\2-招标文件统计局智能终端二次招标_evaluation_method.pdf"
     # truncate_file="C:\\Users\\Administrator\\Desktop\\货物标\\output2\\广水市妇幼招标文件最新W改_evaluation_method.pdf"


@@ -116,13 +116,13 @@ if __name__ == "__main__":
     # pdf_path=r"C:\Users\Administrator\Desktop\货物标\zbfiles\094定稿-湖北工业大学轻武器模拟射击设备采购项目招标文件.pdf"
     # pdf_path = r"C:\Users\Administrator\Desktop\货物标\zbfiles\zbtest4_evaluation_method.pdf"
     # pdf_path = r"C:\Users\Administrator\Desktop\招标文件\招标02.pdf"
-    pdf_path=r'C:\Users\Administrator\Downloads\_2020年广水市中小学教师办公电脑系统及多媒体“班班通”设备采购安装项目_加水印3333.pdf'
+    pdf_path=r'C:\Users\Administrator\Desktop\new招标文件\货物标\HBDL-2024-0158-001-招标文件.pdf'
     # input_path=r"C:\Users\Administrator\Desktop\招标文件\招标test文件夹\zbtest8.pdf"
     output_folder = r"C:\Users\Administrator\Desktop\fsdownload\91399aa4-1ee8-447d-a05b-03cd8d15ced5\tmp"
     # selections = [1, 4] # 仅处理 selection 4、1
-    # selections = [1, 3, 5]
-    # files = truncate_pdf_multiple(pdf_path, output_folder, logger, 'goods', selections)
-    files = truncate_pdf_multiple(pdf_path, output_folder, logger, 'goods')
+    selections = [1, 2, 3, 5]
+    files = truncate_pdf_multiple(pdf_path, output_folder, logger, 'goods', selections)
+    # files = truncate_pdf_multiple(pdf_path, output_folder, logger, 'goods')
     print(files)
     # print(files[-1])
     # print(files[-2])


@@ -18,59 +18,54 @@ def get_deviation():
     zb_type = g.zb_type
     try:
-        logger.info("开始解析 URL: " + file_url)
-        if zb_type != 2:
-            logger.error(f"无效的 zb_type: {zb_type}. 期望 zb_type: 2")
+        logger.info("call /get_deviation: 开始解析 URL: " + file_url)
+        if zb_type not in [1, 2]:
+            logger.error(f"无效的 zb_type: {zb_type}. 期望 zb_type: 1 或 2")
             return jsonify({
                 'error': 'Invalid zb_type',
-                'message': '此端点仅支持 zb_type 2 (采购需求)'
+                'message': '此端点仅支持 zb_type 1 或 2'
             }), 400
         else:
-            tech_deviation, tech_star_deviation, business_deviation, business_star_deviation, zigefuhe_deviation = download_and_process_file_for_deviation(
-                file_url, unique_id)
-            if tech_deviation is None:
-                return jsonify({'error': 'File processing failed'}), 500
-            tech_deviation_response, tech_deviation_star_response, zigefuhe_deviation_response, shangwu_deviation_response, shangwu_star_deviation_response = generate_deviation_response(
-                tech_deviation, tech_star_deviation, business_deviation, business_star_deviation, zigefuhe_deviation,
-                logger)
+            # 直接下载并处理文件
+            output_folder = g.output_folder
+            filename = "ztbfile"
+            downloaded_filename = os.path.join(output_folder, filename)
+            # 下载文件
+            downloaded_filepath, file_type = download_file(file_url, downloaded_filename)
+            if downloaded_filepath is None or file_type == 4:
+                logger.error("Unsupported file type or failed to download file")
+                return jsonify({'error': 'Unsupported file type or failed to download file'}), 500
+            logger.info("Local file path: " + downloaded_filepath)
+            # 处理文件
+            tech_deviation, tech_star_deviation, business_deviation, business_star_deviation, zigefuhe_deviation, proof_materials = get_tech_and_business_deviation(
+                downloaded_filepath, file_type, unique_id, output_folder,zb_type)
+            # 生成偏差响应
+            tech_deviation_response, tech_deviation_star_response, zigefuhe_deviation_response, shangwu_deviation_response, shangwu_star_deviation_response, proof_materials_response = generate_deviation_response(
+                tech_deviation, tech_star_deviation, business_deviation, business_star_deviation, zigefuhe_deviation, proof_materials, logger)
             final_response = {
                 'message': 'processed successfully',
                 'filename': 'END',
                 'data': 'END'
             }
+            # 流式返回数据
             def generate():
                 yield f"data: {json.dumps(tech_deviation_response, ensure_ascii=False)}\n\n"
                 yield f"data: {json.dumps(tech_deviation_star_response, ensure_ascii=False)}\n\n"
                 yield f"data: {json.dumps(zigefuhe_deviation_response, ensure_ascii=False)}\n\n"
                 yield f"data: {json.dumps(shangwu_deviation_response, ensure_ascii=False)}\n\n"
                 yield f"data: {json.dumps(shangwu_star_deviation_response, ensure_ascii=False)}\n\n"
+                yield f"data: {json.dumps(proof_materials_response, ensure_ascii=False)}\n\n"
                 yield f"data: {json.dumps(final_response, ensure_ascii=False)}\n\n"
             return Response(generate(), mimetype='text/event-stream')
     except Exception as e:
         logger.error('发生异常: ' + str(e))
         return jsonify({'error': str(e)}), 500

-def download_and_process_file_for_deviation(file_url, unique_id):
-    """
-    下载并处理采购需求文件
-    参数:
-        file_url (str): 文件的URL地址
-    返回:
-        tuple: (tech_deviation, tech_star_deviation, business_deviation, business_star_deviation, zigefuhe_deviation)
-    """
-    logger = g.logger
-    output_folder = g.output_folder
-    filename = "ztbfile"
-    downloaded_filename = os.path.join(output_folder, filename)
-    downloaded_filepath, file_type = download_file(file_url, downloaded_filename)
-    if downloaded_filepath is None or file_type == 4:
-        logger.error("Unsupported file type or failed to download file")
-        return None, None, None, None, None
-    logger.info("Local file path: " + downloaded_filepath)
-    tech_deviation, tech_star_deviation, business_deviation, business_star_deviation, zigefuhe_deviation = get_tech_and_business_deviation(
-        downloaded_filepath, file_type, unique_id, output_folder)
-    return tech_deviation, tech_star_deviation, business_deviation, business_star_deviation, zigefuhe_deviation
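Editorial note, not part of this commit: a minimal sketch of how a client might consume the event stream this route returns. The /get_deviation path is taken from the log message above; the base URL, HTTP method (POST) and the file_url/zb_type request fields are assumptions based on validate_request, not confirmed by the diff. Each SSE message is a JSON object with message/filename/data keys, and the stream ends with filename 'END'.

import json
import requests  # assumed HTTP client; any streaming-capable client works

def read_deviation_stream(base_url, file_url, zb_type=2):
    # POST with a JSON body is an assumption; adjust to the actual route registration.
    with requests.post(f"{base_url}/get_deviation",
                       json={"file_url": file_url, "zb_type": zb_type},
                       stream=True) as resp:
        for raw in resp.iter_lines(decode_unicode=True):
            if not raw or not raw.startswith("data: "):
                continue  # skip blank SSE separators
            payload = json.loads(raw[len("data: "):])
            print(payload["filename"])  # e.g. procurement_reqs, proof_materials, END
            if payload["filename"] == "END":
                break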


@@ -114,15 +114,16 @@ def process_and_stream(file_url, zb_type):
     output_json_path = os.path.join(output_folder, 'final_result.json')
     extracted_info_path = os.path.join(output_folder, 'extracted_result.json')
     includes = ["基础信息", "资格审查", "商务评分", "技术评分", "无效标与废标项", "投标文件要求", "开评定标流程"]
-    final_result, extracted_info, tech_deviation, tech_star_deviation, business_deviation, business_star_deviation, zigefuhe_deviation = outer_post_processing(combined_data, includes, good_list)
-    tech_deviation_response, tech_deviation_star_response, zigefuhe_deviation_response, shangwu_deviation_response, shangwu_star_deviation_response = generate_deviation_response(
-        tech_deviation, tech_star_deviation, business_deviation, business_star_deviation, zigefuhe_deviation, logger)
+    final_result, extracted_info, tech_deviation, tech_star_deviation, business_deviation, business_star_deviation, zigefuhe_deviation, proof_materials = outer_post_processing(combined_data, includes, good_list)
+    tech_deviation_response, tech_deviation_star_response, zigefuhe_deviation_response, shangwu_deviation_response, shangwu_star_deviation_response,proof_materials_response = generate_deviation_response(
+        tech_deviation, tech_star_deviation, business_deviation, business_star_deviation, zigefuhe_deviation,proof_materials, logger)
     yield f"data: {json.dumps(tech_deviation_response, ensure_ascii=False)}\n\n"
     yield f"data: {json.dumps(tech_deviation_star_response, ensure_ascii=False)}\n\n"
     yield f"data: {json.dumps(zigefuhe_deviation_response, ensure_ascii=False)}\n\n"
     yield f"data: {json.dumps(shangwu_deviation_response, ensure_ascii=False)}\n\n"
     yield f"data: {json.dumps(shangwu_star_deviation_response, ensure_ascii=False)}\n\n"
+    yield f"data: {json.dumps(proof_materials_response, ensure_ascii=False)}\n\n"
     try:
         with open(extracted_info_path, 'w', encoding='utf-8') as json_file:


@@ -24,12 +24,13 @@ def validate_request():
         return jsonify({'error': 'Invalid zb_type provided'}), 400
     return file_url, zb_type

 def generate_deviation_response(tech_deviation, tech_star_deviation, business_deviation, business_star_deviation,
-                                zigefuhe_deviation, logger):
+                                zigefuhe_deviation,proof_materials, logger):
     logger.info(f"技术偏离表: {json.dumps(tech_deviation, ensure_ascii=False, indent=4)}")
     logger.info(f"技术偏离表带星: {json.dumps(tech_star_deviation, ensure_ascii=False, indent=4)}")
     logger.info(f"商务偏离表: {json.dumps(business_deviation, ensure_ascii=False, indent=4)}")
     logger.info(f"商务偏离表带星: {json.dumps(business_star_deviation, ensure_ascii=False, indent=4)}")
     logger.info(f"资格检查偏离表: {json.dumps(zigefuhe_deviation, ensure_ascii=False, indent=4)}")
+    logger.info(f"所需提交的材料: {json.dumps(proof_materials, ensure_ascii=False, indent=4)}")
     tech_deviation_response = {
         'message': 'procurement_reqs',
@@ -56,7 +57,12 @@ def generate_deviation_response(tech_deviation, tech_star_deviation, business_de
         'filename': 'shangwu_star_deviation',
         'data': json.dumps(business_star_deviation, ensure_ascii=False)
     }
-    return tech_deviation_response, tech_deviation_star_response, zigefuhe_deviation_response, shangwu_deviation_response, shangwu_star_deviation_response
+    proof_materials_response={
+        'message': 'proof_materials',
+        'filename': 'proof_materials',
+        'data': json.dumps(proof_materials, ensure_ascii=False)
+    }
+    return tech_deviation_response, tech_deviation_star_response, zigefuhe_deviation_response, shangwu_deviation_response, shangwu_star_deviation_response,proof_materials_response

 def require_connection_limit():


@@ -1,14 +1,18 @@
 import json
+import os
 import time
 from flask_app.general.doubao import doubao_model
 from flask_app.general.format_change import pdf2docx, docx2pdf,doc2docx
 from flask_app.general.json_utils import clean_json_string
+from flask_app.general.merge_pdfs import merge_pdfs
+from flask_app.general.通义千问long import qianwen_plus
 from flask_app.general.通用功能函数 import get_global_logger
 from flask_app.general.截取pdf_main import truncate_pdf_multiple
 from flask_app.货物标.提取采购需求main import fetch_procurement_reqs
 from flask_app.货物标.技术参数要求提取后处理函数 import extract_matching_keys
 from flask_app.货物标.资格审查main import combine_qualification_review
+from flask_app.general.商务技术评分提取 import combine_evaluation_standards
 import concurrent.futures

 logger = None
@@ -107,7 +111,7 @@ def extract_zige_deviation_table(zige_info, fuhe_info, zigefuhe_info):
     """
     def get_model_response(query):
-        return doubao_model(query)
+        return qianwen_plus(query)

     result = {"资格审查": {}}
@@ -145,21 +149,50 @@ def extract_zige_deviation_table(zige_info, fuhe_info, zigefuhe_info):
         "符合性检查": fuhe_deviation.get("符合性检查", fuhe_deviation),
     }
     return result

-def extract_business_deviation(procurement):
+def extract_business_deviation(busi_requirements_dict):
+    # 默认返回值
+    default_return = ({"商务要求": []}, {"商务要求带星": []})
+    if not busi_requirements_dict:
+        # 如果字典为空,返回默认字典
+        return default_return
+    # 定义一个辅助函数来检查字段是否为“未提供”
+    def is_unprovided(field):
+        if isinstance(field, str):
+            return field == "未提供"
+        elif isinstance(field, list):
+            # 对于列表,检查所有元素是否都是“未提供”
+            return all(item == "未提供" for item in field)
+        elif isinstance(field, dict):
+            # 对于字典,递归检查所有值是否都是“未提供”
+            return all(is_unprovided(v) for v in field.values())
+        return False
+    # 获取各个要求的值,默认为“未提供”以处理缺失的键
+    service_requirement = busi_requirements_dict.get("服务要求", "未提供")
+    business_requirement = busi_requirements_dict.get("商务要求", "未提供")
+    other_requirement = busi_requirements_dict.get("其他要求", "未提供")
+    # 检查是否所有要求都是“未提供”
+    if (is_unprovided(service_requirement) and
+            is_unprovided(business_requirement) and
+            is_unprovided(other_requirement)):
+        return default_return
     new_data = {}
     counter = 1
-    if "服务要求" in procurement:
-        new_data[f"招标要求{counter}"] = procurement["服务要求"]
+    if "服务要求" in busi_requirements_dict:
+        new_data[f"招标要求{counter}"] = busi_requirements_dict["服务要求"]
         counter += 1
     # Extract "商务要求"
-    if "商务要求" in procurement:
-        new_data[f"招标要求{counter}"] = procurement["商务要求"]
+    if "商务要求" in busi_requirements_dict:
+        new_data[f"招标要求{counter}"] = busi_requirements_dict["商务要求"]
         counter += 1
     # Extract "其他要求"
-    if "其他要求" in procurement:
-        new_data[f"招标要求{counter}"] = procurement["其他要求"]
+    if "其他要求" in busi_requirements_dict:
+        new_data[f"招标要求{counter}"] = busi_requirements_dict["其他要求"]
         counter += 1
     business_requirements_string = json.dumps(new_data, ensure_ascii=False, indent=4)
@@ -213,13 +246,13 @@ def extract_business_deviation(procurement):
     文本内容{full_text}
     """
     user_query1 = prompt_template1.format(full_text=business_requirements_string)
-    model_res1 = doubao_model(user_query1)
+    model_res1 = qianwen_plus(user_query1)
     # print(model_res)
     business_req_deviation = clean_json_string(model_res1)
     prompt_template2 = """以下文本是项目采购需求的商务要求部分,请你帮我从键值列表中各字符串中提取带星★或带三角▲的要求项,你的返回格式同输入文本格式,外键名为'商务要求带星',键值为字符串列表,其中每个字符串为带星★或带三角▲的要求项。
     要求与指南
     1. 每个星或三角要求占据一个字符串
-    2. 若没有带星或带三角的要求项键值为空列表[]
+    2. 若没有带星或带三角的要求项键值为空列表[]无需返回其他说明性描述
     特殊情况处理
     对于输入类似于'技术要求中带★条款项不满足的视为无效投标'这种描述带星或带三角的响应情况的它本身不是带星或带三角的要求因此不需要添加进字符串列表中仅需把本身是带★或带三角的要求添加进来
@@ -243,17 +276,19 @@ def extract_business_deviation(procurement):
     文本内容{full_text}
     """
     user_query2 = prompt_template2.format(full_text=model_res1)
-    model_res2 = doubao_model(user_query2)
+    model_res2 = qianwen_plus(user_query2)
     business_star_req_deviation = clean_json_string(model_res2)
     return business_req_deviation, business_star_req_deviation

 def get_tech_star_deviation(tech_string):
+    if not tech_string:
+        return {}
     prompt_template = """以下输入文本包含采购货物的技术参数要求或采购要求。请从每个键对应的字符串列表中提取带有星★或三角▲的要求项。返回格式应与输入文本格式相同为JSON格式每个键名保持不变键值为包含对应货物、系统或功能模块的带星或带三角要求项的字符串列表。
     要求与指南
     1. 如果某个货物系统或功能模块下没有带星或带三角的要求项则不返回该键值对
     2. 每个带星或带三角的要求项应作为单独的字符串
-    3. 如果所有设备系统或功能模块中都没有带星或带三角的要求项则直接返回空字典 {{}}
+    3. 如果所有设备系统或功能模块中都没有带星或带三角的要求项则直接返回空字典 {{}}无需返回其他说明性描述
     ### 示例输入1如下
     {{
@@ -298,19 +333,52 @@ def get_tech_star_deviation(tech_string):
     输入文本内容{full_text}
     """
     user_query = prompt_template.format(full_text=tech_string)
-    model_res = doubao_model(user_query)
+    model_res = qianwen_plus(user_query)
     # print(model_res)
     tech_star_deviation = clean_json_string(model_res)
     filtered_dict = {key: value for key, value in tech_star_deviation.items() if value} #过滤键值为空列表,二重保险。
     return filtered_dict

-def process_functions_in_parallel(tech_deviation_info, procurement_reqs, zige_info, fuhe_info, zigefuhe_info):
+def get_proof_materials(all_data_info):
+    prompt_template = """以下文本是从招标文件中摘取的资格审查、采购需求、商务条款、技术评分相关内容。请根据这些内容,提取并列出投标人需要提交的证明材料。
+    格式要求
+    请以 JSON 格式返回结果
+    - 键名为 '证明材料'
+    - 键值为字符串列表其中每个字符串表示投标人需要提交的一份材料
+    要求与指南
+    1. 仅提取与投标人需要提交的材料相关的信息忽略无关内容
+    2. 返回的内容尽量与文本一致
+    3. 不需要包含重复项每份材料只需列出一次
+    4. 在提取采购需求部分的材料时应明确对应的设备或货物名称并通过冒号:连接例如
+    - "发射器:内部机构实用新型专利证书及外观专利证书"
+    - "发射器:外壳需有正规厂家世标认证"
+    示例输出仅供格式参考
+    {{
+        "证明材料":[
+            "具有独立承担民事责任的能力;",
+            "具有良好的商业信誉和健全的财务会计制度;",
+            "发射器:内部机构实用新型专利证书及外观专利证书",
+            "发射器:外壳需有正规厂家世标认证"
+        ]
+    }}
+    输入文本{full_text}
+    """
+    user_query=prompt_template.format(full_text=all_data_info)
+    # print(user_query)
+    model_res=qianwen_plus(user_query)
+    proof_materials = clean_json_string(model_res)
+    return proof_materials

+def process_functions_in_parallel(tech_deviation_info, busi_requirements_dict, zige_info, fuhe_info, zigefuhe_info,all_data_info):
     # 准备输入参数
     # 定义任务和对应参数
     tasks = [
         ("tech_star_deviation", get_tech_star_deviation, (tech_deviation_info,)),
-        ("business_deviation_and_star", extract_business_deviation, (procurement_reqs,)),
+        ("business_deviation_and_star", extract_business_deviation, (busi_requirements_dict,)),
         ("zigefuhe_deviation", extract_zige_deviation_table, (zige_info, fuhe_info, zigefuhe_info)),
+        ("proof_materials", get_proof_materials, (all_data_info,))
     ]
     results = {}
@@ -337,11 +405,13 @@ def process_functions_in_parallel(tech_deviation_info, procurement_reqs, zige_in
         results.get("tech_star_deviation"),
         results.get("business_deviation"),
         results.get("business_star_deviation"),
-        results.get("zigefuhe_deviation")
+        results.get("zigefuhe_deviation"),
+        results.get("proof_materials")
     )

-def get_tech_and_business_deviation(file_path,file_type,unique_id,output_folder):
+def get_tech_and_business_deviation(file_path,file_type,unique_id,output_folder,zb_type=2):
     global logger
     logger = get_global_logger(unique_id)
+    # 第一步:根据文件类型进行转换
     if file_type == 1:  # docx
         docx_path=file_path
         pdf_path = docx2pdf(file_path)  # 将docx转换为pdf以供后续处理
@@ -355,37 +425,89 @@ def get_tech_and_business_deviation(file_path,file_type,unique_id,output_folder)
     else:
         logger.error("Unsupported file type provided. Preprocessing halted.")
         return None
-    selections=[1,3,5]
-    files=truncate_pdf_multiple(pdf_path,output_folder,logger,'goods',selections)
-    notice_path=files[0]
-    qualification_file=files[1]
-    procurement_file=files[2]
-    invalid_path=files[-2]
+    # 第二步:根据zb_type确定选择项和类别,并截取PDF
+    if zb_type == 2:
+        selections = [1, 2, 3, 5]
+        category = 'goods'  # 商品类
+    else:
+        selections = [1, 2, 3]
+        category = 'engineering'  # 工程类
+    try:
+        files = truncate_pdf_multiple(pdf_path, output_folder, logger, category, selections)
+    except Exception as e:
+        logger.error(f"PDF截取过程中出错: {e}")
+        return None
+    # 根据zb_type分配路径
+    notice_path = files[0] if len(files) > 0 else ""
+    evaluation_path = files[1] if len(files) > 1 else ""
+    qualification_path = files[2] if len(files) > 2 else ""
+    procurement_path = files[3] if zb_type == 2 and len(files) > 3 else ""
+    invalid_path = files[-2]
     # invalid_path=docx_path
-    invalid_path=docx_path if docx_path != "" else invalid_path #可能是pdf docx
-    if not procurement_file:
-        procurement_file=invalid_path
-    tech_deviation={}
-    with concurrent.futures.ThreadPoolExecutor() as executor:
-        # 提交任务到线程池
-        future_procurement = executor.submit(fetch_procurement_reqs, procurement_file, invalid_path)
-        time.sleep(1)
-        future_review = executor.submit(combine_qualification_review, invalid_path, qualification_file, notice_path)
-        try:
-            # 获取函数执行结果
-            procurement_reqs = future_procurement.result()
-        except Exception as e:
-            logger.error(f'fetch_procurement_reqs 出现异常: {e}')
-            procurement_reqs = {}  # 或根据需要进行处理
-        try:
-            review_standards_res = future_review.result()
-        except Exception as e:
-            logger.error(f'combine_qualification_review 出现异常: {e}')
-            review_standards_res = {}  # 或根据需要进行处理
+    invalid_path = docx_path if docx_path else invalid_path
+    if zb_type == 2 and not procurement_path:
+        procurement_path = invalid_path
+    if not evaluation_path:
+        evaluation_path=invalid_path
+    if not notice_path:
+        notice_path=invalid_path
+    if zb_type != 2:
+        try:
+            qualification_path = merge_pdfs(
+                [qualification_path, evaluation_path],
+                os.path.join(output_folder, "merged_qualification.pdf")
+            )
+        except Exception as e:
+            logger.error(f"PDF合并过程中出错: {e}")
+            qualification_path = ""
+    tech_deviation={}
+    with concurrent.futures.ThreadPoolExecutor() as executor:
+        # 使用字典存储future对象
+        futures = {}
+        # 根据zb_type条件提交fetch_procurement_reqs任务
+        if zb_type == 2:
+            futures['procurement'] = executor.submit(fetch_procurement_reqs, procurement_path, invalid_path)
+        else:
+            # 当zb_type != 2时,设置默认空值
+            futures['procurement'] = concurrent.futures.Future()
+            futures['procurement'].set_result({})  # 设为默认空字典
+        # 提交combine_qualification_review任务
+        futures['review'] = executor.submit(combine_qualification_review, invalid_path, qualification_path, notice_path)
+        # 提交combine_evaluation_standards任务
+        futures['evaluation'] = executor.submit(combine_evaluation_standards, evaluation_path, invalid_path, zb_type)
+        # 获取并处理任务结果
+        try:
+            procurement_reqs = futures['procurement'].result()
+        except Exception as e:
+            logger.error(f'fetch_procurement_reqs 出现异常: {e}')
+            procurement_reqs = {}  # 根据需要处理
+        try:
+            review_standards_res = futures['review'].result()
+        except Exception as e:
+            logger.error(f'combine_qualification_review 出现异常: {e}')
+            review_standards_res = {}  # 根据需要处理
+        try:
+            evaluation_res = futures['evaluation'].result()
+        except Exception as e:
+            logger.error(f'combine_evaluation_standards 出现异常: {e}')
+            evaluation_res = {
+                "技术评分": {},
+                "商务评分": {}
+            }
+        evaluation_info=json.dumps(evaluation_res, ensure_ascii=False, indent=4)
+        # technical_standards = {"技术评分": evaluation_res.get("技术评分", {})} #技术评议表
+        # commercial_standards = {"商务评分": evaluation_res.get("商务评分", {})} #商务评议表
         tech_requirements = get_nested(procurement_reqs, ["采购需求"], {})
+        busi_requirements = {k: v for k, v in procurement_reqs.items() if k != "采购需求"}
+        busi_requirements_info=json.dumps(busi_requirements,ensure_ascii=False,indent=4)
         if tech_requirements:
             good_list = tech_requirements.pop('货物列表', [])  # 如果 '货物列表' 不存在,返回 []
             logger.info("Collected good_list from the processing function: %s", good_list)
@@ -395,20 +517,22 @@ def get_tech_and_business_deviation(file_path,file_type,unique_id,output_folder)
            tech_deviation_info=""
        zige_info, fuhe_info, zigefuhe_info = prepare_for_zige_info(review_standards_res.get("资格审查", {}))
-        tech_star_deviation, business_deviation, business_star_deviation, zigefuhe_deviation = process_functions_in_parallel(
+        all_data_info = '\n'.join([zige_info, fuhe_info, zigefuhe_info, tech_deviation_info,busi_requirements_info, evaluation_info])
+        tech_star_deviation, business_deviation, business_star_deviation, zigefuhe_deviation, proof_materials= process_functions_in_parallel(
            tech_deviation_info=tech_deviation_info,
-            procurement_reqs=procurement_reqs,
+            busi_requirements_dict=busi_requirements,
            zige_info=zige_info,
            fuhe_info=fuhe_info,
-            zigefuhe_info=zigefuhe_info
+            zigefuhe_info=zigefuhe_info,
+            all_data_info=all_data_info
        )
-    return tech_deviation,tech_star_deviation,business_deviation,business_star_deviation,zigefuhe_deviation
+    return tech_deviation,tech_star_deviation,business_deviation,business_star_deviation,zigefuhe_deviation,proof_materials

 if __name__ == "__main__":
-    file_path=r"C:\Users\Administrator\Desktop\fsdownload\5950ad84-30c8-4643-b6de-b13ef5be7a5c\ztbfile.pdf"
+    file_path=r"C:\Users\Administrator\Desktop\new招标文件\工程标\gcHBDL-2024-0017-001-招标文件.pdf"
     file_type=2
-    output_folder = r"C:\Users\Administrator\Desktop\fsdownload\5950ad84-30c8-4643-b6de-b13ef5be7a5c\tmp"
-    tech_deviation,tech_star_deviation,business_deviation,business_star_deviation,zigefuhe_deviation=get_tech_and_business_deviation(file_path,file_type,"123",output_folder)
+    output_folder = r"C:\Users\Administrator\Desktop\new招标文件\工程标\tmp"
+    tech_deviation,tech_star_deviation,business_deviation,business_star_deviation,zigefuhe_deviation,proof_materials=get_tech_and_business_deviation(file_path,file_type,"123",output_folder,1)
     print("技术偏离表")
     print(json.dumps(tech_deviation,ensure_ascii=False,indent=4))
     print("技术带星")
@@ -419,3 +543,5 @@ if __name__ == "__main__":
     print(json.dumps(business_star_deviation, ensure_ascii=False, indent=4))
     print("资格审查")
     print(json.dumps(zigefuhe_deviation, ensure_ascii=False, indent=4))
+    print("证明材料")
+    print(json.dumps(proof_materials,ensure_ascii=False,indent=4))


@@ -279,8 +279,6 @@ def goods_bid_main(output_folder, file_path, file_type, unique_id):
 # 2.废标项这边,考虑大模型+正则并用
 # 3.限制评分项的因素。
-#TODO:评分、开评定标这边也加上超长逻辑
 if __name__ == "__main__":
     # 配置日志器


@@ -2,108 +2,111 @@ import json
 from flask_app.货物标.技术参数要求提取后处理函数 import extract_matching_keys
-def extract_matching_keys(data, good_list, special_keys=None, parent_key=''):
-    import re
-    from collections import defaultdict
-    def get_suffix(n):
-        """
-        根据数字n返回对应的字母后缀。
-        1 -> 'a', 2 -> 'b', ..., 26 -> 'z', 27 -> 'aa', 28 -> 'ab', ...
-        """
-        suffix = ''
-        while n > 0:
-            n, r = divmod(n - 1, 26)
-            suffix = chr(97 + r) + suffix
-        return suffix
+import re
+from collections import defaultdict
+
+#12.27之前版本
+# def extract_matching_keys(data, good_list, special_keys=None, parent_key=''):
+# def get_suffix(n):
+# """
+# 根据数字n返回对应的字母后缀。
+# 1 -> 'a', 2 -> 'b', ..., 26 -> 'z', 27 -> 'aa', 28 -> 'ab', ...
+# """
+# suffix = ''
+# while n > 0:
+# n, r = divmod(n - 1, 26)
+# suffix = chr(97 + r) + suffix
+# return suffix
#
# def count_matching_keys(data, patterns, special_keys, counter=None):
# """递归统计匹配键的出现次数,仅统计值为列表的键"""
# if counter is None:
# counter = defaultdict(int)
#
# if isinstance(data, dict):
# for key, value in data.items():
# clean_key = key.replace(" ", "") # 去除键中的空格
# if isinstance(value, list):
# if clean_key not in special_keys and any(pattern.match(clean_key) for pattern in patterns):
# counter[clean_key] += 1
# elif isinstance(value, dict):
# count_matching_keys(value, patterns, special_keys, counter)
# elif isinstance(data, list):
# for item in data:
# if isinstance(item, (dict, list)):
# count_matching_keys(item, patterns, special_keys, counter)
#
# return counter
#
# def process_data(data, patterns, special_keys, key_counter, suffix_map, filtered_data, parent_key):
# """递归处理数据并构建结果"""
#
# def get_suffix_label(key):
# suffix_map[key] += 1
# return get_suffix(suffix_map[key])
#
# if isinstance(data, dict):
# for key, value in data.items():
# clean_key = key.replace(" ", "") # 去除键中的空格
# if isinstance(value, list):
# # 处理值为列表的键
# if any(pattern.match(clean_key) for pattern in patterns):
# # 检查是否以特殊符号开头
# if clean_key.startswith(('▲', '★','●','■','◆','☆','△','◇','○','□')):
# symbol = clean_key[0]
# stripped_key = clean_key[1:]
# new_key = generate_key(stripped_key, parent_key, key_counter, suffix_map, special_keys)
# # 将符号添加到每个字符串的开头
# new_value = [symbol + item for item in value]
# filtered_data[new_key] = new_value
# else:
# new_key = generate_key(clean_key, parent_key, key_counter, suffix_map, special_keys)
# filtered_data[new_key] = value
# elif isinstance(value, dict):
# # 继续递归处理嵌套字典
# new_parent_key = clean_key if parent_key == '' else f"{parent_key}的{clean_key}"
# process_data(value, patterns, special_keys, key_counter, suffix_map,
# filtered_data, new_parent_key)
# elif isinstance(data, list):
# for item in data:
# if isinstance(item, (dict, list)):
# process_data(item, patterns, special_keys, key_counter, suffix_map,
# filtered_data, parent_key)
#
# def generate_key(key, parent_key, key_counter, suffix_map, special_keys):
# """生成新的键名"""
# if key in special_keys and parent_key:
# return f"{parent_key}的{key}"
# elif key_counter[key] > 1:
# suffix = get_suffix(suffix_map[key] + 1)
# suffix_map[key] += 1
# return f"{key}-{suffix}"
# return key
#
# if special_keys is None:
# special_keys = ["系统功能"] # 默认值为 ["系统功能"]
#
# # 去除 good_list 中的空格
# clean_good_list = [g.replace(" ", "") for g in good_list]
#
# # 构建匹配的正则表达式
# patterns = [re.compile(r'^' + re.escape(g) + r'(?:-\d+)?$') for g in clean_good_list]
#
# # 先统计所有匹配键的出现次数,仅统计值为列表的键
# key_counter = count_matching_keys(data, patterns, special_keys)
#
# # 初始化后缀映射
# suffix_map = {key: 0 for key, count in key_counter.items() if count > 1}
#
# # 用于存储最终结果
# filtered_data = {}
#
# # 递归处理数据
# process_data(data, patterns, special_keys, key_counter, suffix_map, filtered_data, parent_key)
#
# return filtered_data
def count_matching_keys(data, patterns, special_keys, counter=None):
"""递归统计匹配键的出现次数,仅统计值为列表的键"""
if counter is None:
counter = defaultdict(int)
if isinstance(data, dict):
for key, value in data.items():
clean_key = key.replace(" ", "") # 去除键中的空格
if isinstance(value, list):
if clean_key not in special_keys and any(pattern.match(clean_key) for pattern in patterns):
counter[clean_key] += 1
elif isinstance(value, dict):
count_matching_keys(value, patterns, special_keys, counter)
elif isinstance(data, list):
for item in data:
if isinstance(item, (dict, list)):
count_matching_keys(item, patterns, special_keys, counter)
return counter
def process_data(data, patterns, special_keys, key_counter, suffix_map, filtered_data, parent_key):
"""递归处理数据并构建结果"""
def get_suffix_label(key):
suffix_map[key] += 1
return get_suffix(suffix_map[key])
if isinstance(data, dict):
for key, value in data.items():
clean_key = key.replace(" ", "") # 去除键中的空格
if isinstance(value, list):
# 处理值为列表的键
if any(pattern.match(clean_key) for pattern in patterns):
# 检查是否以特殊符号开头
if clean_key.startswith(('', '','','','','','','','','')):
symbol = clean_key[0]
stripped_key = clean_key[1:]
new_key = generate_key(stripped_key, parent_key, key_counter, suffix_map, special_keys)
# 将符号添加到每个字符串的开头
new_value = [symbol + item for item in value]
filtered_data[new_key] = new_value
else:
new_key = generate_key(clean_key, parent_key, key_counter, suffix_map, special_keys)
filtered_data[new_key] = value
elif isinstance(value, dict):
# 继续递归处理嵌套字典
new_parent_key = clean_key if parent_key == '' else f"{parent_key}{clean_key}"
process_data(value, patterns, special_keys, key_counter, suffix_map,
filtered_data, new_parent_key)
elif isinstance(data, list):
for item in data:
if isinstance(item, (dict, list)):
process_data(item, patterns, special_keys, key_counter, suffix_map,
filtered_data, parent_key)
def generate_key(key, parent_key, key_counter, suffix_map, special_keys):
"""生成新的键名"""
if key in special_keys and parent_key:
return f"{parent_key}{key}"
elif key_counter[key] > 1:
suffix = get_suffix(suffix_map[key] + 1)
suffix_map[key] += 1
return f"{key}-{suffix}"
return key
if special_keys is None:
special_keys = ["系统功能"] # 默认值为 ["系统功能"]
# 去除 good_list 中的空格
clean_good_list = [g.replace(" ", "") for g in good_list]
# 构建匹配的正则表达式
patterns = [re.compile(r'^' + re.escape(g) + r'(?:-\d+)?$') for g in clean_good_list]
# 先统计所有匹配键的出现次数,仅统计值为列表的键
key_counter = count_matching_keys(data, patterns, special_keys)
# 初始化后缀映射
suffix_map = {key: 0 for key, count in key_counter.items() if count > 1}
# 用于存储最终结果
filtered_data = {}
# 递归处理数据
process_data(data, patterns, special_keys, key_counter, suffix_map, filtered_data, parent_key)
return filtered_data
 def test_extract_matching_keys():
     # 定义测试数据
@@ -133,37 +136,14 @@ def test_extract_matching_keys():
 "1 、主体钢架结构及定制型材;",
 "2 、确保楼层承受力许可,按需加固楼层地面;",
 "3 、钢结构。"
-],
-"电缆及信号线缆": [
-"1 、配套所需控制网线 、高清视频线缆 、 电源线缆等适配。"
-],
-"控制终端": [
-"1 、处理器: 八核心 16 线程;",
-"2 、显卡: 8G/DDR6/PCI Express 4.0 16X",
-"3 、 内存: ≥8G DDR4 内存;",
-"4 、硬盘: SSD 固态硬盘(容量≥480G)",
-"5 、接口: 音频/网络/HDMI 接口/9 针串口;",
-"6 、显示器: 21.5 英寸。"
 ]
 },
 "摄像系统": {
 "系统功能": ["a", "b"],
-"★高清摄像机": [
-"1 、成像器件: 1/2.8 Exmor CMOS",
-"2 、镜头: 30 倍光学f=4.3mm to 129mm ",
-"3 、水平视角: 63.7 ° ;",
-"4、视频输出格式1080P/601 080P/501080P/301080P/251080i/60 720p/60",
-"5 、视频输出: 3G-SDI HDMI CVBS,IP (可同步输出 ",
-"6 、真双输出: IP 和 SDI 视频格式可以独立设置;",
-"7 、控制方式: RS232 / RS422 / RS485 IP/Onvif/Visca-over-IP IP 控制软件, 红外遥控器;",
-"8 、IP 最高 1080p60 支持 H.264/H.265/MJPEG",
-"9 、支持 Tally 灯;",
-"10 、支持独立 PoE+IEEE 802.3 at 和 DC 12V 电源;",
-"11 、扩展存储: Micro SD,最高支持 128GB。"
-],
-"摄像机三脚架": [
-"1 、铝合金材质, 承重 2-10Kg",
-"2 、满足高清摄像机承重 、尺寸要求。"
+"☆钢结构底座及铝型材支架": [
+"1 、主体钢架结构及定制型材wwww",
+"2 、确保楼层承受力许可,按需加固楼层地面;",
+"3 、钢结构。"
 ]
 },
 "视频处理系统": {
@@ -194,106 +174,6 @@ def test_extract_matching_keys():
 "画面预览使用, 具有画中画 、独立单画面放大功能。"
 ]
 },
"发言系统": {
"数字会议发言主机": [
"1 、标准挂载单元数量: 4 路总线接口, 单路可连接 32 个 最多系统可挂载 128 个会议单元, 且最远线路长度可高达 100 米;",
"2、主机面板彩屏显示系统菜单通过设置可设 定 1/2/4/6 发言数量;",
"3 、支持先入先出模式, 后入后出模式, 限制模式, 电脑/主席允许 模式, 自由讨论模式;",
"4 、可直接控制最多三个摄像球, 完成视频会议功能;",
"5、多种输入输出接口主输入、卡座输入和前置输出、辅助输出及录音输出接口",
"6 、带有 RS 232 视频控制输出 口, 可以直接输出派尔高-P 派尔高 -D VISCA 控制协议, 控制最大 3 个摄像机, 完成摄像自动跟踪;",
"7 、 内置 4 切 1 视频切换器, 用于摄像机的视频 接连;",
"8 、可以响应处理话筒的会议中服务的请求;",
"9 、 内置签到表决功能, 可以配合话筒进行签到表决;",
"10 、 内置 DSP 自适应音频处理器,可以最大可能的抑制声回输。"
],
"方形短杆代表话筒": [
"1 、超大静音开关设计;",
"2 、会议操作系统,全新的触摸操控技术, 2.8 英寸的彩色触摸屏幕;",
"3 、超短全金属短咪杆设计;",
"4 、高灵敏度咪芯设计,拾音距离≥80 cm ",
"5 、红色雾面指示灯设计, 指示发言状态;",
"6 、支持视像跟踪;",
"7、配合主机 可以实现先入先出,后入后出, 限制模式,主席允许模式, 自由讨论模式;",
"8、话筒的身份可以自行设定可以通过主机设置改变话筒身份在 代表, 主席, VIP 自由切换, 让使用更灵活多样, 满足 高端需求;",
"9 、长距离传输对音质不会有影响; 具备超强的抗手机 RF 干扰性。"
],
"专用连接线缆": [
"主机与话筒专用连接线缆, 长度≥30m。"
],
"手持无线话筒": [
"1 、含一台接收机, 两个无线手持话筒发射器;",
"2 、频率响应: 50Hz-18KHz",
"3 、有效使用距离≥100 米;",
"4 、信噪比≥105dB(1KHz-A)",
"5 、灵敏度: -105dBm(12dB S/N AD)。"
]
},
"视频会议系统": {
"▲多点控制器": [
"1 、遵循 H.323 、H.320 、SIP 标准协议;",
"2 、支持 H.265 H.264 HP H.264 编解码标准。",
"3 、支持不低于 25 分屏高清多画面;",
"4 、最大线路速率: 8M",
"5、视频抗丢包能力支持高至 60%丢包率情况下 图像流畅无马赛克;音频抗 IP 网络丢包能力:支持高至 75%丢包 率情况下,声音清晰流畅; 会议抗 IP 网络丢包能力:支持高至 70%丢包率情况下,会 议仍可正常召开。 以上 5 项参数需提供第三方检测机构检验 报告。"
],
"★多串口控制服务器": [
"1 、具有高速数据处理能力, 内嵌高速嵌入式 CPU ",
"2 、提供 16 路一控多 、多控一;",
"3 、具有多种转发机制, 支持 IP 、串口间双向转发机制;",
"4、控制会议矩阵、会议摄像机外围设备串口设备实现对会议系统设备的控制"
],
"★综合会议管理调度平台": [
"1、含硬件终端和视频会议专用软件用于控制会议、矩阵、会议摄像机实现与省厅 、 随州市综合会议管理调度平台对接 、融合, 互联互通;",
"2 、统一调度管理平台, 根据业务需要, 可互为控制 、互为 备份;",
"3、可以与原有的主控平台互为操作、实现控制备份保证会议正常召开 需在设计方案中详细阐明如何实现;",
"4、实现对会议设备的整合控制采用一键拖拉式操作软件界面友好 、操作管理简易 、直观;",
"5 、可在综合会议管理平台实现四画面预览各分会场及中心视频信 号;",
"6 、提供软件著作权证书。"
],
"65寸电视机移动推车(9楼)": [
"1 、全钢结构, 满足 70 寸电视承重安装要求;",
"2 、承载: 200Kg",
"3 、轮子带自锁刹车功能。"
],
"65寸液晶电视机(分会场)": [
"1 、屏幕尺寸: 65 英寸; 含挂架及安装;",
"2 、背光类型: LED",
"3 、屏幕分辨率: 超高清 4K 3840 ×2 160",
"4 、支持 HDR 显示;",
"5 、CPU Cortex A55 四核;",
"6 、接口: USB2.0 ×2 、HDMI2.0 ×2",
"7 、 网络连接方式: 无线/网线。"
],
"控制平板及软件": [
"10.2 寸无线触摸屏, 含控制软件, 实现远程一键式控制 、视频会议调度。"
],
"鹅颈话筒": [
"1 、采样率: 48kHz",
"2 、频响: 20Hz 20kHz",
"3 、灵敏度: 38 ±2dB",
"4 、拾音距离: 20-50CM;含接头 、线缆, 线缆 长度≥3.5m",
"5 、支持终端远程供电, 无需外接电源。"
]
},
"辅助系统": {
"时序电源": [
"1、具有 12 路 1KW 电源;",
"2、具有电压表指示 支持串口控制;",
"3、采用触点闭合控制功能",
"4、具有过压 、过流保护。"
],
"多媒体地插盒": [
"1 、具有至少 1 路 HDMI 、 1 路电源 、2 路网络接口模块;",
"2 、采用优质接插件。"
],
"线材辅料": [
"采用专用线材 、材料 、接口 、各种辅料等。"
],
"墙体拆除及修复": [
"对大屏安装区域墙体 、天花进行拆除及修复。"
]
}
 }
 }
 good_list = [


@@ -145,7 +145,7 @@ def get_base_info(merged_baseinfo_path,clause_path):
     baseinfo_list.append(merged)
     judge_file_path = 'flask_app/static/提示词/是否相关问题货物标.txt'
     # judge_file_path =r'D:\flask_project\flask_app\static\提示词\是否相关问题货物标.txt'
-    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
+    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
         # 提交两个任务
         future1 = executor.submit(process_judge_questions, judge_file_path, chosen_numbers, file_id,
                                   baseinfo_list)


@@ -1,7 +1,7 @@
 import json
 import re
 from collections import defaultdict
-#传输技术参数需求的时候后处理
+#传输技术参数需求的时候后处理 12.27版本,对重复的键名,若键值一样,不添加后缀-a -b..
 def extract_matching_keys(data, good_list, special_keys=None, parent_key=''):
     def get_suffix(n):
         """
@@ -14,70 +14,97 @@ def extract_matching_keys(data, good_list, special_keys=None, parent_key=''):
             suffix = chr(97 + r) + suffix
         return suffix

-    def count_matching_keys(data, patterns, special_keys, counter=None):
-        """递归统计匹配键的出现次数,仅统计值为列表的键"""
-        if counter is None:
-            counter = defaultdict(int)
+    def count_matching_keys(data, patterns, special_keys, key_value_map=None):
+        """
+        递归统计匹配键的出现次数及其对应的唯一值仅统计值为列表的键
+        不包括 special_keys 中的键
+        """
+        if key_value_map is None:
+            key_value_map = defaultdict(list)
         if isinstance(data, dict):
             for key, value in data.items():
                 clean_key = key.replace(" ", "")  # 去除键中的空格
                 if isinstance(value, list):
                     if clean_key not in special_keys and any(pattern.match(clean_key) for pattern in patterns):
-                        counter[clean_key] += 1
+                        value_tuple = tuple(value)
+                        if value_tuple not in key_value_map[clean_key]:
+                            key_value_map[clean_key].append(value_tuple)
                 elif isinstance(value, dict):
-                    count_matching_keys(value, patterns, special_keys, counter)
+                    count_matching_keys(value, patterns, special_keys, key_value_map)
         elif isinstance(data, list):
             for item in data:
                 if isinstance(item, (dict, list)):
-                    count_matching_keys(item, patterns, special_keys, counter)
-        return counter
+                    count_matching_keys(item, patterns, special_keys, key_value_map)
+        return key_value_map

-    def process_data(data, patterns, special_keys, key_counter, suffix_map, filtered_data, parent_key):
+    def assign_suffixes(key_value_map):
+        """
+        为每个键的每个唯一值分配后缀
+        返回一个字典键为原键名值为另一个字典键为值元组值为对应的后缀如果需要
+        """
+        suffix_assignment = defaultdict(dict)
+        for key, values in key_value_map.items():
+            if len(values) == 1:
+                suffix_assignment[key][values[0]] = ''  # 只有一个唯一值,不需要后缀
+            else:
+                for idx, val in enumerate(values, start=1):
+                    if idx == 1:
+                        suffix = ''  # 第一个唯一值不添加后缀
+                    else:
+                        suffix = '-' + get_suffix(idx - 1)  # 从 '-a' 开始
+                    suffix_assignment[key][val] = suffix
+        return suffix_assignment

+    def process_data(data, patterns, special_keys, suffix_assignment, filtered_data, parent_key):
         """递归处理数据并构建结果"""
-        def get_suffix_label(key):
-            suffix_map[key] += 1
-            return get_suffix(suffix_map[key])
         if isinstance(data, dict):
             for key, value in data.items():
                 clean_key = key.replace(" ", "")  # 去除键中的空格
+                current_parent_key = clean_key if parent_key == '' else f"{parent_key}的{clean_key}"
                 if isinstance(value, list):
-                    # 处理值为列表的键
-                    if any(pattern.match(clean_key) for pattern in patterns):
+                    if clean_key in special_keys:
+                        # 处理 special_keys前缀父键路径
+                        new_key = current_parent_key
+                        filtered_data[new_key] = value
+                    elif any(pattern.match(clean_key) for pattern in patterns):
+                        # 处理普通匹配键
                         # 检查是否以特殊符号开头
                         if clean_key.startswith(('▲', '★','●','■','◆','☆','△','◇','○','□')):
                             symbol = clean_key[0]
                             stripped_key = clean_key[1:]
-                            new_key = generate_key(stripped_key, parent_key, key_counter, suffix_map, special_keys)
+                            value_tuple = tuple(value)
+                            suffix = suffix_assignment.get(stripped_key, {}).get(value_tuple, '')
+                            if suffix:
+                                new_key = f"{stripped_key}{suffix}"
+                            else:
+                                new_key = stripped_key
                             # 将符号添加到每个字符串的开头
                             new_value = [symbol + item for item in value]
                             filtered_data[new_key] = new_value
                         else:
-                            new_key = generate_key(clean_key, parent_key, key_counter, suffix_map, special_keys)
+                            # 获取当前值的后缀
+                            value_tuple = tuple(value)
+                            suffix = suffix_assignment.get(clean_key, {}).get(value_tuple, '')
+                            if suffix:
+                                new_key = f"{clean_key}{suffix}"
+                            else:
+                                new_key = clean_key
                             filtered_data[new_key] = value
                 elif isinstance(value, dict):
                     # 继续递归处理嵌套字典
-                    new_parent_key = clean_key if parent_key == '' else f"{parent_key}的{clean_key}"
-                    process_data(value, patterns, special_keys, key_counter, suffix_map,
-                                 filtered_data, new_parent_key)
+                    process_data(value, patterns, special_keys, suffix_assignment, filtered_data, current_parent_key)
         elif isinstance(data, list):
             for item in data:
                 if isinstance(item, (dict, list)):
-                    process_data(item, patterns, special_keys, key_counter, suffix_map,
-                                 filtered_data, parent_key)
+                    process_data(item, patterns, special_keys, suffix_assignment, filtered_data, parent_key)

-    def generate_key(key, parent_key, key_counter, suffix_map, special_keys):
-        """生成新的键名"""
-        if key in special_keys and parent_key:
-            return f"{parent_key}的{key}"
-        elif key_counter[key] > 1:
-            suffix = get_suffix(suffix_map[key] + 1)
-            suffix_map[key] += 1
-            return f"{key}-{suffix}"
-        return key
+    def generate_patterns(good_list):
+        """生成匹配的正则表达式列表"""
+        return [re.compile(r'^' + re.escape(g) + r'(?:-\d+)?$') for g in good_list]

     if special_keys is None:
         special_keys = ["系统功能"]  # 默认值为 ["系统功能"]
@@ -86,23 +113,24 @@ def extract_matching_keys(data, good_list, special_keys=None, parent_key=''):
     clean_good_list = [g.replace(" ", "") for g in good_list]

     # 构建匹配的正则表达式
-    patterns = [re.compile(r'^' + re.escape(g) + r'(?:-\d+)?$') for g in clean_good_list]
+    patterns = generate_patterns(clean_good_list)

-    # 先统计所有匹配键的出现次数,仅统计值为列表的键
-    key_counter = count_matching_keys(data, patterns, special_keys)
+    # 先统计所有匹配键的出现次数及其对应的唯一值,仅统计值为列表的键
+    key_value_map = count_matching_keys(data, patterns, special_keys)

-    # 初始化后缀映射
-    suffix_map = {key: 0 for key, count in key_counter.items() if count > 1}
+    # 为每个键的唯一值分配后缀
+    suffix_assignment = assign_suffixes(key_value_map)

     # 用于存储最终结果
     filtered_data = {}

     # 递归处理数据
-    process_data(data, patterns, special_keys, key_counter, suffix_map, filtered_data, parent_key)
+    process_data(data, patterns, special_keys, suffix_assignment, filtered_data, parent_key)

     return filtered_data

 def postprocess(data):
     """递归地转换字典中的值为列表,如果所有键对应的值都是'/', '{}' 或 '未知'"""
     def convert_dict(value):
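Editorial note, not part of this commit: a small illustrative sketch of the 12.27 behaviour of extract_matching_keys described above. Duplicate key names whose value lists are identical now collapse into a single key, while duplicate names with differing values receive '-a', '-b', ... suffixes; the sample data below is made up, only the import path comes from this repository.

from flask_app.货物标.技术参数要求提取后处理函数 import extract_matching_keys

data = {
    "包1": {"控制终端": ["处理器: 八核心"], "系统功能": ["a"]},
    "包2": {"控制终端": ["处理器: 八核心"]},   # identical value list -> reuses the key, no '-a' suffix
    "包3": {"控制终端": ["显卡: 8G"]},          # different value list -> gets the '-a' suffix
}
print(extract_matching_keys(data, ["控制终端"]))
# Expected (indicative) output:
# {'控制终端': ['处理器: 八核心'], '包1的系统功能': ['a'], '控制终端-a': ['显卡: 8G']}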