zbparse/flask_app/货物标/货物标解析main.py

267 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 竞磋 竞谈 磋商 询价 邀请 单一来源
import json
import time
from flask_app.general.format_change import docx2pdf, pdf2docx,doc2docx
from flask_app.general.json_utils import transform_json_values
from flask_app.货物标.基础信息解析main import combine_basic_info
from flask_app.货物标.投标人须知正文提取指定内容货物标版 import extract_from_notice
from flask_app.货物标.截取pdf货物标版 import truncate_pdf_multiple
from concurrent.futures import ThreadPoolExecutor
import concurrent.futures
from flask_app.货物标.提取json货物标版 import convert_clause_to_json
from flask_app.general.无效标和废标公共代码 import combine_find_invalid
from flask_app.货物标.资格审查main import combine_qualification_review
from flask_app.货物标.评分标准提取main import combine_evaluation_standards
import logging
def get_global_logger(unique_id):
    """Return the logger registered under ``unique_id``.

    Args:
        unique_id: Logger name to look up. ``None`` selects the default
            (root) logger.

    Returns:
        logging.Logger: The logger obtained from ``logging.getLogger``.
    """
    return logging.getLogger() if unique_id is None else logging.getLogger(unique_id)
# Create a module-level (global) thread pool.
# NOTE(review): nothing visible in this file submits work to this pool — the
# only reference is a commented-out knowledge-base upload in preprocess_files.
# Confirm no other module imports it before removing.
executor = ThreadPoolExecutor()
def preprocess_files(output_folder, file_path, file_type, logger):
    """Convert the uploaded tender file and split it into per-section files.

    The input is converted so that both a PDF and a DOCX version exist, the
    PDF is split with ``truncate_pdf_multiple``, and the bidder-notice body
    is converted to JSON with ``convert_clause_to_json``.

    Args:
        output_folder: Folder passed to the splitting and clause-conversion
            helpers; also echoed to the log.
        file_path: Path of the uploaded file.
        file_type: ``1`` for docx, ``2`` for pdf, ``3`` for doc.
        logger: Logger used for progress and error messages.

    Returns:
        dict | None: Paths consumed by the downstream tasks, or ``None``
        when ``file_type`` is not one of the supported values.
    """
    logger.info("starting 文件预处理...")
    began_at = time.time()
    logger.info("output_folder..." + output_folder)

    # Downstream steps need the document both as PDF and as DOCX.
    if file_type == 1:  # docx: derive the PDF from it
        docx_path = file_path
        pdf_path = docx2pdf(docx_path)
    elif file_type == 2:  # pdf: derive the DOCX from it
        pdf_path = file_path
        docx_path = pdf2docx(pdf_path)
    elif file_type == 3:  # doc: derive both from the original file
        pdf_path = docx2pdf(file_path)
        docx_path = doc2docx(file_path)
    else:
        logger.error("Unsupported file type provided. Preprocessing halted.")
        return None

    # Split the PDF into sections. Indices as used below: 0 tender notice,
    # 1 evaluation method, 2 qualification review, 4 bidder-notice body,
    # 5 procurement requirements, 6 merged base-info file
    # (cover + notice + bidder-notice front table + bidder-notice body).
    sections = truncate_pdf_multiple(pdf_path, output_folder)
    procurement_path = sections[5]
    evaluation_method_path = sections[1]
    qualification_path = sections[2]
    tobidders_notice_path = sections[4]
    notice_path = sections[0]
    merged_baseinfo_path = sections[6]

    # Bidder-notice body clauses: pdf -> json.
    clause_path = convert_clause_to_json(tobidders_notice_path, output_folder)

    elapsed = time.time() - began_at
    logger.info(f"文件预处理 done耗时{elapsed:.2f}")

    result = {}
    # The whole document (PDF / DOCX) is what the invalid-bid analysis and
    # the fallbacks in the fetch_* helpers operate on.
    result['invalid_path'] = pdf_path
    result['output_folder'] = output_folder
    result['procurement_path'] = procurement_path
    result['evaluation_method_path'] = evaluation_method_path
    result['qualification_path'] = qualification_path
    result['notice_path'] = notice_path
    result['clause_path'] = clause_path
    result['invalid_docpath'] = docx_path
    result['merged_baseinfo_path'] = merged_baseinfo_path
    return result
def fetch_project_basic_info(invalid_path, merged_baseinfo_path, procurement_path, clause_path, logger):
    """Extract the project's basic information and its goods list.

    Args:
        invalid_path: Path of the full document; used as a fallback when a
            section path is empty.
        merged_baseinfo_path: Path of the merged base-info section.
        procurement_path: Path of the procurement-requirements section.
        clause_path: Path of the bidder-notice clause JSON.
        logger: Logger used for progress messages.

    Returns:
        tuple: ``(base_info, good_list)`` as produced by
        ``post_process_baseinfo``.
    """
    logger.info("starting 基础信息...")
    began_at = time.time()

    # Fall back to the full document when a section path is missing.
    merged_baseinfo_path = merged_baseinfo_path or invalid_path
    procurement_path = procurement_path or invalid_path

    raw_info = combine_basic_info(merged_baseinfo_path, procurement_path, clause_path, invalid_path)
    base_info, good_list = post_process_baseinfo(raw_info, logger)

    elapsed = time.time() - began_at
    logger.info(f"基础信息 done耗时{elapsed:.2f}")
    return base_info, good_list
def fetch_qualification_review(invalid_path, qualification_path, notice_path, logger):
    """Run the qualification-review extraction and log how long it took.

    Args:
        invalid_path: Path of the full document.
        qualification_path: Path of the qualification-review section.
        notice_path: Path of the tender-notice section.
        logger: Logger used for progress messages.

    Returns:
        Whatever ``combine_qualification_review`` returns.
    """
    logger.info("starting 资格审查...")
    began_at = time.time()
    outcome = combine_qualification_review(invalid_path, qualification_path, notice_path)
    elapsed = time.time() - began_at
    logger.info(f"资格审查 done耗时{elapsed:.2f}")
    return outcome
def fetch_evaluation_standards(invalid_path, evaluation_method_path, logger):
    """Extract the scoring standards and split them into technical/commercial.

    Args:
        invalid_path: Path of the full document; used when
            ``evaluation_method_path`` is empty.
        evaluation_method_path: Path of the evaluation-method section.
        logger: Logger used for progress messages.

    Returns:
        dict: ``{"technical_standards": {"技术评分": ...},
        "commercial_standards": {"商务评分": ...}}``; a missing score
        section defaults to ``{}``.
    """
    logger.info("starting 商务评分和技术评分...")
    began_at = time.time()

    # Fall back to the full document when the section path is missing.
    source_path = evaluation_method_path or invalid_path
    scores = combine_evaluation_standards(source_path)

    split_result = {
        "technical_standards": {"技术评分": scores.get("技术评分", {})},
        "commercial_standards": {"商务评分": scores.get("商务评分", {})},
    }

    elapsed = time.time() - began_at
    logger.info(f"商务评分和技术评分 done耗时{elapsed:.2f}")
    return split_result
def fetch_invalid_requirements(invalid_docpath, output_folder, logger):
    """Run the invalid-bid / rejected-bid extraction and log its duration.

    Args:
        invalid_docpath: Path of the DOCX version of the full document.
        output_folder: Folder passed through to ``combine_find_invalid``.
        logger: Logger used for progress messages.

    Returns:
        Whatever ``combine_find_invalid`` returns.
    """
    logger.info("starting 无效标与废标...")
    began_at = time.time()
    outcome = combine_find_invalid(invalid_docpath, output_folder)
    elapsed = time.time() - began_at
    logger.info(f"无效标与废标 done耗时{elapsed:.2f}")
    return outcome
def fetch_bidding_documents_requirements(invalid_path, merged_baseinfo_path, clause_path, logger):
    """Extract the bid-document requirements (``extract_from_notice`` selection 1).

    Args:
        invalid_path: Path of the full document; used when
            ``merged_baseinfo_path`` is empty.
        merged_baseinfo_path: Path of the merged base-info section.
        clause_path: Path of the bidder-notice clause JSON.
        logger: Logger used for progress messages.

    Returns:
        dict: ``{"投标文件要求": <extract_from_notice result>}``.
    """
    logger.info("starting 投标文件要求...")
    # Fall back to the full document when the section path is missing.
    source_path = merged_baseinfo_path or invalid_path
    began_at = time.time()
    extracted = extract_from_notice(source_path, clause_path, 1)
    elapsed = time.time() - began_at
    logger.info(f"投标文件要求 done耗时{elapsed:.2f}")
    return {"投标文件要求": extracted}
# Bid opening, evaluation and award procedure
def fetch_bid_opening(invalid_path, merged_baseinfo_path, clause_path, logger):
    """Extract the opening/evaluation/award procedure (``extract_from_notice`` selection 2).

    Args:
        invalid_path: Path of the full document; used when
            ``merged_baseinfo_path`` is empty.
        merged_baseinfo_path: Path of the merged base-info section.
        clause_path: Path of the bidder-notice clause JSON.
        logger: Logger used for progress messages.

    Returns:
        dict: ``{"开评定标流程": <extract_from_notice result>}``.
    """
    logger.info("starting 开评定标流程...")
    # Fall back to the full document when the section path is missing.
    source_path = merged_baseinfo_path or invalid_path
    began_at = time.time()
    extracted = extract_from_notice(source_path, clause_path, 2)
    elapsed = time.time() - began_at
    logger.info(f"开评定标流程 done耗时{elapsed:.2f}")
    return {"开评定标流程": extracted}
def post_process_baseinfo(base_info, logger):
    """Detach the goods list from the basic-info result.

    Looks up ``base_info["基础信息"]["采购要求"]["采购需求"]["货物列表"]``,
    removes it from the structure and returns it separately. Containers that
    end up empty after the removal (``采购需求``, then ``采购要求``) are
    dropped as well. ``base_info`` is modified in place.

    Args:
        base_info (dict): Raw basic-info result.
        logger: Logger used to report unexpected failures.

    Returns:
        tuple: ``(base_info, good_list)``. ``good_list`` is ``[]`` when any
        key on the path is missing or when an exception occurs.
    """
    try:
        core_info = base_info.get("基础信息", {})
        procurement = core_info.get('采购要求', {})
        requirements = procurement.get('采购需求', {})

        # Missing '货物列表' simply yields an empty list.
        goods = requirements.pop('货物列表', [])

        # Prune containers that became (or already were) empty.
        if '采购需求' in procurement and not requirements:
            del procurement['采购需求']
        if '采购要求' in core_info and not procurement:
            del core_info['采购要求']

        # Write back, which also creates the key when it was absent.
        base_info['基础信息'] = core_info
    except Exception as e:
        logger.error(f"Error in post_process_baseinfo: {e}")
        return base_info, []
    return base_info, goods
def goods_bid_main(output_folder, file_path, file_type, unique_id):
    """Parse a goods tender document and stream the results as JSON strings.

    The file is preprocessed once, then six extraction tasks run concurrently
    in a thread pool. Each result is yielded as soon as its task finishes.

    Args:
        output_folder: Folder for intermediate and output files.
        file_path: Path of the uploaded tender file.
        file_type: ``1`` for docx, ``2`` for pdf, ``3`` for doc.
        unique_id: Name of the logger to use (``None`` for the default logger).

    Yields:
        str: JSON documents, in task-completion order:
        ``{"base_info": ...}``, ``{"technical_standards": ...}`` followed by
        ``{"commercial_standards": ...}``, ``{<task key>: ...}`` for the
        remaining tasks, or ``{"error": ...}`` for a task that failed.
        After all tasks, ``{"good_list": ...}`` is yielded if the base-info
        task succeeded. If preprocessing fails, a single ``"{}"`` is yielded.
    """
    logger = get_global_logger(unique_id)
    processed_data = preprocess_files(output_folder, file_path, file_type, logger)
    if not processed_data:
        yield json.dumps({})
        # Stop here: without preprocessed paths there is nothing to schedule.
        return

    invalid_path = processed_data['invalid_path']
    merged_baseinfo_path = processed_data['merged_baseinfo_path']
    clause_path = processed_data['clause_path']

    # Stays None when the base_info task fails, so the final good_list
    # message is skipped instead of raising UnboundLocalError.
    collected_good_list = None

    with concurrent.futures.ThreadPoolExecutor() as pool:
        futures = {
            'evaluation_standards': pool.submit(
                fetch_evaluation_standards, invalid_path,
                processed_data['evaluation_method_path'], logger),
            'invalid_requirements': pool.submit(
                fetch_invalid_requirements, processed_data['invalid_docpath'],
                output_folder, logger),
            'bidding_documents_requirements': pool.submit(
                fetch_bidding_documents_requirements, invalid_path,
                merged_baseinfo_path, clause_path, logger),
            'opening_bid': pool.submit(
                fetch_bid_opening, invalid_path,
                merged_baseinfo_path, clause_path, logger),
            'base_info': pool.submit(
                fetch_project_basic_info, invalid_path, merged_baseinfo_path,
                processed_data['procurement_path'], clause_path, logger),
            'qualification_review': pool.submit(
                fetch_qualification_review, invalid_path,
                processed_data['qualification_path'],
                processed_data['notice_path'], logger),
        }
        # Reverse map so each completed future resolves to its key directly.
        key_by_future = {future: key for key, future in futures.items()}

        for future in concurrent.futures.as_completed(key_by_future):
            key = key_by_future[future]
            try:
                result = future.result()
                if key == 'base_info':
                    base_info, good_list = result
                    collected_good_list = good_list  # emitted after all tasks
                    yield json.dumps({'base_info': transform_json_values(base_info)}, ensure_ascii=False)
                elif key == 'evaluation_standards':
                    # Technical and commercial scoring go out as two messages.
                    technical_standards = result["technical_standards"]
                    commercial_standards = result["commercial_standards"]
                    yield json.dumps({'technical_standards': transform_json_values(technical_standards)}, ensure_ascii=False)
                    yield json.dumps({'commercial_standards': transform_json_values(commercial_standards)}, ensure_ascii=False)
                else:
                    yield json.dumps({key: transform_json_values(result)}, ensure_ascii=False)
            except Exception as exc:
                # One failed task must not abort the stream; report and go on.
                logger.error(f"Error processing {key}: {exc}")
                yield json.dumps({'error': f'Error processing {key}: {str(exc)}'}, ensure_ascii=False)

    if collected_good_list is not None:
        yield json.dumps({'good_list': transform_json_values(collected_good_list)}, ensure_ascii=False)
#广水市 2022 年义务教育学校多媒体补充采购项目 资格审查有问题
#TODO:把所有未知都删掉。
#TODO:考虑把解析失败的调用豆包,全文上传。
#TODO:同系统下的多个货物,记录一下数量
#TODO:设备前面带星,而不是要求前面带星。
#商务标这里改为列表最里层
#good_list 金额 截取上下文
if __name__ == "__main__":
    # Manual smoke run: parse one local tender file and print each streamed
    # JSON message, then the total elapsed time.
    unique_id = "uuidzyzy11"
    output_folder = "flask_app/static/output/zytest1"
    # Must match the extension of input_file: 1 = docx, 2 = pdf, 3 = doc
    # (as handled by preprocess_files). The input below is a .pdf.
    file_type = 2
    input_file = "C:\\Users\\Administrator\\Desktop\\货物标\\zbfiles\\6.2定版视频会议磋商文件.pdf"
    start_time = time.time()
    # goods_bid_main is a generator; iterate it to drive the work and
    # consume results as they become available.
    generator = goods_bid_main(output_folder, input_file, file_type, unique_id)
    for output in generator:
        print(output)
    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Function execution took {elapsed_time:.2f} seconds.")