zbparse/flask_app/main/工程标解析main.py
2024-11-13 11:46:13 +08:00

238 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- encoding:utf-8 -*-
import json
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor
from flask_app.general.投标人须知正文提取指定内容 import get_requirements_with_gpt
from flask_app.main.截取pdf import truncate_pdf_multiple
from flask_app.general.merge_pdfs import merge_pdfs
from flask_app.main.table_content_extraction import extract_tables_main
from flask_app.main.提取json工程标版 import convert_clause_to_json
from flask_app.general.json_utils import transform_json_values
from flask_app.general.无效标和废标公共代码 import combine_find_invalid
from flask_app.main.投标人须知正文提取指定内容 import extract_from_notice
import concurrent.futures
from flask_app.main.基础信息整合快速版 import combine_basic_info
from flask_app.main.资格审查模块 import combine_review_standards
from flask_app.main.商务评分技术评分整合 import combine_evaluation_standards
from flask_app.general.format_change import pdf2docx, docx2pdf,doc2docx
from flask_app.general.docx截取docx import copy_docx
from flask_app.general.通义千问long import upload_file,qianwen_long
from flask_app.general.json_utils import clean_json_string
def get_global_logger(unique_id):
    """Return the logger registered under *unique_id*.

    Falls back to the default (root) logger when no id is supplied, so
    callers can always log safely even before a request id exists.
    """
    if unique_id is None:
        return logging.getLogger()  # default root logger
    return logging.getLogger(unique_id)
# Module-level logger; rebound per request in engineering_bid_main().
logger=None
# Create a global thread pool.
# NOTE(review): this module-level executor appears unused in this file --
# engineering_bid_main() creates its own ThreadPoolExecutor -- confirm no
# other module imports it before removing.
executor = ThreadPoolExecutor()
def preprocess_files(output_folder, downloaded_file_path, file_type,unique_id):
    """Convert the uploaded bid document and truncate it into section PDFs.

    Normalises the input to a PDF (for truncation) plus a DOCX copy, splits
    the PDF into the per-section files the downstream extractors consume,
    and converts the bidder-notice body into a clause JSON.

    Args:
        output_folder: directory where intermediate/truncated files are written.
        downloaded_file_path: path of the uploaded source document.
        file_type: 1 = docx, 2 = pdf, 3 = doc; any other value aborts.
        unique_id: request id forwarded to truncate_pdf_multiple.

    Returns:
        dict of section paths consumed by engineering_bid_main, or None when
        file_type is unsupported.
    """
    logger.info("starting 文件预处理...")
    logger.info("output_folder..." + output_folder)
    start_time=time.time()
    # Normalise to both a PDF and a DOCX depending on the input file type.
    if file_type == 1:  # docx
        docx_path = downloaded_file_path
        pdf_path = docx2pdf(docx_path)  # convert docx -> pdf for later processing
    elif file_type == 2:  # pdf
        pdf_path = downloaded_file_path
        docx_path = pdf2docx(pdf_path)  # convert pdf -> docx for knowledge-base upload
    elif file_type == 3:  # doc
        # NOTE(review): docx2pdf is fed the raw .doc here rather than the
        # converted docx below -- confirm docx2pdf accepts .doc input.
        pdf_path=docx2pdf(downloaded_file_path)
        docx_path=doc2docx(downloaded_file_path)
    else:
        logger.error("Unsupported file type provided. Preprocessing halted.")
        return None
    # Truncate the PDF into the individual bid-document sections.
    truncate_files = truncate_pdf_multiple(pdf_path, output_folder,unique_id)
    # Section paths by index; the order is fixed by truncate_pdf_multiple.
    # NOTE(review): index 4 is skipped here -- verify against the helper's
    # return layout.
    tobidders_notice_table=truncate_files[0]
    # tobidders_notice_table_docx = pdf2docx(tobidders_notice_table) # bidder-notice front table -> docx
    # truncate_jsonpath = extract_tables_main(tobidders_notice_table_docx, output_folder) # front table docx -> json
    tobidders_notice = truncate_files[1]  # body of the instructions to bidders
    evaluation_method = truncate_files[2]  # evaluation method section
    qualification = truncate_files[3]  # qualification review section
    invalid_path=truncate_files[5]
    # invalid_docpath = copy_docx(docx_path) # docx copy of the invalid-bid part
    invalid_docpath=pdf2docx(invalid_path)
    merged_baseinfo_path=truncate_files[-1]
    # Extend the merged base-info PDF with the bidder-notice body.
    more_path=[merged_baseinfo_path,tobidders_notice]
    merged_baseinfo_path_more=os.path.join(output_folder,"merged_baseinfo_path_more.pdf")
    merge_pdfs(more_path,merged_baseinfo_path_more)
    clause_path = convert_clause_to_json(tobidders_notice, output_folder)  # notice-body clauses pdf -> json
    end_time=time.time()
    logger.info(f"文件预处理 done耗时{end_time - start_time:.2f}")
    # Return the dict of preprocessed file paths.
    return {
        'file_path': downloaded_file_path,
        'output_folder': output_folder,
        'invalid_path':invalid_path,
        'tobidders_notice_table': tobidders_notice_table,
        'tobidders_notice': tobidders_notice,
        'evaluation_method':evaluation_method,
        'qualification': qualification,
        'merged_baseinfo_path':merged_baseinfo_path,
        'merged_baseinfo_path_more':merged_baseinfo_path_more,
        'clause_path': clause_path,
        'invalid_docpath': invalid_docpath
    }
# 基本信息
def fetch_project_basic_info(invalid_path, merged_baseinfo_path, merged_baseinfo_path_more,tobidders_notice, clause_path):
    """Extract the project's basic-information section.

    Every missing section PDF falls back to the full invalid-bid document,
    so the extraction always has something to read.
    """
    logger.info("starting 基础信息...")
    t0 = time.time()
    # Substitute the full document for any missing truncated section.
    merged_baseinfo_path = merged_baseinfo_path or invalid_path
    merged_baseinfo_path_more = merged_baseinfo_path_more or invalid_path
    tobidders_notice = tobidders_notice or invalid_path
    result = combine_basic_info(merged_baseinfo_path, merged_baseinfo_path_more, tobidders_notice, clause_path)
    t1 = time.time()
    logger.info(f"基础信息 done耗时{t1 - t0:.2f}")
    return result
# 形式、响应、资格评审
def fetch_qualification_review(evaluation_method, qualification, output_folder, tobidders_notice_table, clause_path, invalid_path, merged_baseinfo_path):
    """Run the formal / response / qualification review extraction.

    Missing section PDFs fall back to the full invalid-bid document.
    """
    logger.info("starting 资格审查...")
    t0 = time.time()
    # Substitute the full document for any missing truncated section.
    evaluation_method = evaluation_method or invalid_path
    merged_baseinfo_path = merged_baseinfo_path or invalid_path
    result = combine_review_standards(
        evaluation_method,
        qualification,
        output_folder,
        tobidders_notice_table,
        clause_path,
        invalid_path,
        merged_baseinfo_path,
    )
    logger.info(f"资格审查 done耗时{time.time() - t0:.2f}")
    return result
# 评分细则 流式
def fetch_evaluation_standards(invalid_path, evaluation_method):
    """Extract the scoring rules and split them into technical and
    commercial parts so they can be streamed back separately.
    """
    logger.info("starting 商务标和技术标...")
    t0 = time.time()
    # Fall back to the full document when the section PDF is missing.
    evaluation_method = evaluation_method or invalid_path
    combined = combine_evaluation_standards(evaluation_method)
    logger.info(f"商务标和技术标 done耗时{time.time() - t0:.2f}")
    return {
        "technical_standards": {"技术评分": combined.get("技术评分", {})},
        "commercial_standards": {"商务评分": combined.get("商务评分", {})},
    }
# 无效、废标项解析
def fetch_invalid_requirements(invalid_docpath, output_folder):
    """Locate invalid-bid and rejected-bid clauses in the document."""
    logger.info("starting 无效标与废标...")
    t0 = time.time()
    result = combine_find_invalid(invalid_docpath, output_folder)
    logger.info(f"无效标与废标 done耗时{time.time() - t0:.2f}")
    return result
# 投标文件要求
def fetch_bidding_documents_requirements(invalid_path, merged_baseinfo_path_more,clause_path):
    """Extract the bid-document submission requirements (selection 1)."""
    logger.info("starting 投标文件要求...")
    t0 = time.time()
    # Fall back to the full document when the merged section is missing.
    source_pdf = merged_baseinfo_path_more or invalid_path
    requirements = extract_from_notice(source_pdf, clause_path, 1)  # selection 1 = document requirements
    logger.info(f"投标文件要求 done耗时{time.time() - t0:.2f}")
    return {"投标文件要求": requirements}
# 开评定标流程
def fetch_bid_opening(invalid_path, merged_baseinfo_path_more,clause_path):
    """Extract the bid opening / evaluation / award process (selection 2)."""
    logger.info("starting 开评定标流程...")
    t0 = time.time()
    # Fall back to the full document when the merged section is missing.
    source_pdf = merged_baseinfo_path_more or invalid_path
    process = extract_from_notice(source_pdf, clause_path, 2)  # selection 2 = opening/award process
    logger.info(f"开评定标流程 done耗时{time.time() - t0:.2f}")
    return {"开评定标流程": process}
#分段返回
def engineering_bid_main(output_folder, downloaded_file_path, file_type, unique_id):
    """Parse an engineering bid document and stream results section by section.

    Generator: preprocesses the file, then fans the independent extraction
    tasks out to a thread pool and yields each task's JSON result as soon
    as it completes.

    Args:
        output_folder: working directory for intermediate files.
        downloaded_file_path: path of the uploaded source document.
        file_type: 1 = docx, 2 = pdf, 3 = doc.
        unique_id: request id used to select the per-request logger.

    Yields:
        JSON strings, one per completed section; the evaluation standards
        are split into technical and commercial parts.
    """
    global logger
    logger = get_global_logger(unique_id)
    # Preprocess the file to obtain the per-section paths.
    processed_data = preprocess_files(output_folder, downloaded_file_path, file_type, unique_id)
    if not processed_data:
        yield json.dumps({})  # preprocessing failed: emit empty JSON
        # BUG FIX: the original fell through after yielding and then crashed
        # on processed_data[...]; stop the generator here instead.
        return
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Launch every independent extraction task at once.
        futures = {
            'base_info': executor.submit(fetch_project_basic_info, processed_data['invalid_path'],
                                         processed_data['merged_baseinfo_path'],
                                         processed_data['merged_baseinfo_path_more'],
                                         processed_data['tobidders_notice'], processed_data['clause_path']),
            'qualification_review': executor.submit(fetch_qualification_review, processed_data['evaluation_method'],
                                                    processed_data['qualification'], output_folder,
                                                    processed_data['tobidders_notice_table'],
                                                    processed_data['clause_path'], processed_data['invalid_path'],
                                                    processed_data['merged_baseinfo_path']),
            'evaluation_standards': executor.submit(fetch_evaluation_standards, processed_data['invalid_path'],
                                                    processed_data['evaluation_method']),
            'invalid_requirements': executor.submit(fetch_invalid_requirements, processed_data['invalid_docpath'],
                                                    output_folder),
            'bidding_documents_requirements': executor.submit(fetch_bidding_documents_requirements,
                                                              processed_data['invalid_path'],
                                                              processed_data['merged_baseinfo_path_more'],
                                                              processed_data['clause_path']),
            'opening_bid': executor.submit(fetch_bid_opening, processed_data['invalid_path'],
                                           processed_data['merged_baseinfo_path_more'],
                                           processed_data['clause_path'])
        }
        # Reverse map so each completed future resolves to its key in O(1)
        # (the original did a linear scan per completion).
        future_to_key = {future: key for key, future in futures.items()}
        # Stream results back in completion order.
        for future in concurrent.futures.as_completed(futures.values()):
            key = future_to_key[future]
            try:
                result = future.result()
                if key == 'evaluation_standards':
                    # Split evaluation_standards into technical and commercial parts.
                    technical_standards = result["technical_standards"]
                    commercial_standards = result["commercial_standards"]
                    yield json.dumps({'technical_standards': transform_json_values(technical_standards)}, ensure_ascii=False)
                    yield json.dumps({'commercial_standards': transform_json_values(commercial_standards)}, ensure_ascii=False)
                else:
                    yield json.dumps({key: transform_json_values(result)}, ensure_ascii=False)
            except Exception as exc:
                logger.error(f"Error processing {key}: {exc}")
                yield json.dumps({'error': f'Error processing {key}: {str(exc)}'}, ensure_ascii=False)
#TODO:废标项,针对新文件作优化,统一成货物标的处理逻辑
#TODO:基本信息,判断是否这里,打勾逻辑取消了。
if __name__ == "__main__":
    # Ad-hoc manual test entry point.
    # BUG FIX: bind the module-level logger before calling preprocess_files
    # directly -- it was still None on this script path, so logger.info
    # raised AttributeError.
    logger = get_global_logger("121")
    start_time = time.time()
    output_folder = "C:\\Users\\Administrator\\Desktop\\招标文件\\new_test1"
    file_type = 2  # 1: docx, 2: pdf, 3: other
    input_file = "C:\\Users\\Administrator\\Desktop\\招标文件\\招标test文件夹\\zbtest2.pdf"
    print("yes")
    # for output in engineering_bid_main(output_folder, input_file, file_type, "zytest"):
    #     print(output)
    # BUG FIX: pass the configured file_type variable instead of the
    # hard-coded literal 2.
    preprocess_files(output_folder, input_file, file_type, "121")
    end_time = time.time()
    elapsed_time = end_time - start_time  # elapsed wall-clock time
    print(f"Function execution took {elapsed_time} seconds.")