10.17 小解析货物标

This commit is contained in:
zy123 2024-10-17 15:33:58 +08:00
parent 1453d4ae70
commit eaf9d93e15
14 changed files with 940 additions and 577 deletions

View File

@ -0,0 +1,118 @@
# -*- encoding:utf-8 -*-
import json
import logging
import os
import time
from flask_app.main.format_change import docx2pdf, pdf2docx
from flask_app.main.json_utils import clean_json_string
from flask_app.main.判断是否分包等 import merge_json_to_list, read_questions_from_judge
from flask_app.main.基础信息整合 import judge_consortium_bidding
from flask_app.main.多线程提问 import read_questions_from_file, multi_threading
from flask_app.main.通义千问long import upload_file
from flask_app.货物标.基础信息解析main import aggregate_basic_info_goods
from flask_app.货物标.货物标截取pdf import truncate_pdf_specific
from flask_app.main.post_processing import inner_post_processing
def get_global_logger(unique_id):
    """Return the logger registered under *unique_id*.

    Args:
        unique_id: Logger name (typically a per-request id); ``None``
            selects the default root logger.

    Returns:
        logging.Logger: The requested logger instance.
    """
    # No request id available -> fall back to the root logger.
    if unique_id is None:
        return logging.getLogger()
    return logging.getLogger(unique_id)
logger = None
def get_base_info(baseinfo_file_path):
    """Ask the LLM backend the basic-info questions for a goods tender file.

    Args:
        baseinfo_file_path: Path of the (truncated) tender document to upload.

    Returns:
        list[dict]: Parsed JSON answers, including the merged follow-up
        results; an empty list when the first question round fails.
    """
    file_id = upload_file(baseinfo_file_path)
    # Prompt file holding the standard basic-info questions.
    # NOTE(review): hard-coded absolute path — presumably a dev machine; the
    # relative path below is the deployable one.
    # prompt_file_path = 'flask_app/static/提示词/基本信息货物标.txt'
    prompt_file_path = 'D:\\flask_project\\flask_app\\static\\提示词\\基本信息货物标.txt'
    questions = read_questions_from_file(prompt_file_path)
    more_query = "请你根据招标文件信息,回答以下问题:是否组织踏勘现场?是否召开投标预备会(或投标答疑会)?是否退还投标文件?是否允许分包? 是否需要递交投标保证金或磋商保证金是否需要提交履约保证金或履约担保是否有招标代理服务费或中标、成交服务费请按json格式给我提供信息键名分别为'是否组织踏勘现场','是否召开投标预备会'(或'是否召开投标答疑会','是否退还投标文件',是否允许分包','是否递交投标保证金'(或'是否递交磋商保证金','是否提交履约保证金','是否有招标代理服务费',键值仅限于'','','未知',若存在矛盾信息,请回答'未知'"
    questions.append(more_query)
    baseinfo_results = multi_threading(questions, "", file_id, 2)  # 1代表使用百炼rag 2代表使用qianwen-long
    baseinfo_list = [clean_json_string(res) for _, res in baseinfo_results] if baseinfo_results else []
    if not baseinfo_list:
        # Bug fix: the original called baseinfo_list.pop() unconditionally,
        # raising IndexError whenever multi_threading returned nothing.
        print("基础信息整合: multi_threading error!")
        return []
    # The last answer (more_query) drives which follow-up questions to ask.
    chosen_numbers, merged = merge_json_to_list(baseinfo_list.pop())
    baseinfo_list.append(merged)
    # judge_file_path = 'flask_app/static/提示词/是否相关问题货物标.txt'
    judge_file_path = 'D:\\flask_project\\flask_app\\static\\提示词\\是否相关问题货物标.txt'
    judge_questions = read_questions_from_judge(judge_file_path, chosen_numbers)
    res2 = multi_threading(judge_questions, "", file_id, 2)  # qianwen-long backend
    if not res2:
        print("基础信息整合: multi_threading error!")
    else:
        for question, response in res2:
            baseinfo_list.append(clean_json_string(response))
    return baseinfo_list
#货物标
# 货物标 (goods tender)
def little_parse_goods(output_folder, file_path):
    """Lightweight parse of a goods-tender PDF: truncate, query, aggregate.

    Args:
        output_folder: Directory where truncated PDFs are written.
        file_path: Path of the source tender PDF.

    Returns:
        dict: ``{"基础信息": <aggregated basic info>}``.
    """
    truncated_files = truncate_pdf_specific(file_path, output_folder)
    # The last truncated file is the one fed to the basic-info questions.
    info_list = get_base_info(truncated_files[-1])
    return {"基础信息": aggregate_basic_info_goods(info_list)}
def little_parse_engineering(output_folder, downloaded_filepath):
    """Placeholder for the engineering-tender lightweight parse.

    Not implemented yet; always reports success without doing any work.
    """
    return True
def little_parse_main(output_folder, file_path, file_type, zb_type, unique_id):
    """Parse a tender file and save the extracted summary as JSON.

    Args:
        output_folder (str): Directory where the result JSON is written.
        file_path (str): Path of the file to process.
        file_type (int): 1: docx, 2: pdf, 3: doc.
        zb_type (int): 2: goods tender; anything else: engineering tender.
        unique_id: Request id used to select the per-request logger.

    Returns:
        str | None: Path of the saved JSON file, or None on failure.
    """
    global logger
    logger = get_global_logger(unique_id)
    # Normalize the input to a PDF for downstream processing.
    if file_type == 1:  # docx
        pdf_path = docx2pdf(file_path)
    elif file_type == 2:  # pdf
        pdf_path = file_path
    elif file_type == 3:  # doc
        pdf_path = docx2pdf(file_path)
    else:
        logger.error("Unsupported file type provided. Preprocessing halted.")
        return None
    # Dispatch on tender type.
    if zb_type == 2:  # goods tender
        combined_data = little_parse_goods(output_folder, pdf_path)
        logger.info("Parsed goods data: %s", json.dumps(combined_data, ensure_ascii=False, indent=4))
    else:
        combined_data = little_parse_engineering(output_folder, pdf_path)
        logger.info("Parsed engineering data: %s", json.dumps(combined_data, ensure_ascii=False, indent=4))
    # Bug fix: little_parse_engineering is still a stub returning True, so the
    # original code crashed with TypeError on combined_data["基础信息"].
    if not isinstance(combined_data, dict) or "基础信息" not in combined_data:
        logger.error("Parser returned no '基础信息' section; aborting.")
        return None
    res = inner_post_processing(combined_data["基础信息"])
    # Persist the post-processed result next to the other outputs.
    json_path = os.path.join(output_folder, "final_result.json")
    with open(json_path, 'w', encoding='utf-8') as json_file:
        json.dump(res, json_file, ensure_ascii=False, indent=4)
    logger.info(f"Extraction results saved to {json_path}")
    return json_path
if __name__ == "__main__":
    output_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\little_parse_output"
    start_time = time.time()
    file_type = 2  # 1: docx  2: pdf  3: other
    zb_type = 2  # 1: engineering tender  2: goods tender
    input_file = "C:\\Users\\Administrator\\Desktop\\货物标\\zbfiles\\6.2定版视频会议磋商文件.pdf"
    # Bug fix: little_parse_main takes five arguments; the original call
    # omitted unique_id and raised TypeError before doing any work.
    res = little_parse_main(output_folder, input_file, file_type, zb_type, unique_id="demo")
    print(json.dumps(res, ensure_ascii=False, indent=4))
    end_time = time.time()
    elapsed_time = end_time - start_time  # total wall-clock time
    print(f"Function execution took {elapsed_time} seconds.")

View File

@ -0,0 +1,196 @@
import re
def inner_post_processing(base_info):
    """
    Process the '基础信息' (basic info) section and extract the required fields.

    Args:
        base_info (dict): Dictionary containing the basic info.

    Returns:
        dict: Extracted field dictionary ``extracted_info``.
    """
    # Accumulator for the extracted fields.
    extracted_info = {}

    # Helper: follow a key path into nested dicts, returning `default` when
    # any hop is missing or not a dict.
    def get_nested(dic, keys, default=None):
        for key in keys:
            if isinstance(dic, dict):
                dic = dic.get(key, default)
            else:
                return default
        return dic

    # Helper: recursively collect all values whose key contains `substring`
    # (descends into nested dicts and dicts inside lists).
    def find_keys_containing(dic, substring):
        found_values = []
        if isinstance(dic, dict):
            for key, value in dic.items():
                if substring in key:
                    found_values.append(value)
                if isinstance(value, dict):
                    found_values.extend(find_keys_containing(value, substring))
                elif isinstance(value, list):
                    for item in value:
                        if isinstance(item, dict):
                            found_values.extend(find_keys_containing(item, substring))
        return found_values

    # Helper: pick the first non-empty / non-"未知" value whose key partially
    # matches a candidate, in candidate-priority order.
    def extract_field(contact_info, candidate_keys):
        for candidate in candidate_keys:
            for key, value in contact_info.items():
                if candidate in key and value not in ["未知", ""]:
                    return value
        return ""

    # Helper: extract the bid bond ('投标保证金') amount.
    def extract_bid_bond(guarantee_info):
        # Candidate keys that identify a bid/negotiation bond.
        bid_bond_candidates = ["投标保证金", "磋商保证金"]
        # Step 1: look for keys containing one of the candidates.
        for candidate in bid_bond_candidates:
            for key, value in guarantee_info.items():
                if candidate in key:
                    if isinstance(value, dict):
                        # Search the nested dict for a key containing "金额" (amount).
                        for sub_key, sub_value in value.items():
                            if "金额" in sub_key and sub_value not in ["未知", ""]:
                                return sub_value
                    elif isinstance(value, str):
                        if "金额" in key and value not in ["未知", ""]:
                            return value
                    else:
                        # Ignore values that are neither dict nor str.
                        continue
        # Step 2: no "金额" key found — scan all values for an amount-like
        # pattern (digits or Chinese numerals followed by 元/万元).
        amount_pattern = re.compile(r'(?:\d{1,3}(?:[,,]\d{3})*(?:\.\d+)?|\d+(?:\.\d+)?|[\u4e00-\u9fff]+(?:\.\d+)?)\s*(?:元|万元)')
        for key, value in guarantee_info.items():
            if isinstance(value, str):
                match = amount_pattern.search(value)
                if match:
                    return match.group()
            elif isinstance(value, dict):
                # Recurse into nested dicts looking for an amount.
                found_amount = extract_bid_bond(value)
                if found_amount:
                    return found_amount
        # Nothing found: return an empty string.
        return ""

    # Target-field mapping; '联系人', '联系电话' and '招标项目地点' are
    # handled separately below. Each target tries its paths in order.
    mapping = {
        "代理机构名称": [["招标人/代理信息", "招标代理机构"]],
        "招标项目名称": [["项目信息", "项目名称"], ["项目信息", "工程名称"]],
        "招标项目编号": [["项目信息", "项目编号"], ["项目信息", "招标编号"]],
        "开标时间": [["关键时间/内容", "开标时间"]],
        "报名截止日期": [["关键时间/内容", "投标文件递交截止日期"]],
        "招标项目预算": [["项目信息", "招标控制价"]],
        "招标单位名称": [["招标人/代理信息", "招标人"]],
        "招标公告地址": [["关键时间/内容", "信息公示媒介"], ["关键时间/内容", "评标结果公示媒介"]]
    }
    # Extract and map the fields, keeping the first truthy path hit.
    for new_key, paths in mapping.items():
        value = None
        for path in paths:
            value = get_nested(base_info, path)
            if value:
                break
        extracted_info[new_key] = value if value else ""
    # Special case '招标项目地点' (project location):
    # search '项目信息' for any key containing "地点".
    project_info = base_info.get("项目信息", {})
    location_candidates = find_keys_containing(project_info, "地点")
    if location_candidates:
        # Use the first location found.
        extracted_info["招标项目地点"] = location_candidates[0]
    else:
        extracted_info["招标项目地点"] = ""
    # Special case contact name and phone.
    # Project contact details ('项目联系方式').
    project_contact = get_nested(base_info, ["招标人/代理信息", "项目联系方式"], {})
    # Tenderer contact details ('招标人联系方式').
    bidder_contact = get_nested(base_info, ["招标人/代理信息", "招标人联系方式"], {})
    # Candidate key lists, ordered by priority.
    name_candidates = ["名称", "联系人", "招标"]
    phone_candidates = ["电话", "手机", "联系方式"]
    # Contact name: project contact takes precedence over tenderer contact.
    contact_names = [project_contact, bidder_contact]
    contact_name = ""
    for contact in contact_names:
        extracted_name = extract_field(contact, name_candidates)
        if extracted_name:
            contact_name = extracted_name
            break
    extracted_info["联系人"] = contact_name
    # Contact phone: same precedence.
    contact_phones = [project_contact, bidder_contact]
    contact_phone = ""
    for contact in contact_phones:
        extracted_phone = extract_field(contact, phone_candidates)
        if extracted_phone:
            contact_phone = extracted_phone
            break
    extracted_info["联系电话"] = contact_phone
    # Special case '投标保证金' (bid bond):
    # extracted from the '保证金相关' section.
    guarantee_info = get_nested(base_info, ["保证金相关"], {})
    extracted_info["投标保证金"] = extract_bid_bond(guarantee_info)
    return extracted_info
def outer_post_processing(combined_data, includes):
    """
    Outer wrapper: delegates '基础信息' to inner_post_processing and regroups
    the remaining keys, collecting everything not in `includes` under '其他'.

    Args:
        combined_data (dict): Raw merged data.
        includes (list): Keys to keep at the top level of the result.

    Returns:
        tuple: (processed_data, extracted_info)
    """
    # Result dict; the '其他' (other) bucket is pre-created and removed at
    # the end if it stays empty.
    processed_data = {"其他": {}}
    extracted_info = {}
    wants_base_info = "基础信息" in includes
    if wants_base_info:
        base_info = combined_data.get("基础信息", {})
        # Field extraction is delegated to the inner pass.
        extracted_info = inner_post_processing(base_info)
        # Keep the raw basic info in the processed output as well.
        processed_data["基础信息"] = base_info
    for key, value in combined_data.items():
        if key == "基础信息" and wants_base_info:
            # Already placed above; skip.
            continue
        if key in includes:
            processed_data[key] = value
        else:
            processed_data["其他"][key] = value
    # Drop the '其他' bucket when nothing landed in it.
    if not processed_data["其他"]:
        del processed_data["其他"]
    return processed_data, extracted_info

View File

@ -1,5 +1,4 @@
import logging import logging
import re
import shutil import shutil
import time import time
import uuid import uuid
@ -7,12 +6,15 @@ from datetime import datetime, timedelta
from flask import Flask, request, jsonify, Response, stream_with_context, g from flask import Flask, request, jsonify, Response, stream_with_context, g
import json import json
import os import os
from flask_app.general.little_zbparse import little_parse_main
from flask_app.main.download import download_file from flask_app.main.download import download_file
from flask_app.main.post_processing import outer_post_processing
from flask_app.main.招标文件解析 import engineering_bid_main from flask_app.main.招标文件解析 import engineering_bid_main
from flask_app.货物标.货物标解析main import goods_bid_main from flask_app.货物标.货物标解析main import goods_bid_main
app = Flask(__name__)
app = Flask(__name__)
class CSTFormatter(logging.Formatter): class CSTFormatter(logging.Formatter):
"""自定义的 Formatter将日志的时间戳调整为中国标准时间UTC+8""" """自定义的 Formatter将日志的时间戳调整为中国标准时间UTC+8"""
@ -125,11 +127,67 @@ def validate_request(default_zb_type=1):
return jsonify({'error': 'Invalid zb_type provided'}), 400 return jsonify({'error': 'Invalid zb_type provided'}), 400
return file_url, zb_type return file_url, zb_type
@app.route('/little_zbparse',methods=['POST'])
def little_zbparse():
logger=g.logger
file_url,zb_type = validate_request()
if isinstance(file_url, tuple): # Check if the returned value is an error response
return file_url
try:
logger.info("starting parsing url:" + file_url)
final_json_path= download_and_process_file(file_url,zb_type)
if not final_json_path:
return jsonify({'error': 'File processing failed'}), 500
response = generate_response(final_json_path) # 先获取响应内容
# remove_directory(output_folder) # 然后删除文件夹
return response # 最后返回获取的响应
except Exception as e:
logger.error('Exception occurred: ' + str(e)) # 使用全局 logger 记录
return jsonify({'error': str(e)}), 500
def download_and_process_file(file_url,zb_type):
logger = g.logger
unique_id = g.unique_id
output_folder = f"flask_app/static/output/{unique_id}" # 直接使用全局 unique_id 构建路径
filename = "ztbfile"
downloaded_filename = os.path.join(output_folder, filename)
# 下载文件,假设 download_file 函数已正确处理异常并返回文件路径
downloaded_filepath, file_type = download_file(file_url, downloaded_filename)
if downloaded_filepath is None or file_type == 4:
logger.error("Unsupported file type or failed to download file")
return None
logger.info("Local file path: " + downloaded_filepath)
processed_file_path = little_parse_main(output_folder, downloaded_filepath, file_type,zb_type,unique_id)
return processed_file_path
def generate_response(final_json_path):
logger = g.logger
# 检查final_json_path是否为空或None
if not final_json_path:
logger.error('Empty or None path provided for final_json.')
return jsonify({'error': 'No path provided for final_json.'}), 400
if not os.path.exists(final_json_path):
logger.error('final_json not found at path: ' + final_json_path)
return jsonify({'error': 'final_json not found'}), 404
with open(final_json_path, 'r', encoding='utf-8') as f:
logger.info('final_json_path:' + final_json_path)
zbparse_data = json.load(f)
json_str = json.dumps(zbparse_data, ensure_ascii=False)
return jsonify({
'message': 'File uploaded and processed successfully',
'filename': os.path.basename(final_json_path),
'data': json_str
})
# 流式 # 流式
@app.route('/upload', methods=['POST']) @app.route('/upload', methods=['POST'])
def zbparse(): def zbparse():
logger = g.logger logger = g.logger
logger.info("start!!!") logger.info("zbparse start!!!")
# 获取并显示接收到的 JSON 数据 # 获取并显示接收到的 JSON 数据
received_data = request.get_json() received_data = request.get_json()
logger.info("Received JSON data: " + str(received_data)) logger.info("Received JSON data: " + str(received_data))
@ -143,170 +201,6 @@ def zbparse():
logger.error('Exception occurred: ' + str(e)) logger.error('Exception occurred: ' + str(e))
return jsonify({'error': str(e)}), 500 return jsonify({'error': str(e)}), 500
def post_processing(combined_data, includes):
# 初始化结果字典,预设'其他'分类为空字典
processed_data = {"其他": {}}
# 初始化提取的信息字典
extracted_info = {}
# 定义一个辅助函数用于获取嵌套字典中的值
def get_nested(dic, keys, default=None):
for key in keys:
if isinstance(dic, dict):
dic = dic.get(key, default)
else:
return default
return dic
# 定义一个辅助函数用于递归查找包含特定子字符串的键
def find_keys_containing(dic, substring):
found_values = []
if isinstance(dic, dict):
for key, value in dic.items():
if substring in key:
found_values.append(value)
if isinstance(value, dict):
found_values.extend(find_keys_containing(value, substring))
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
found_values.extend(find_keys_containing(item, substring))
return found_values
# 定义一个辅助函数用于根据候选键列表提取值(部分匹配)
def extract_field(contact_info, candidate_keys):
for candidate in candidate_keys:
for key, value in contact_info.items():
if candidate in key and value not in ["未知", ""]:
return value
return ""
# 定义一个辅助函数用于提取 '投标保证金'
def extract_bid_bond(guarantee_info):
# 定义投标保证金的候选键
bid_bond_candidates = ["投标保证金", "磋商保证金"]
# 第一步:查找包含 "投标保证金" 或 "磋商保证金" 的键
for candidate in bid_bond_candidates:
for key, value in guarantee_info.items():
if candidate in key:
if isinstance(value, dict):
# 在嵌套字典中查找包含 "金额" 的键
for sub_key, sub_value in value.items():
if "金额" in sub_key and sub_value not in ["未知", ""]:
return sub_value
elif isinstance(value, str):
if "金额" in key and value not in ["未知", ""]:
return value
else:
# 如果 value 既不是 dict 也不是 str忽略
continue
# 第二步:如果没有找到包含 "金额" 的键,尝试在所有键值中查找符合模式的值
amount_pattern = re.compile(r'(?:\d{1,3}(?:[,]\d{3})*(?:\.\d+)?|\d+(?:\.\d+)?|[\u4e00-\u9fff]+(?:\.\d+)?)\s*(?:元|万元)')
for key, value in guarantee_info.items():
if isinstance(value, str):
match = amount_pattern.search(value)
if match:
return match.group()
elif isinstance(value, dict):
# 递归查找嵌套字典中的金额
found_amount = extract_bid_bond(value)
if found_amount:
return found_amount
# 如果都没有找到,则返回空字符串
return ""
# 如果 '基础信息' 在 includes 中,则进行字段提取
if "基础信息" in includes:
base_info = combined_data.get("基础信息", {})
# 定义所需字段的映射关系,暂时不包含'联系人'和'联系电话'以及'招标项目地点'
mapping = {
"招标项目名称": [["项目信息", "项目名称"], ["项目信息", "工程名称"]],
"招标项目编号": [["项目信息", "项目编号"], ["项目信息", "招标编号"]],
"开标时间": [["关键时间/内容", "开标时间"]],
"报名截止日期": [["关键时间/内容", "投标文件递交截止日期"]],
"招标项目预算": [["项目信息", "招标控制价"]],
"招标单位名称": [["招标人/代理信息", "招标人"]],
"招标公告地址": [["关键时间/内容", "信息公示媒介"], ["关键时间/内容", "评标结果公示媒介"]]
}
# 提取并映射字段
for new_key, paths in mapping.items():
value = None
for path in paths:
value = get_nested(base_info, path)
if value:
break
extracted_info[new_key] = value if value else ""
# 特殊处理 '招标项目地点'
# 在 '项目信息' 下查找包含 "地点" 的键
project_info = base_info.get("项目信息", {})
location_candidates = find_keys_containing(project_info, "地点")
if location_candidates:
# 选择第一个找到的地点
extracted_info["招标项目地点"] = location_candidates[0]
else:
extracted_info["招标项目地点"] = ""
# 特殊处理 '联系人' 和 '联系电话'
# 提取 '项目联系方式'
project_contact = get_nested(base_info, ["招标人/代理信息", "项目联系方式"], {})
# 提取 '招标人联系方式'
bidder_contact = get_nested(base_info, ["招标人/代理信息", "招标人联系方式"], {})
# 定义候选键列表,按优先级排序
name_candidates = ["名称", "联系人", "招标"]
phone_candidates = ["电话", "手机", "联系方式"]
# 提取 '联系人'
contact_names = [project_contact, bidder_contact]
contact_name = ""
for contact in contact_names:
extracted_name = extract_field(contact, name_candidates)
if extracted_name:
contact_name = extracted_name
break
extracted_info["联系人"] = contact_name
# 提取 '联系电话'
contact_phones = [project_contact, bidder_contact]
contact_phone = ""
for contact in contact_phones:
extracted_phone = extract_field(contact, phone_candidates)
if extracted_phone:
contact_phone = extracted_phone
break
extracted_info["联系电话"] = contact_phone
# 特殊处理 '投标保证金'
# 提取 '保证金相关'
guarantee_info = get_nested(base_info, ["保证金相关"], {})
extracted_info["投标保证金"] = extract_bid_bond(guarantee_info)
# 遍历原始字典的每一个键值对
for key, value in combined_data.items():
if key in includes:
if key == "基础信息":
# 已经处理 '基础信息',保留在处理后的数据中
processed_data[key] = value
else:
# 直接保留包含在 includes 列表中的键值对
processed_data[key] = value
else:
# 将不在 includes 列表中的键值对加入到 '其他' 分类中
processed_data["其他"][key] = value
# 如果 '其他' 分类没有任何内容,可以选择删除这个键
if not processed_data["其他"]:
del processed_data["其他"]
return processed_data, extracted_info
# 分段返回 # 分段返回
def process_and_stream(file_url, zb_type): def process_and_stream(file_url, zb_type):
""" """
@ -362,7 +256,7 @@ def process_and_stream(file_url, zb_type):
1: engineering_bid_main, 1: engineering_bid_main,
2: goods_bid_main 2: goods_bid_main
} }
processing_func = processing_functions.get(zb_type, engineering_bid_main) processing_func = processing_functions.get(zb_type, engineering_bid_main) #默认按工程标解析
# 从 processing_func 获取数据 # 从 processing_func 获取数据
for data in processing_func(output_folder, downloaded_filepath, file_type, unique_id): for data in processing_func(output_folder, downloaded_filepath, file_type, unique_id):
@ -397,7 +291,7 @@ def process_and_stream(file_url, zb_type):
# **保存 combined_data 到 output_folder 下的 'final_result.json'** # **保存 combined_data 到 output_folder 下的 'final_result.json'**
output_json_path = os.path.join(output_folder, 'final_result.json') output_json_path = os.path.join(output_folder, 'final_result.json')
includes = ["基础信息", "资格审查", "商务评分", "技术评分", "无效标与废标项", "投标文件要求", "开评定标流程"] includes = ["基础信息", "资格审查", "商务评分", "技术评分", "无效标与废标项", "投标文件要求", "开评定标流程"]
final_result, extracted_info = post_processing(combined_data, includes) final_result, extracted_info = outer_post_processing(combined_data, includes)
try: try:
with open(output_json_path, 'w', encoding='utf-8') as json_file: with open(output_json_path, 'w', encoding='utf-8') as json_file:
@ -427,6 +321,7 @@ def process_and_stream(file_url, zb_type):
'filename': os.path.basename(downloaded_filepath), 'filename': os.path.basename(downloaded_filepath),
'data': 'END' 'data': 'END'
} }
yield f"data: {json.dumps(final_response)}\n\n" yield f"data: {json.dumps(final_response)}\n\n"
finally: finally:

View File

@ -1,25 +1,24 @@
import json
import re import re
combined_data = { data={
"基础信息": { "基础信息": {
"招标人/代理信息": { "招标人/代理信息": {
"招标人": "广水市公安局", "招标人": "广水市公安局",
"招标人联系方式": { "招标人联系方式": {
"名称": "广水市公安局", "名称": "广水市公安局",
"地址": "广水市应山办事处应十大道 189号", "联系电话": "0722-6248000",
"联系电话": "未知" "地址": "广水市应山办事处应十大道189号"
}, },
"招标代理机构": "湖北楚振捷工程项目管理有限公司", "招标代理机构": "湖北楚振捷工程项目管理有限公司",
"招标代理机构联系方式": { "招标代理机构联系方式": {
"名称": "湖北楚振捷工程项目管理有限公司", "名称": "湖北楚振捷工程项目管理有限公司",
"地址": "广水市永阳一路 41号", "联系电话": "0722-6256088",
"联系方式": "0722-6256088" "地址": "广水市永阳一路41号"
}, },
"项目联系方式":{ "项目联系方式": {
"名称":"张三", "名称": "吴凡",
"联系电话":"11" "联系电话": "18871130808"
} }
}, },
"项目信息": { "项目信息": {
"项目名称": "广水市公安局视频会议高清化改造项目", "项目名称": "广水市公安局视频会议高清化改造项目",
@ -54,51 +53,51 @@ combined_data = {
}, },
"关键时间/内容": { "关键时间/内容": {
"投标文件递交截止日期": "2021年 6月 18日 15点 00分", "投标文件递交截止日期": "2021年 6月 18日 15点 00分",
"开标时间":"111", "投标文件递交地点": "广水市公共资源交易中心五楼 501号开标室",
"开标地点":"哈哈", "开标时间": "2021年 6月 18日 15点 00分",
"澄清招标文件的截止时间": "未知", "开标地点": "广水市公共资源交易中心五楼 501号开标室",
"澄清招标文件的截止时间": "同采购文件获取截止时间(如有)",
"投标有效期": "90日历天", "投标有效期": "90日历天",
"信息公示媒介": "中国湖北政府采购网http://www.ccgp-hubei.gov.cn, 中国广水网http://www.zggsw.gov.cn/", "信息公示媒介": "中国湖北政府采购网http://www.ccgp-hubei.gov.cn, 中国广水网http://www.zggsw.gov.cn/"
"投标文件递交地点": "广水市公共资源交易中心五楼 501号开标室"
}, },
"保证金相关": { "保证金相关": {
"质量保证金": "未知", "投标保证金": "不提交",
"履约保证金": "不提交", "履约保证金": "不提交",
"是否递交磋商保证金": "",
"退还投标保证金": "/", "退还投标保证金": "/",
"投标保证金": { "质量保证金": "未知"
"价格":"100000000.00 元"
}
}, },
"其他信息": { "其他信息": {
"是否退还投标文件": "",
"投标费用承担": "供应商应承担所有与准备和参加磋商有关的费用,不论磋商的结果如何,采购人和采购代理机构均无义务和责任承担这些费用。", "投标费用承担": "供应商应承担所有与准备和参加磋商有关的费用,不论磋商的结果如何,采购人和采购代理机构均无义务和责任承担这些费用。",
"招标代理服务费": { "招标代理服务费": {
"支付人": "成交供应商", "收费标准": "根据国家发展与改革委员会办公厅发改办价格【2003】857 号文的规定经协商由成交供应商按国家发展和改革委员发改价格【2011】534号文规定货物类取费标准向采购代理机构支付招标代理服务费包含“招标代理费、评标会务费、评标费”如本项目各包服务费不足叁仟元则供应商按叁仟元支付服务费。",
"支付标准": "根据国家发展与改革委员会办公厅发改办价格【2003】857 号文的规定经协商由成交供应商按国家发展和改革委员发改价格【2011】534号文规定货物类取费标准向采购代理机构支付招标代理服务费包含“招标代理费、评标会务费、评标费”如本项目各包服务费不足叁仟元则供应商按叁仟元支付服务费。", "递交方式": "成交服务费由成交供应商在领取成交通知书的同时向代理机构支付,可使用现金或电汇办理,汇款账户信息如下: 账户名:湖北楚振捷工程项目管理有限公司 账户号码580742561314 开户银行:中国银行股份有限公司广水支行营业部"
"支付方式": "成交服务费由成交供应商在领取成交通知书的同时向代理机构支付,可使用现金或电汇办理",
"支付时间": "领取成交通知书的同时"
},
"偏离": {
"偏离项要求": "供应商需在响应文件中提供《采购需求响应、偏离说明表/导读表》具体格式见第六章响应文件格式中的49页。供应商需对采购需求中的各项技术参数和服务要求进行逐项响应明确表明无偏离、正偏离或负偏离的情况。对于负偏离项需详细说明偏离的具体内容及原因。",
"偏离项内容": "未知"
}, },
"是否退还投标文件": "",
"是否召开投标答疑会": "",
"投标预备会": "不召开", "投标预备会": "不召开",
"踏勘现场": "不组织" "踏勘现场": "不组织",
"偏离": {
"偏离项要求": "供应商需在响应文件中提供《采购需求响应、偏离说明表/导读表》具体格式见第六章响应文件格式中的49页。供应商需对采购需求中的各项技术参数和服务要求进行逐项响应明确表明无偏离、正偏离或负偏离的情况。",
"正偏离定义": "供应商提供的产品或服务优于采购需求中的技术参数和服务要求。",
"负偏离定义": "供应商提供的产品或服务未能完全满足采购需求中的技术参数和服务要求。",
"偏离项处理": "非★号条款允许的负偏离项数及范围见第四章评标办法,★号条款不允许负偏离。",
"偏离项数量限制": "非★号条款允许的负偏离项数及范围见第四章评标办法,具体数量限制未知。",
"偏离项评分影响": "负偏离可能会影响供应商的评分,具体评分标准见第四章评定办法。"
}
} }
}, }
"资格审查": { },
"商务评分": { },
"技术评分": { },
"zhes":"11111"
# 其他键值对
} }
includes = ["基础信息", "资格审查", "商务评分", "技术评分", "无效标与废标项", "投标文件要求", "开评定标流程"] def inner_post_processing(base_info):
"""
处理 '基础信息' 部分提取所需字段
def post_processing(combined_data, includes): 参数:
# 初始化结果字典,预设'其他'分类为空字典 base_info (dict): 包含 '基础信息' 的字典
processed_data = {"其他": {}}
返回:
dict: 提取的信息字典 extracted_info
"""
# 初始化提取的信息字典 # 初始化提取的信息字典
extracted_info = {} extracted_info = {}
@ -170,95 +169,74 @@ def post_processing(combined_data, includes):
# 如果都没有找到,则返回空字符串 # 如果都没有找到,则返回空字符串
return "" return ""
# 如果 '基础信息' 在 includes 中,则进行字段提取 # 定义所需字段的映射关系,暂时不包含'联系人'和'联系电话'以及'招标项目地点'
if "基础信息" in includes: mapping = {
base_info = combined_data.get("基础信息", {}) "代理机构名称": [["招标人/代理信息", "招标代理机构"]],
"招标项目名称": [["项目信息", "项目名称"], ["项目信息", "工程名称"]],
"招标项目编号": [["项目信息", "项目编号"], ["项目信息", "招标编号"]],
"开标时间": [["关键时间/内容", "开标时间"]],
"报名截止日期": [["关键时间/内容", "投标文件递交截止日期"]],
"招标项目预算": [["项目信息", "招标控制价"]],
"招标单位名称": [["招标人/代理信息", "招标人"]],
"招标公告地址": [["关键时间/内容", "信息公示媒介"], ["关键时间/内容", "评标结果公示媒介"]]
}
# 定义所需字段的映射关系,暂时不包含'联系人'和'联系电话'以及'招标项目地点' # 提取并映射字段
mapping = { for new_key, paths in mapping.items():
"招标项目名称": [["项目信息", "项目名称"], ["项目信息", "工程名称"]], value = None
"招标项目编号": [["项目信息", "项目编号"], ["项目信息", "招标编号"]], for path in paths:
"开标时间": [["关键时间/内容", "开标时间"]], value = get_nested(base_info, path)
"报名截止日期": [["关键时间/内容", "投标文件递交截止日期"]], if value:
"招标项目预算": [["项目信息", "招标控制价"]],
"招标单位名称": [["招标人/代理信息", "招标人"]],
"招标公告地址": [["关键时间/内容", "信息公示媒介"], ["关键时间/内容", "评标结果公示媒介"]]
}
# 提取并映射字段
for new_key, paths in mapping.items():
value = None
for path in paths:
value = get_nested(base_info, path)
if value:
break
extracted_info[new_key] = value if value else ""
# 特殊处理 '招标项目地点'
# 在 '项目信息' 下查找包含 "地点" 的键
project_info = base_info.get("项目信息", {})
location_candidates = find_keys_containing(project_info, "地点")
if location_candidates:
# 选择第一个找到的地点
extracted_info["招标项目地点"] = location_candidates[0]
else:
extracted_info["招标项目地点"] = ""
# 特殊处理 '联系人' 和 '联系电话'
# 提取 '项目联系方式'
project_contact = get_nested(base_info, ["招标人/代理信息", "项目联系方式"], {})
# 提取 '招标人联系方式'
bidder_contact = get_nested(base_info, ["招标人/代理信息", "招标人联系方式"], {})
# 定义候选键列表,按优先级排序
name_candidates = ["名称", "联系人", "招标"]
phone_candidates = ["电话", "手机", "联系方式"]
# 提取 '联系人'
contact_names = [project_contact, bidder_contact]
contact_name = ""
for contact in contact_names:
extracted_name = extract_field(contact, name_candidates)
if extracted_name:
contact_name = extracted_name
break break
extracted_info["联系人"] = contact_name extracted_info[new_key] = value if value else ""
# 提取 '联系电话' # 特殊处理 '招标项目地点'
contact_phones = [project_contact, bidder_contact] # 在 '项目信息' 下查找包含 "地点" 的键
contact_phone = "" project_info = base_info.get("项目信息", {})
for contact in contact_phones: location_candidates = find_keys_containing(project_info, "地点")
extracted_phone = extract_field(contact, phone_candidates) if location_candidates:
if extracted_phone: # 选择第一个找到的地点
contact_phone = extracted_phone extracted_info["招标项目地点"] = location_candidates[0]
break else:
extracted_info["联系电话"] = contact_phone extracted_info["招标项目地点"] = ""
# 特殊处理 '投标保证金' # 特殊处理 '联系人' 和 '联系电话'
# 提取 '保证金相关' # 提取 '项目联系方式'
guarantee_info = get_nested(base_info, ["保证金相关"], {}) project_contact = get_nested(base_info, ["招标人/代理信息", "项目联系方式"], {})
extracted_info["投标保证金"] = extract_bid_bond(guarantee_info)
# 遍历原始字典的每一个键值对 # 提取 '招标人联系方式'
for key, value in combined_data.items(): bidder_contact = get_nested(base_info, ["招标人/代理信息", "招标人联系方式"], {})
if key in includes:
if key == "基础信息":
# 已经处理 '基础信息',保留在处理后的数据中
processed_data[key] = value
else:
# 直接保留包含在 includes 列表中的键值对
processed_data[key] = value
else:
# 将不在 includes 列表中的键值对加入到 '其他' 分类中
processed_data["其他"][key] = value
# 如果 '其他' 分类没有任何内容,可以选择删除这个键 # 定义候选键列表,按优先级排序
if not processed_data["其他"]: name_candidates = ["名称", "联系人", "招标"]
del processed_data["其他"] phone_candidates = ["电话", "手机", "联系方式"]
return processed_data, extracted_info # 提取 '联系人'
contact_names = [project_contact, bidder_contact]
contact_name = ""
for contact in contact_names:
extracted_name = extract_field(contact, name_candidates)
if extracted_name:
contact_name = extracted_name
break
extracted_info["联系人"] = contact_name
# 提取 '联系电话'
contact_phones = [project_contact, bidder_contact]
contact_phone = ""
for contact in contact_phones:
extracted_phone = extract_field(contact, phone_candidates)
if extracted_phone:
contact_phone = extracted_phone
break
extracted_info["联系电话"] = contact_phone
res1,res2=post_processing(combined_data,includes) # 特殊处理 '投标保证金'
print(json.dumps(res2,ensure_ascii=False,indent=4)) # 提取 '保证金相关'
guarantee_info = get_nested(base_info, ["保证金相关"], {})
extracted_info["投标保证金"] = extract_bid_bond(guarantee_info)
return extracted_info
res=inner_post_processing(data["基础信息"])
print(res)

View File

@ -95,7 +95,7 @@ def merge_json_to_list(merged):
merged['偏离']='不允许' merged['偏离']='不允许'
merged.pop('是否允许偏离', None) merged.pop('是否允许偏离', None)
return chosen_numbers, json.dumps(merged,ensure_ascii=False) return chosen_numbers, merged

View File

@ -89,15 +89,14 @@ def judge_consortium_bidding(baseinfo_list):
updated_list = [] updated_list = []
accept_bidding = False accept_bidding = False
for baseinfo in baseinfo_list: for baseinfo in baseinfo_list:
json_data = clean_json_string(baseinfo)
# 检查 "是否接受联合体投标" 键是否存在且其值为 "是" # 检查 "是否接受联合体投标" 键是否存在且其值为 "是"
if "是否接受联合体投标" in json_data and json_data["是否接受联合体投标"] == "": if "是否接受联合体投标" in baseinfo and baseinfo["是否接受联合体投标"] == "":
accept_bidding = True accept_bidding = True
# 从字典中移除特定键值对 # 从字典中移除特定键值对
json_data.pop("是否接受联合体投标", None) baseinfo.pop("是否接受联合体投标", None)
# # 将修改后的 json 数据转换回 JSON 字符串(如果需要) # # 将修改后的 json 数据转换回 JSON 字符串(如果需要)
# updated_info = json.dumps(json_data) # updated_info = json.dumps(json_data)
updated_list.append(json_data) updated_list.append(baseinfo)
# 更新原始列表,如果你想保留修改 # 更新原始列表,如果你想保留修改
baseinfo_list[:] = updated_list baseinfo_list[:] = updated_list
return accept_bidding return accept_bidding
@ -115,15 +114,15 @@ def combine_basic_info(knowledge_name, truncate0, output_folder, clause_path):
- dict: 综合后的基础信息 - dict: 综合后的基础信息
""" """
baseinfo_list = [] baseinfo_list = []
baseinfo_file_path = 'flask_app/static/提示词/前两章提问总结.txt' # baseinfo_file_path = 'flask_app/static/提示词/前两章提问总结.txt'
# baseinfo_file_path = 'D:\\flask_project\\flask_app\\static\\提示词\\前两章提问总结.txt' baseinfo_file_path = 'D:\\flask_project\\flask_app\\static\\提示词\\前两章提问总结.txt'
questions = read_questions_from_file(baseinfo_file_path) questions = read_questions_from_file(baseinfo_file_path)
res1 = multi_threading(questions, knowledge_name) res1 = multi_threading(questions, knowledge_name)
for index, response in res1: for index, response in res1:
try: try:
if response and len(response) > 1: if response and len(response) > 1:
baseinfo_list.append(response[1]) baseinfo_list.append(clean_json_string(response[1]))
else: else:
print(f"基础信息整合: Warning: Missing or incomplete response data for query index {index}.") print(f"基础信息整合: Warning: Missing or incomplete response data for query index {index}.")
except Exception as e: except Exception as e:
@ -133,8 +132,8 @@ def combine_basic_info(knowledge_name, truncate0, output_folder, clause_path):
chosen_numbers, merged = judge_whether_main(truncate0, output_folder) chosen_numbers, merged = judge_whether_main(truncate0, output_folder)
baseinfo_list.append(merged) baseinfo_list.append(merged)
judge_file_path = 'flask_app/static/提示词/是否相关问题.txt' # judge_file_path = 'flask_app/static/提示词/是否相关问题.txt'
# judge_file_path = 'D:\\flask_project\\flask_app\\static\\提示词\\是否相关问题.txt' judge_file_path = 'D:\\flask_project\\flask_app\\static\\提示词\\是否相关问题.txt'
judge_questions = read_questions_from_judge(judge_file_path, chosen_numbers) judge_questions = read_questions_from_judge(judge_file_path, chosen_numbers)
judge_consortium = judge_consortium_bidding(baseinfo_list) # 通过招标公告判断是否接受联合体投标 judge_consortium = judge_consortium_bidding(baseinfo_list) # 通过招标公告判断是否接受联合体投标

View File

@ -22,4 +22,4 @@
12.该项目是否接受联合体投标请按json格式给我提供信息键名为'是否接受联合体投标''是否接受联合体投标'的键值仅限于'是'、'否'、'未知'。 12.该项目是否接受联合体投标请按json格式给我提供信息键名为'是否接受联合体投标''是否接受联合体投标'的键值仅限于'是'、'否'、'未知'。
13.该项目的开标时间开启时间和开标地点是请按json格式给我提供信息键名为'开标时间'和'开标地点',若存在未知信息,在对应的键值中填'未知'。 13.该项目的开标时间(开启时间和开标地点是请按json格式给我提供信息键名为'开标时间'和'开标地点',对于"开标时间",若文中没有明确时间却有诸如'同投标截止时间'的类比表述,将其键值设为文中相关表述,若存在未知信息,在对应的键值中填'未知'。

View File

@ -11,7 +11,7 @@ from flask_app.main.判断是否分包等 import merge_json_to_list, read_questi
from flask_app.货物标.提取采购需求main import fetch_procurement_reqs from flask_app.货物标.提取采购需求main import fetch_procurement_reqs
def aggregate_basic_info(baseinfo_list): def aggregate_basic_info_goods(baseinfo_list):
""" """
将基础信息列表中的数据进行合并和分类 将基础信息列表中的数据进行合并和分类
@ -100,18 +100,18 @@ def dynamic_key_handling(key_groups, detected_keys):
def get_base_info(baseinfo_file_path): def get_base_info(baseinfo_file_path):
file_id = upload_file(baseinfo_file_path) file_id = upload_file(baseinfo_file_path)
baseinfo_file_path='flask_app/static/提示词/基本信息货物标.txt' # baseinfo_file_path='flask_app/static/提示词/基本信息货物标.txt'
# baseinfo_file_path = 'D:\\flask_project\\flask_app\\static\\提示词\\基本信息货物标.txt' baseinfo_file_path = 'D:\\flask_project\\flask_app\\static\\提示词\\基本信息货物标.txt'
questions = read_questions_from_file(baseinfo_file_path) questions = read_questions_from_file(baseinfo_file_path)
more_query = "请你根据招标文件信息,回答以下问题:是否组织踏勘现场?是否召开投标预备会(或投标答疑会)?是否退还投标文件?是否允许分包? 是否需要递交投标保证金或磋商保证金是否需要提交履约保证金或履约担保是否有招标代理服务费或中标、成交服务费请按json格式给我提供信息键名分别为'是否组织踏勘现场','是否召开投标预备会'(或'是否召开投标答疑会','是否退还投标文件',是否允许分包','是否递交投标保证金'(或'是否递交磋商保证金','是否提交履约保证金','是否有招标代理服务费',键值仅限于'','','未知',若存在矛盾信息,请回答'未知'" more_query = "请你根据招标文件信息,回答以下问题:是否组织踏勘现场?是否召开投标预备会(或投标答疑会)?是否退还投标文件?是否允许分包? 是否需要递交投标保证金或磋商保证金是否需要提交履约保证金或履约担保是否有招标代理服务费或中标、成交服务费请按json格式给我提供信息键名分别为'是否组织踏勘现场','是否召开投标预备会'(或'是否召开投标答疑会','是否退还投标文件',是否允许分包','是否递交投标保证金'(或'是否递交磋商保证金','是否提交履约保证金','是否有招标代理服务费',键值仅限于'','','未知',若存在矛盾信息,请回答'未知'"
questions.append(more_query) questions.append(more_query)
baseinfo_results = multi_threading(questions, "", file_id, 2) # 1代表使用百炼rag 2代表使用qianwen-long baseinfo_results = multi_threading(questions, "", file_id, 2) # 1代表使用百炼rag 2代表使用qianwen-long
baseinfo_list = [res for _, res in baseinfo_results] if baseinfo_results else [] baseinfo_list = [clean_json_string(res) for _, res in baseinfo_results] if baseinfo_results else []
chosen_numbers, merged = merge_json_to_list(clean_json_string(baseinfo_list.pop())) chosen_numbers, merged = merge_json_to_list(baseinfo_list.pop())
baseinfo_list.append(merged) baseinfo_list.append(merged)
judge_file_path = 'flask_app/static/提示词/是否相关问题货物标.txt' # judge_file_path = 'flask_app/static/提示词/是否相关问题货物标.txt'
# judge_file_path = 'D:\\flask_project\\flask_app\\static\\提示词\\是否相关问题货物标.txt' judge_file_path = 'D:\\flask_project\\flask_app\\static\\提示词\\是否相关问题货物标.txt'
judge_questions = read_questions_from_judge(judge_file_path, chosen_numbers) judge_questions = read_questions_from_judge(judge_file_path, chosen_numbers)
# print(judge_questions) # print(judge_questions)
judge_consortium = judge_consortium_bidding(baseinfo_list) # 通过招标公告判断是否接受联合体投标 judge_consortium = judge_consortium_bidding(baseinfo_list) # 通过招标公告判断是否接受联合体投标
@ -147,15 +147,15 @@ def combine_basic_info(baseinfo_file_path, procurement_file_path):
thread1.start() thread1.start()
# 等待一秒后启动获取采购需求的线程 # 等待一秒后启动获取采购需求的线程
time.sleep(1) time.sleep(1)
thread2 = threading.Thread(target=fetch_procurement_reqs_thread) # thread2 = threading.Thread(target=fetch_procurement_reqs_thread)
thread2.start() # thread2.start()
# 等待两个线程都完成 # 等待两个线程都完成
thread1.join() thread1.join()
thread2.join() # thread2.join()
# 合并结果 # 合并结果
baseinfo_list += temp_list # temp_list 是一个列表 baseinfo_list += temp_list # temp_list 是一个列表
baseinfo_list.append(procurement_reqs) # procurement_reqs 是一个字典 # baseinfo_list.append(procurement_reqs) # procurement_reqs 是一个字典
aggregated_baseinfo = aggregate_basic_info(baseinfo_list) aggregated_baseinfo = aggregate_basic_info_goods(baseinfo_list)
return {"基础信息": aggregated_baseinfo} return {"基础信息": aggregated_baseinfo}

View File

@ -196,23 +196,55 @@ def process_nested_data(data):
# 读取JSON数据提取内容转换结构并打印结果 # 读取JSON数据提取内容转换结构并打印结果
def extract_from_notice(clause_path, type): def extract_from_notice(clause_path, type):
"""
从公告中提取特定类型的内容
Args:
clause_path (str): 包含条款的JSON文件路径
type (int): 提取的类型
1 - ["投标文件","响应文件","响应性文件"]
2 - ["开标", "评标", "定标","磋商程序","中标","程序","步骤"]
3 - ["重新招标、不再招标和终止招标", "重新招标", "不再招标", "终止招标"]
4 - ["评标"] # 测试
Returns:
dict str: 提取并处理后的数据或在 `clause_path` 为空或发生错误时返回空字符串 `""`
"""
# 定义默认的返回结果
DEFAULT_RESULT = ""
# 判断 clause_path 是否为空
if not clause_path:
print("Warning: clause_path is empty.")
return DEFAULT_RESULT
# 根据 type 设置 target_values
if type == 1: if type == 1:
target_values = ["投标文件","响应文件","响应性文件"] target_values = ["投标文件", "响应文件", "响应性文件"]
elif type == 2: elif type == 2:
target_values = ["开标", "评标", "定标","磋商程序","中标","程序","步骤"] target_values = ["开标", "评标", "定标", "磋商程序", "中标", "程序", "步骤"]
elif type == 3: elif type == 3:
target_values = ["重新招标、不再招标和终止招标", "重新招标", "不再招标", "终止招标"] target_values = ["重新招标、不再招标和终止招标", "重新招标", "不再招标", "终止招标"]
elif type == 4: elif type == 4:
target_values = ["评标"] #测试 target_values = ["评标"] # 测试
else: else:
raise ValueError("Invalid type specified. Use 1 for '投标文件, 投标' or 2 for '开标, 评标, 定标'or 3 for '重新招标'") print(f"Error: Invalid type specified: {type}. Use 1, 2, 3, or 4.")
with open(clause_path, 'r', encoding='utf-8') as file: return DEFAULT_RESULT
data = json.load(file) try:
# 打开并读取JSON文件
with open(clause_path, 'r', encoding='utf-8') as file:
data = json.load(file)
# 提取目标部分
extracted_data = extract_between_sections(data, target_values) # 读取json extracted_data = extract_between_sections(data, target_values) # 读取json
transformed_data=process_with_outer_key(extracted_data) transformed_data = process_with_outer_key(extracted_data)
final_result=process_nested_data(transformed_data) final_result = process_nested_data(transformed_data)
return final_result return final_result
except Exception as e:
print(f"Error in extract_from_notice: {e}")
return DEFAULT_RESULT
if __name__ == "__main__": if __name__ == "__main__":
file_path = 'D:\\flask_project\\flask_app\\static\\output\\87f48f9c-e6ee-4dc1-a981-5a10085c4635\\tmp\\clause1.json' file_path = 'D:\\flask_project\\flask_app\\static\\output\\87f48f9c-e6ee-4dc1-a981-5a10085c4635\\tmp\\clause1.json'
# file_path = 'D:\\flask_project\\flask_app\\static\\output\\87f48f9c-e6ee-4dc1-a981-5a10085c4635\\clause1.json' # file_path = 'D:\\flask_project\\flask_app\\static\\output\\87f48f9c-e6ee-4dc1-a981-5a10085c4635\\clause1.json'

View File

@ -330,6 +330,7 @@ def clean_content(content):
return cleaned_content return cleaned_content
#如果file_path为空返回""
def convert_clause_to_json(file_path,output_folder,type=1,suffix_counter="1.json"): def convert_clause_to_json(file_path,output_folder,type=1,suffix_counter="1.json"):
if not os.path.exists(file_path): if not os.path.exists(file_path):
print(f"The specified file does not exist: {file_path}") print(f"The specified file does not exist: {file_path}")

View File

@ -8,28 +8,45 @@ from flask_app.货物标.商务服务其他要求提取 import get_business_requ
from flask_app.main.json_utils import nest_json_under_key from flask_app.main.json_utils import nest_json_under_key
#获取采购清单 #获取采购清单
def fetch_procurement_reqs(truncate_file): def fetch_procurement_reqs(truncate_file):
# 上传文件并获取 file_id # 定义默认的 procurement_reqs 字典
file_id = upload_file(truncate_file) DEFAULT_PROCUREMENT_REQS = {
"技术要求": "",
"商务要求": "",
"服务要求": "",
"其他要求": ""
}
# 如果 truncate_file 是空字符串,直接返回包含空字符串的字典
if not truncate_file:
return DEFAULT_PROCUREMENT_REQS.copy()
try:
# 上传文件并获取 file_id
file_id = upload_file(truncate_file)
# 使用 ThreadPoolExecutor 并行处理 get_technical_requirements 和 get_business_requirements # 使用 ThreadPoolExecutor 并行处理 get_technical_requirements 和 get_business_requirements
with concurrent.futures.ThreadPoolExecutor() as executor: with concurrent.futures.ThreadPoolExecutor() as executor:
# 提交任务给线程池 # 提交任务给线程池
future_technical = executor.submit(get_technical_requirements, file_id) future_technical = executor.submit(get_technical_requirements, file_id)
time.sleep(1) time.sleep(1) # 如果需要延迟,可以保留,否则建议移除以提高效率
future_business = executor.submit(get_business_requirements, truncate_file, file_id) future_business = executor.submit(get_business_requirements, truncate_file, file_id)
# 获取并行任务的结果 # 获取并行任务的结果
technical_requirements = future_technical.result() technical_requirements = future_technical.result()
business_requirements = future_business.result() business_requirements = future_business.result()
# 构建最终的嵌套结构,确保四个键平级
procurement_reqs = { # 构建最终的嵌套结构,确保四个键平级
procurement_reqs = {
"技术要求": technical_requirements.get("技术要求", {}), "技术要求": technical_requirements.get("技术要求", {}),
"商务要求": business_requirements.get("商务要求", {}), "商务要求": business_requirements.get("商务要求", {}),
"服务要求": business_requirements.get("服务要求", {}), "服务要求": business_requirements.get("服务要求", {}),
"其他要求": business_requirements.get("其他要求", {}) "其他要求": business_requirements.get("其他要求", {})
} }
return procurement_reqs return procurement_reqs
except Exception as e:
print(f"Error in fetch_procurement_reqs: {e}")
# 在出错时返回默认的包含空字符串的字典
return DEFAULT_PROCUREMENT_REQS.copy()
if __name__ == "__main__": if __name__ == "__main__":
output_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\货物标output" output_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\货物标output"

View File

@ -145,80 +145,108 @@ def parse_json_with_duplicates(raw_string):
def combine_evaluation_standards(truncate_file): def combine_evaluation_standards(truncate_file):
file_id = upload_file(truncate_file) # 定义默认的评审结果字典
user_query1 = "根据该文档,你判断它是否有具体的关于技术评分或商务评分或投标报价的评分要求,如果有,返回'',否则返回''" # 应对竞争性谈判这种无评分要求的情况 DEFAULT_EVALUATION_REVIEW = {
judge_res = qianwen_long(file_id, user_query1) "技术评分": "",
# 默认 judge 为 True "商务评分": ""
judge = True }
# 检查 judge_res 的内容 # 如果 truncate_file 是空字符串,直接返回包含空字符串的字典
if '' in judge_res: if not truncate_file:
judge = False return DEFAULT_EVALUATION_REVIEW.copy()
if judge: try:
# 执行 user_query 相关的逻辑 # 上传文件并获取文件ID
# user_query = "根据该文档中的评标办法前附表或者评分标准表,请你列出该文件的技术评分,商务评分,投标报价评审标准以及它们对应的具体评分要求,外层键名分别为'技术评分','商务评分','投标报价'。如果评分内容不是这3个则返回文档中给定的评分内容以及它的评分要求都以json的格式返回结果如果该采购活动有多个包则最外层键名为对应的包名。请不要回答有关资格审查的内容" file_id = upload_file(truncate_file)
user_query = (
""" # 定义用户查询
根据该文档中的评标办法前附表请你列出该文件的技术评分商务评分投标报价评审以及它们对应的具体评分要求请以json格式返回结果请在这三大块评分中分别用若干键值对表示具体要求请精确到具体的评审项内层的键名为'评分''要求'若这三大块评分中存在其他信息则在相应评分大块中新增键名'备注'存放该信息键值为具体的要求否则不需要如果评分内容因素不是这3个则返回文档中给定的评分内容因素以及它们的具体评分要求如果该采购活动有多个包则最外层键名为对应的包名,否则不需要不要回答有关资格审查的内容若存在未知信息填充'未知'以下为示例输出 user_query1 = (
{ "根据该文档,你判断它是否有具体的关于技术评分或商务评分或投标报价的评分要求,"
"一包": { "如果有,返回'',否则返回''"
"技术评分": { ) # 应对竞争性谈判这种无评分要求的情况
"主要监理岗位的职责": {
"评分": "4分", # 执行查询
"要求": "1、总监理工程师的职责全面、清晰、合理得 1.2-2分一般的1.2分。2、其他主要监理人员及岗位的职责全面、清晰、合理得 1.2-2分一般的 1.2分。" judge_res = qianwen_long(file_id, user_query1)
},
"备注": "若不满足“与公安部、省公安厅、随州市公安局高清视频会议系统无缝对接互联互通”的要求则本项技术部分50分不得分。" # 默认 judge 为 True
}, judge = True
"商务评分": {
"控制系统内主板": { # 检查 judge_res 的内容
"评分": "10分", if '' in judge_res:
"要求": "所投电梯控制系统内主板为制造商原厂原品牌制造生产且为进口部件得 10分。提供进口部件报关单及原产地证明扫描件加盖公章否则不得分" judge = False
},
"制造商技术实力": [ if judge:
# 执行 user_query 相关的逻辑
# user_query = "根据该文档中的评标办法前附表或者评分标准表,请你列出该文件的技术评分,商务评分,投标报价评审标准以及它们对应的具体评分要求,外层键名分别为'技术评分','商务评分','投标报价'。如果评分内容不是这3个则返回文档中给定的评分内容以及它的评分要求都以json的格式返回结果如果该采购活动有多个包则最外层键名为对应的包名。请不要回答有关资格审查的内容"
user_query = (
"""
根据该文档中的评标办法前附表请你列出该文件的技术评分商务评分投标报价评审以及它们对应的具体评分要求请以json格式返回结果请在这三大块评分中分别用若干键值对表示具体要求请精确到具体的评审项内层的键名为'评分''要求'若这三大块评分中存在其他信息则在相应评分大块中新增键名'备注'存放该信息键值为具体的要求否则不需要如果评分内容因素不是这3个则返回文档中给定的评分内容因素以及它们的具体评分要求如果该采购活动有多个包则最外层键名为对应的包名,否则不需要不要回答有关资格审查的内容若存在未知信息填充'未知'以下为示例输出
{ {
"评分": "3分", "一包": {
"要求": "一级证书得3分二级证书得1分其他不得分" "技术评分": {
"主要监理岗位的职责": {
"评分": "4分",
"要求": "1、总监理工程师的职责全面、清晰、合理得 1.2-2分一般的1.2分。2、其他主要监理人员及岗位的职责全面、清晰、合理得 1.2-2分一般的 1.2分。"
}, },
{ "备注": "若不满足“与公安部、省公安厅、随州市公安局高清视频会议系统无缝对接互联互通”的要求则本项技术部分50分不得分。"
"评分": "2分", },
"要求": "行业销量排名连续前 2 名,得 2 分,第 4-6 名得 0.5 分,其他不得分" "商务评分": {
"控制系统内主板": {
"评分": "10分",
"要求": "所投电梯控制系统内主板为制造商原厂原品牌制造生产且为进口部件得 10分。提供进口部件报关单及原产地证明扫描件加盖公章否则不得分"
},
"制造商技术实力": [
{
"评分": "3分",
"要求": "一级证书得3分二级证书得1分其他不得分"
},
{
"评分": "2分",
"要求": "行业销量排名连续前 2 名,得 2 分,第 4-6 名得 0.5 分,其他不得分"
}
]
},
"投标报价评审": {
"投标报价是否出现违反计价规范": {
"评分": "合格/不合格",
"要求": "A:投标报价未违反计价规范的评审意见为“合格”B投标报价违反计价规范的评审意见为“不合格”"
} }
]
},
"投标报价评审": {
"投标报价是否出现违反计价规范": {
"评分": "合格/不合格",
"要求": "A:投标报价未违反计价规范的评审意见为“合格”B投标报价违反计价规范的评审意见为“不合格”"
} }
} }
} }
} """
""" )
) # 执行第二个查询
# 执行第二个查询 evaluation_res = qianwen_long(file_id, user_query)
evaluation_res = qianwen_long(file_id, user_query)
cleaned_evaluation_res = parse_json_with_duplicates(evaluation_res)
result_data = process_data_based_on_key(cleaned_evaluation_res)
include = ['一包', '二包', '三包', '四包', '五包']
target_values = ['技术', '设计', '实施']
updated_jsons = {}
# 检查是否有外层键匹配 include 列表 # 清理和处理响应
if any(key for key in result_data if any(included in key for included in include)): cleaned_evaluation_res = parse_json_with_duplicates(evaluation_res)
# 有匹配的项,处理这些项 result_data = process_data_based_on_key(cleaned_evaluation_res)
for key in result_data: include = ['一包', '二包', '三包', '四包', '五包']
if any(item in key for item in include): target_values = ['技术', '设计', '实施']
inner_dict = result_data[key] updated_jsons = {}
updated_jsons[key] = combine_technical_and_business(inner_dict, target_values)
# 检查是否有外层键匹配 include 列表
if any(key for key in result_data if any(included in key for included in include)):
# 有匹配的项,处理这些项
for key in result_data:
if any(item in key for item in include):
inner_dict = result_data[key]
updated_jsons[key] = combine_technical_and_business(inner_dict, target_values)
else:
# 没有匹配的项,对整个字典运行
updated_jsons = combine_technical_and_business(result_data, target_values)
return updated_jsons
else: else:
# 没有匹配的项,对整个字典运行 # 如果 judge 是 False直接返回默认的技术标和商务标的结构
updated_jsons = combine_technical_and_business(result_data, target_values) result_data = {}
return updated_jsons result_data['技术评分'] = '本招标文件没有技术评分!'
else: result_data['商务评分'] = '本招标文件没有商务评分!'
# 如果 judge 是 False直接返回默认的技术标和商务标的结构 return result_data
result_data = {}
result_data['技术评分'] = '' except Exception as e:
result_data['商务评分'] = '' print(f"Error in combine_evaluation_standards: {e}")
return result_data # 在出错时返回默认的包含空字符串的字典
return DEFAULT_EVALUATION_REVIEW.copy()
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -1,6 +1,3 @@
import glob
import fitz
from PyPDF2 import PdfReader, PdfWriter from PyPDF2 import PdfReader, PdfWriter
import re # 导入正则表达式库 import re # 导入正则表达式库
import os # 用于文件和文件夹操作 import os # 用于文件和文件夹操作
@ -169,11 +166,14 @@ def process_files(file_path, output_folder, begin_pattern, begin_page, end_patte
result = extract_pages(pdf_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix) result = extract_pages(pdf_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix)
if result: if result:
if output_suffix == "tobidders_notice": if output_suffix == "tobidders_notice":
return result # 返回包含两个路径的元组 # 确保返回的是元组,并将其中的 None 转换为 ""
path1, path2 = result
return (path1 or "", path2 or "")
elif output_suffix == "qualification1": elif output_suffix == "qualification1":
merge_and_cleanup(result, "qualification3") merge_and_cleanup(result, "qualification3")
return result return result or ""
return None return result or ""
return "" # 返回空字符串
def process_input(input_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix): def process_input(input_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix):
@ -181,7 +181,6 @@ def process_input(input_path, output_folder, begin_pattern, begin_page, end_patt
os.makedirs(output_folder) os.makedirs(output_folder)
generated_files = [] generated_files = []
# 判断输入是目录还是单个文件,并统一处理
if os.path.isdir(input_path): if os.path.isdir(input_path):
for file_name in os.listdir(input_path): for file_name in os.listdir(input_path):
file_path = os.path.join(input_path, file_name) file_path = os.path.join(input_path, file_name)
@ -189,22 +188,23 @@ def process_input(input_path, output_folder, begin_pattern, begin_page, end_patt
result = process_files(file_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix) result = process_files(file_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix)
if result: if result:
if isinstance(result, tuple): if isinstance(result, tuple):
generated_files.extend(result) generated_files.extend([f if f else "" for f in result])
else: else:
generated_files.append(result) generated_files.append(result if result else "")
elif os.path.isfile(input_path) and is_pdf_or_doc(input_path): elif os.path.isfile(input_path) and is_pdf_or_doc(input_path):
result = process_files(input_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix) result = process_files(input_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix)
if result: if result:
if isinstance(result, tuple): if isinstance(result, tuple):
generated_files.extend(result) generated_files.extend([f if f else "" for f in result])
else: else:
generated_files.append(result) generated_files.append(result if result else "")
else: else:
print("提供的路径既不是文件夹也不是PDF文件。") print("提供的路径既不是文件夹也不是PDF文件。")
return generated_files return generated_files
# 默认逻辑是start_page匹配上就不再设置了一般不匹配上目录的原因是设置了begin_page=5但是匹配'第一章 招标公告'的时候start_page可能会错误匹配到目录。 # 默认逻辑是start_page匹配上就不再设置了一般不匹配上目录的原因是设置了begin_page=5但是匹配'第一章 招标公告'的时候start_page可能会错误匹配到目录。
def extract_pages_generic(pdf_document, begin_pattern, end_pattern, begin_page, common_header, exclusion_pattern=None, def extract_pages_generic(pdf_document, begin_pattern, end_pattern, begin_page, common_header, exclusion_pattern=None,
output_suffix="normal"): output_suffix="normal"):
@ -264,7 +264,7 @@ def extract_pages(pdf_path, output_folder, begin_pattern, begin_page, end_patter
return save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix) return save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix)
except Exception as e: except Exception as e:
print(f"Error processing {pdf_path}: {e}") print(f"Error processing {pdf_path}: {e}")
return None return ""
def extract_pages_tobidders_notice(pdf_document, begin_pattern, end_pattern, begin_page, common_header, def extract_pages_tobidders_notice(pdf_document, begin_pattern, end_pattern, begin_page, common_header,
@ -346,14 +346,14 @@ def extract_pages_twice_tobidders_notice(pdf_path, output_folder, output_suffix,
start_page1, end_page1 = extract_pages_generic(pdf_document, begin_pattern, end_pattern, -1, common_header) start_page1, end_page1 = extract_pages_generic(pdf_document, begin_pattern, end_pattern, -1, common_header)
if start_page1 is None or end_page1 is None: if start_page1 is None or end_page1 is None:
print(f"second: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!") print(f"second: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!")
return None, None return "", ""
# 提取第二部分 # 提取第二部分
start_page2 = end_page1 # 第二部分的开始页就是第一部分的结束页 start_page2 = end_page1 # 第二部分的开始页就是第一部分的结束页
_, end_page2 = extract_pages_generic(pdf_document, end_pattern, end_pattern, start_page2 - 1, common_header, _, end_page2 = extract_pages_generic(pdf_document, end_pattern, end_pattern, start_page2 - 1, common_header,
exclusion_pattern) exclusion_pattern)
if end_page2 is None: if end_page2 is None:
print(f"second: {output_suffix} 未找到第二部分的结束页在文件 {pdf_path} 中!") print(f"second: {output_suffix} 未找到第二部分的结束页在文件 {pdf_path} 中!")
return None, None return "", ""
# 保存提取的页面 # 保存提取的页面
path1 = save_extracted_pages(pdf_document, start_page1, end_page1, pdf_path, output_folder, path1 = save_extracted_pages(pdf_document, start_page1, end_page1, pdf_path, output_folder,
@ -364,42 +364,46 @@ def extract_pages_twice_tobidders_notice(pdf_path, output_folder, output_suffix,
return path1, path2 return path1, path2
def extract_pages_twice(pdf_path, output_folder, output_suffix, common_header): def extract_pages_twice(pdf_path, output_folder, output_suffix, common_header):
exclusion_pattern = re.compile(r'文件的构成|文件的组成|须对应|需对应|须按照|需按照|须根据|需根据') try:
pdf_document = PdfReader(pdf_path) exclusion_pattern = re.compile(r'文件的构成|文件的组成|须对应|需对应|须按照|需按照|须根据|需根据')
patterns = None pdf_document = PdfReader(pdf_path)
begin_page = 0 patterns = None
if output_suffix == "procurement":
patterns = [get_patterns_for_procurement()]
begin_page = 5
elif output_suffix == "evaluation_method" or output_suffix == "qualification2" or output_suffix == "qualification3":
patterns = [get_patterns_for_evaluation_method()]
begin_page = 5
elif output_suffix == "qualification1":
patterns = [get_patterns_for_qualification()] # This now returns a tuple of pattern pairs
begin_page = 5
elif output_suffix == "notice":
patterns = [get_patterns_for_notice()]
begin_page = 0 begin_page = 0
# Try each set of patterns until a valid range is found if output_suffix == "procurement":
for pattern_pair in patterns: patterns = [get_patterns_for_procurement()]
start_page, end_page = extract_pages_generic(pdf_document, pattern_pair[0], pattern_pair[1], begin_page, begin_page = 5
common_header, elif output_suffix == "evaluation_method" or output_suffix == "qualification2" or output_suffix == "qualification3":
exclusion_pattern, output_suffix) patterns = [get_patterns_for_evaluation_method()]
if start_page is not None and end_page is not None: begin_page = 5
break elif output_suffix == "qualification1":
if start_page is None or end_page is None: patterns = [get_patterns_for_qualification()] # This now returns a tuple of pattern pairs
if output_suffix == "qualification1": begin_page = 5
print(f"second: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!") elif output_suffix == "notice":
print("third:尝试提取评分办法章节...") patterns = [get_patterns_for_notice()]
temp = truncate_pdf_main(pdf_path, output_folder, 2, "qualification2") begin_page = 0
if len(temp) > 0: # Try each set of patterns until a valid range is found
return temp[0] for pattern_pair in patterns:
start_page, end_page = extract_pages_generic(pdf_document, pattern_pair[0], pattern_pair[1], begin_page,
common_header,
exclusion_pattern, output_suffix)
if start_page is not None and end_page is not None:
break
if start_page is None or end_page is None:
if output_suffix == "qualification1":
print(f"second: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!")
print("third:尝试提取评分办法章节...")
temp = truncate_pdf_main(pdf_path, output_folder, 2, "qualification2")
if len(temp) > 0:
return temp[0]
else:
return "" # 返回空字符串
else: else:
return None print(f"second: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!")
else: return "" # 返回空字符串
print(f"second: {output_suffix} 未找到起始或结束页在文件 {pdf_path} 中!") return save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix)
return "" except Exception as e:
return save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix) print(f"Error in extract_pages_twice: {e}")
return "" # 返回空字符串
# def save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix): # def save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix):
@ -417,30 +421,40 @@ def extract_pages_twice(pdf_path, output_folder, output_suffix, common_header):
def save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix): def save_extracted_pages(pdf_document, start_page, end_page, pdf_path, output_folder, output_suffix):
base_file_name = os.path.splitext(os.path.basename(pdf_path))[0] try:
output_pdf_path = os.path.join(output_folder, f"{base_file_name}_{output_suffix}.pdf") base_file_name = os.path.splitext(os.path.basename(pdf_path))[0]
output_pdf_path = os.path.join(output_folder, f"{base_file_name}_{output_suffix}.pdf")
if output_suffix == 'notice' and start_page - 1 >= 0: if start_page < 0 or end_page >= len(pdf_document.pages) or start_page > end_page:
before_pdf_path = os.path.join(output_folder, f"{base_file_name}_before.pdf") print(f"无效的页面范围: {start_page}{end_page}")
before_doc = PdfWriter() return ""
for page_num in range(0, start_page):
before_doc.add_page(pdf_document.pages[page_num]) if output_suffix == 'notice' and start_page - 1 >= 0:
with open(before_pdf_path, 'wb') as f: before_pdf_path = os.path.join(output_folder, f"{base_file_name}_before.pdf")
before_doc.write(f) before_doc = PdfWriter()
print(f"已保存页面从 0 到 {start_page - 1}{before_pdf_path}") for page_num in range(0, start_page):
before_doc.add_page(pdf_document.pages[page_num])
with open(before_pdf_path, 'wb') as f:
before_doc.write(f)
print(f"已保存页面从 0 到 {start_page - 1}{before_pdf_path}")
output_doc = PdfWriter()
for page_num in range(start_page, end_page + 1):
output_doc.add_page(pdf_document.pages[page_num])
with open(output_pdf_path, 'wb') as f:
output_doc.write(f)
print(f"{output_suffix} 已截取并保存页面从 {start_page}{end_page}{output_pdf_path}")
return output_pdf_path
except Exception as e:
print(f"Error in save_extracted_pages: {e}")
return "" # 返回空字符串
output_doc = PdfWriter()
for page_num in range(start_page, end_page + 1):
output_doc.add_page(pdf_document.pages[page_num])
with open(output_pdf_path, 'wb') as f:
output_doc.write(f)
print(f"{output_suffix} 已截取并保存页面从 {start_page}{end_page}{output_pdf_path}")
return output_pdf_path
#合并封面+招标公告+投标人须知前附表+须知正文 #合并封面+招标公告+投标人须知前附表+须知正文
def merge_selected_pdfs(output_folder, truncate_files, output_path, base_file_name): def merge_selected_pdfs(output_folder, truncate_files, output_path, base_file_name):
""" """
合并 output_folder 中以 {base_file_name}_before.pdf 结尾的 PDF 文件以及 truncate_files 中的指定文件 合并 output_folder 中以 {base_file_name}_before.pdf 结尾的 PDF 文件
以及 truncate_files 中以指定后缀结尾的文件按照指定顺序合并
参数 参数
- output_folder (str): 包含以 {base_file_name}_before.pdf 结尾的 PDF 文件的文件夹路径 - output_folder (str): 包含以 {base_file_name}_before.pdf 结尾的 PDF 文件的文件夹路径
@ -448,92 +462,120 @@ def merge_selected_pdfs(output_folder, truncate_files, output_path, base_file_na
- output_path (str): 合并后的 PDF 文件保存路径 - output_path (str): 合并后的 PDF 文件保存路径
- base_file_name (str): 用于匹配文件名的基础名称 - base_file_name (str): 用于匹配文件名的基础名称
""" """
# 1. 查找 output_folder 中以 {base_file_name}_before.pdf 结尾的 PDF 文件 # 1. 获取 output_folder 中所有文件
pattern = f'*{base_file_name}_before.pdf' try:
before_pdfs = glob.glob(os.path.join(output_folder, pattern)) all_output_files = os.listdir(output_folder)
print(f"找到 {len(before_pdfs)} 个以 '{base_file_name}_before.pdf' 结尾的文件。") except FileNotFoundError:
print(f"输出文件夹 '{output_folder}' 未找到。")
return
except PermissionError:
print(f"没有权限访问输出文件夹 '{output_folder}'")
return
# 2. 获取 truncate_files 中指定的文件索引5、3、4 # 2. 定义要选择的文件后缀及合并顺序,包括 before 文件
selected_indices = [5, 3, 4] # 注意索引从0开始 desired_suffixes = [
selected_truncate_pdfs = [] f'{base_file_name}_before.pdf',
for idx in selected_indices: f'{base_file_name}_notice.pdf',
if idx < len(truncate_files): f'{base_file_name}_tobidders_notice_part1.pdf',
selected_truncate_pdfs.append(truncate_files[idx]) f'{base_file_name}_tobidders_notice_part2.pdf'
print(f"选中 truncate_files[{idx}]: {truncate_files[idx]}") ]
all_pdfs_to_merge = []
for suffix in desired_suffixes:
if suffix == f'{base_file_name}_before.pdf':
# 从 output_folder 中选择以 {base_file_name}_before.pdf 结尾的文件
matching_files = [
os.path.join(output_folder, f)
for f in all_output_files
if f.endswith(suffix)
]
else: else:
print(f"truncate_files 列表中没有索引为 {idx} 的元素。") # 从 truncate_files 中选择以指定后缀结尾的文件
matching_files = [f for f in truncate_files if f.endswith(suffix)]
if matching_files:
# 如果找到多个匹配的文件,按名称排序并添加
matching_files_sorted = sorted(matching_files)
all_pdfs_to_merge.extend(matching_files_sorted)
for f in matching_files_sorted:
print(f"选中文件: {f}")
else:
print(f"没有找到以 '{suffix}' 结尾的文件。")
# 3. 合并所有 PDF 文件
all_pdfs_to_merge = before_pdfs + selected_truncate_pdfs
print(f"总共将要合并的 PDF 文件数量: {len(all_pdfs_to_merge)}") print(f"总共将要合并的 PDF 文件数量: {len(all_pdfs_to_merge)}")
if not all_pdfs_to_merge: if not all_pdfs_to_merge:
print("没有找到要合并的 PDF 文件。") print("没有找到要合并的 PDF 文件。")
return return
# 调用 merge_pdfs 函数 # 调用 merge_pdfs 函数进行合并
merge_pdfs(all_pdfs_to_merge, output_path) merge_pdfs(all_pdfs_to_merge, output_path)
print(f"已成功合并 PDF 文件到 '{output_path}'")
def truncate_pdf_main(input_path, output_folder, selection, output_suffix="default"): def truncate_pdf_main(input_path, output_folder, selection, output_suffix="default"):
if selection == 1: try:
# 更新的正则表达式以匹配"第x章"和"第x部分",考虑到可能的空格和其他文字 if selection == 1:
begin_pattern = re.compile( # 更新的正则表达式以匹配"第x章"和"第x部分",考虑到可能的空格和其他文字
r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(?:服务|项目|商务).*?要求|' begin_pattern = re.compile(
r'^第[一二三四五六七八九十百千]+(?:章|部分).*?采购.*' r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(?:服务|项目|商务).*?要求|'
) r'^第[一二三四五六七八九十百千]+(?:章|部分).*?采购.*'
begin_page = 5 )
end_pattern = re.compile( begin_page = 5
r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+' end_pattern = re.compile(
) r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+'
local_output_suffix = "procurement" )
elif selection == 2: local_output_suffix = "procurement"
begin_pattern = re.compile( elif selection == 2:
r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(磋商|谈判|评标|评定|评审)(方法|办法).*' begin_pattern = re.compile(
) r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(磋商|谈判|评标|评定|评审)(方法|办法).*'
begin_page = 5 )
end_pattern = re.compile( begin_page = 5
r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+' end_pattern = re.compile(
) r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+'
local_output_suffix = "evaluation_method" )
elif selection == 3: local_output_suffix = "evaluation_method"
begin_pattern = re.compile( elif selection == 3:
r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(资格审查).*', re.MULTILINE begin_pattern = re.compile(
) r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(资格审查).*', re.MULTILINE
begin_page = 5 )
end_pattern = re.compile( begin_page = 5
r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE end_pattern = re.compile(
) r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE
local_output_suffix = "qualification1" )
elif selection == 4: # 投标人须知前附表和正文 local_output_suffix = "qualification1"
begin_page = 1 elif selection == 4: # 投标人须知前附表和正文
begin_pattern = re.compile( begin_page = 1
r'^(?:第[一二三四五六七八九十百千]+(?:章|部分)\s*(?:投标人|磋商|供应商|谈判供应商|磋商供应商)须知+|(?:一\s*、\s*)?(?:投标人|磋商|供应商)须知前附表)', begin_pattern = re.compile(
re.MULTILINE r'^(?:第[一二三四五六七八九十百千]+(?:章|部分)\s*(?:投标人|磋商|供应商|谈判供应商|磋商供应商)须知+|(?:一\s*、\s*)?(?:投标人|磋商|供应商)须知前附表)',
) re.MULTILINE
end_pattern = re.compile( )
r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE end_pattern = re.compile(
) r'^第[一二三四五六七八九十百千]+(?:章|部分)\s*[\u4e00-\u9fff]+', re.MULTILINE
local_output_suffix = "tobidders_notice" )
elif selection == 5: # 招标公告 local_output_suffix = "tobidders_notice"
begin_page = 0 elif selection == 5: # 招标公告
begin_pattern = re.compile( begin_page = 0
r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(?:公告|邀请书).*' begin_pattern = re.compile(
) r'^第[一二三四五六七八九十百千]+(?:章|部分).*?(?:公告|邀请书).*'
end_pattern = re.compile( )
r'^(?:第[一二三四五六七八九十百千]+(?:章|部分)\s*(?:投标人|磋商|供应商|谈判供应商|磋商供应商)须知+|(?:一\s*、\s*)?(?:投标人|磋商|供应商)须知前附表)' end_pattern = re.compile(
) r'^(?:第[一二三四五六七八九十百千]+(?:章|部分)\s*(?:投标人|磋商|供应商|谈判供应商|磋商供应商)须知+|(?:一\s*、\s*)?(?:投标人|磋商|供应商)须知前附表)'
local_output_suffix = "notice" )
else: local_output_suffix = "notice"
print("无效的选择:请选择1-6") else:
return None print("无效的选择:请选择1-5")
return None
# 如果传入的 output_suffix 是 'default',则使用本地生成的 output_suffix # 如果传入的 output_suffix 是 'default',则使用本地生成的 output_suffix
if output_suffix == "default": if output_suffix == "default":
output_suffix = local_output_suffix output_suffix = local_output_suffix
# 调用相应的处理函数 # 调用相应的处理函数
return process_input(input_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix) return process_input(input_path, output_folder, begin_pattern, begin_page, end_pattern, output_suffix) or ""
except Exception as e:
print(f"Error in truncate_pdf_main: {e}")
return "" # 返回空字符串
def truncate_pdf_multiple(pdf_path, output_folder): def truncate_pdf_multiple(pdf_path, output_folder):
@ -554,13 +596,48 @@ def truncate_pdf_multiple(pdf_path, output_folder):
return truncate_files return truncate_files
#小解析,只需要前三章内容
def truncate_pdf_specific(pdf_path, output_folder):
"""
处理 PDF 文件选择 selection 4 5 的部分并合并结果
Args:
pdf_path (str): 要处理的 PDF 文件路径
output_folder (str): 截取后的文件保存文件夹路径
Returns:
list: 截取的文件路径列表包括合并后的文件路径如果有
"""
base_file_name = os.path.splitext(os.path.basename(pdf_path))[0]
truncate_files = []
selections = [4, 5] # 仅处理 selection 4 和 5
for selection in selections:
files = truncate_pdf_main(pdf_path, output_folder, selection)
if files:
if isinstance(files, list):
truncate_files.extend(files)
elif isinstance(files, str):
truncate_files.append(files)
if truncate_files:
merged_output_path = os.path.join(output_folder, f"{base_file_name}_merged_specific.pdf")
merge_selected_pdfs(output_folder, truncate_files, merged_output_path, base_file_name)
truncate_files.append(merged_output_path)
print(f"已生成合并文件: {merged_output_path}")
else:
print(f"没有文件需要合并 for {pdf_path}")
return truncate_files
# TODO:交通智能系统和招标(1)(1)文件有问题 sele=4的时候excludsion有问题 # TODO:交通智能系统和招标(1)(1)文件有问题 sele=4的时候excludsion有问题
if __name__ == "__main__": if __name__ == "__main__":
input_path = "C:\\Users\\Administrator\\Desktop\\货物标\\zbfiles\\6.2定版视频会议磋商文件.pdf" input_path = "C:\\Users\\Administrator\\Desktop\\货物标\\zbfiles\\招标文件正文(1).pdf"
output_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\truncate_all" output_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\truncate_all"
files = truncate_pdf_multiple(input_path, output_folder) # files = truncate_pdf_multiple(input_path, output_folder)
print(files) files=truncate_pdf_specific(input_path,output_folder)
print(files[-1])
# selection = 1 # 例如1 - 商务技术服务要求, 2 - 评标办法, 3 - 资格审查后缀有qualification1或qualification2与评标办法一致 4.投标人须知前附表part1 投标人须知正文part2 5-公告 # selection = 1 # 例如1 - 商务技术服务要求, 2 - 评标办法, 3 - 资格审查后缀有qualification1或qualification2与评标办法一致 4.投标人须知前附表part1 投标人须知正文part2 5-公告
# generated_files = truncate_pdf_main(input_path, output_folder, selection) # generated_files = truncate_pdf_main(input_path, output_folder, selection)

View File

@ -6,8 +6,6 @@ from flask_app.main.通义千问long import qianwen_long, upload_file
from flask_app.main.多线程提问 import multi_threading from flask_app.main.多线程提问 import multi_threading
from flask_app.main.json_utils import extract_content_from_json, clean_json_string from flask_app.main.json_utils import extract_content_from_json, clean_json_string
from flask_app.货物标.投标人须知正文条款提取成json文件货物标版 import convert_clause_to_json from flask_app.货物标.投标人须知正文条款提取成json文件货物标版 import convert_clause_to_json
from flask_app.货物标.货物标截取pdf import truncate_pdf_main
# 这个字典可能有嵌套,你需要遍历里面的键名,对键名作判断,而不是键值,具体是这样的:如果处于同一层级的键的数量>1并且键名全由数字或点号组成。那么就将这些序号键名全部删除重新组织成一个字典格式的数据你可以考虑用字符串列表来保持部分平级的数据 # 这个字典可能有嵌套,你需要遍历里面的键名,对键名作判断,而不是键值,具体是这样的:如果处于同一层级的键的数量>1并且键名全由数字或点号组成。那么就将这些序号键名全部删除重新组织成一个字典格式的数据你可以考虑用字符串列表来保持部分平级的数据
# 对于同级的键,如果数量>1且键名都统一那么将键名去掉用列表保持它们的键值 # 对于同级的键,如果数量>1且键名都统一那么将键名去掉用列表保持它们的键值
@ -340,6 +338,7 @@ def process_additional_queries(combined_res, match_keys, output_folder, notice_p
Returns: Returns:
dict: 更新后的最终结果 dict: 更新后的最终结果
""" """
#对于空的notice_path的情况此处做了异常处理
clause2_path = convert_clause_to_json(notice_path, output_folder, 2) clause2_path = convert_clause_to_json(notice_path, output_folder, 2)
updated_match_keys = process_match_keys(match_keys, clause2_path) updated_match_keys = process_match_keys(match_keys, clause2_path)
@ -369,45 +368,67 @@ def process_additional_queries(combined_res, match_keys, output_folder, notice_p
return final_result return final_result
def combine_qualification_review(output_folder, qualification_path, notice_path, baseinfo_path):
    """
    Combine the qualification review (资格性审查) and conformity review (符合性审查)
    results extracted from the tender document.

    Args:
        output_folder (str): folder where intermediate/output files are written.
        qualification_path (str): path of the truncated qualification-review PDF;
            may be an empty string when no such section was extracted.
        notice_path (str): path of the tender-notice PDF, used for follow-up queries.
        baseinfo_path (str): path of the merged basic-info PDF, used for follow-up queries.

    Returns:
        dict: the combined review result; when the input file is missing or any
        error occurs, a default dict with empty-string values is returned
        instead of raising.
    """
    # Default review result, returned when the qualification file is missing
    # or when any step below fails.
    DEFAULT_QUALIFICATION_REVIEW = {
        "资格审查": {
            "资格审查": "",
            "符合性审查": ""
        }
    }

    # No qualification section was extracted -> nothing to review.
    if not qualification_path:
        return DEFAULT_QUALIFICATION_REVIEW.copy()

    try:
        # Upload the file and obtain a file ID for the LLM queries.
        file_id = upload_file(qualification_path)

        # One query per review type; answers must quote the original text verbatim.
        user_query = [
            "该招标文件中规定的资格性审查标准是怎样的?请以json格式给出,外层为'资格性审查',你的回答要与原文完全一致,不可擅自总结删减,也不要回答有关符合性性审查的内容。",
            "该招标文件中规定的符合性审查标准是怎样的?请以json格式给出,外层为'符合性审查',你的回答要与原文完全一致,不可擅自总结删减,也不要回答有关资格性审查的内容。"
        ]

        # Run both queries concurrently (mode 2 -> qianwen-long).
        results = multi_threading(user_query, "", file_id, 2)

        combined_res = {}
        for question, response in results:
            if response:
                cleaned_data = clean_json_string(response)  # clean the raw LLM answer
                processed1 = preprocess_dict(cleaned_data)
                processed2 = process_dict(processed1)
                combined_res.update(processed2)
            else:
                print(f"Warning: No response for question '{question}'.")

        # Find chapter/clause cross-references, e.g.
        # [{'资格性审查.资格要求': '符合本采购文件第一章第二款要求,并提供合格有效的证明材料。'}]
        match_keys = find_chapter_clause_references(combined_res)

        # No cross-references -> the combined result is already complete.
        if not match_keys:
            return {"资格审查": combined_res}

        # Resolve the referenced chapters/clauses with additional queries.
        return process_additional_queries(combined_res, match_keys, output_folder, notice_path, baseinfo_path)
    except Exception as e:
        print(f"Error in combine_qualification_review: {e}")
        # Fall back to the default (empty) result on any failure.
        return DEFAULT_QUALIFICATION_REVIEW.copy()
# 整合基础信息核心代码 # 整合基础信息核心代码
@ -416,7 +437,8 @@ def combine_qualification_review(output_folder,qualification_path, notice_path,k
if __name__ == "__main__": if __name__ == "__main__":
# qualification_path="C:\\Users\\Administrator\\Desktop\\货物标\\output3\\6.2定版视频会议磋商文件_qualification2.pdf" # qualification_path="C:\\Users\\Administrator\\Desktop\\货物标\\output3\\6.2定版视频会议磋商文件_qualification2.pdf"
output_folder="C:\\Users\\Administrator\\Desktop\\货物标\\zboutpub" output_folder="C:\\Users\\Administrator\\Desktop\\货物标\\zboutpub"
qualification_path = "C:\\Users\\Administrator\\Desktop\\货物标\\output3\\094定稿-湖北工业大学轻武器模拟射击设备采购项目招标文件_qualification2.pdf" # qualification_path = "C:\\Users\\Administrator\\Desktop\\货物标\\output3\\094定稿-湖北工业大学轻武器模拟射击设备采购项目招标文件_qualification2.pdf"
qualification_path=""
notice_path="C:\\Users\\Administrator\\Desktop\\货物标\\output5\\094定稿-湖北工业大学轻武器模拟射击设备采购项目招标文件_notice.pdf" notice_path="C:\\Users\\Administrator\\Desktop\\货物标\\output5\\094定稿-湖北工业大学轻武器模拟射击设备采购项目招标文件_notice.pdf"
# knowledge_name = "6.2视频会议docx" # knowledge_name = "6.2视频会议docx"
baseinfo_path="C:\\Users\\Administrator\\Desktop\\货物标\\zboutpub\\merged_baseinfo.pdf" baseinfo_path="C:\\Users\\Administrator\\Desktop\\货物标\\zboutpub\\merged_baseinfo.pdf"