zbparse/flask_app/routes/工程标解析main.py

302 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- encoding:utf-8 -*-
import json
import multiprocessing
import os
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from docx import Document
from flask_app.general.insert_del_pagemark import insert_mark,delete_mark
from flask_app.general.截取pdf_main import truncate_pdf_multiple
from flask_app.general.merge_pdfs import merge_pdfs
from flask_app.general.读取文件.clean_pdf import is_pure_image
from flask_app.general.通用功能函数 import get_global_logger
from flask_app.general.投标人须知正文条款提取成json文件 import convert_clause_to_json
from flask_app.general.json_utils import transform_json_values
from flask_app.general.无效标和废标公共代码 import combine_find_invalid
from flask_app.general.投标人须知正文提取指定内容 import extract_from_notice
import concurrent.futures
from flask_app.工程标.基础信息整合工程标 import combine_basic_info
from flask_app.工程标.资格审查模块main import combine_review_standards
from flask_app.general.商务技术评分提取 import combine_evaluation_standards
from flask_app.general.format_change import pdf2docx, docx2pdf
def preprocess_file_main(output_folder, file_path, file_type, logger):
    """Run ``preprocess_files`` inside a dedicated single-worker process pool.

    The four arguments are forwarded unchanged to ``preprocess_files``. The
    call blocks until the worker has finished.

    Returns:
        Whatever ``preprocess_files`` returned in the worker process.
    """
    worker_args = (output_folder, file_path, file_type, logger)
    with multiprocessing.Pool(processes=1) as single_worker_pool:
        # apply() is synchronous: it returns only once the worker is done.
        return single_worker_pool.apply(preprocess_files, args=worker_args)
def _is_loadable_docx(docx_path, logger):
    """Return True if python-docx can open ``docx_path``; log and return False otherwise."""
    # Document(None) silently opens python-docx's built-in blank template, so
    # an empty path has to be rejected explicitly or it would pass the check.
    if not docx_path:
        logger.error("docx validation failed: no file path was produced")
        return False
    try:
        Document(docx_path)
    except Exception as exc:
        # Best-effort validation: any failure just means "use a fallback".
        logger.error(f"docx validation failed for {docx_path}: {exc}")
        return False
    return True


def preprocess_files(output_folder, file_path, file_type, logger):
    """Split a tender document into sections and prepare the derived files.

    Args:
        output_folder: Directory handed to the splitting/merging helpers.
        file_path: Path of the uploaded document.
        file_type: 1 for docx, 2 for pdf, 3 for doc.
        logger: Logger used for progress and error messages.

    Returns:
        A dict with the keys ``invalid_deleted_docx``, ``invalid_added_docx``,
        ``notice_path``, ``tobidders_notice_table``, ``evaluation_method``,
        ``qualification``, ``merged_baseinfo_path``,
        ``merged_baseinfo_path_more`` and ``clause_path``; or ``None`` when
        ``file_type`` is not one of the supported values.
    """
    logger.info("starting 文件预处理...")
    start_time = time.time()
    is_pure_image_flag = False
    pdf_path = ""

    # Work out the PDF to operate on from the declared file type.
    if file_type == 1:  # docx
        if is_pure_image(file_path):
            is_pure_image_flag = True
        else:
            pdf_path = docx2pdf(file_path)  # convert to PDF for the later steps
    elif file_type == 2:  # pdf
        pdf_path = file_path
    elif file_type == 3:  # doc
        pdf_path = docx2pdf(file_path)  # convert to PDF for the later steps
    else:
        logger.error("Unsupported file type provided. Preprocessing halted.")
        return None

    if not is_pure_image_flag:
        # Common case: the document is not a pure-image doc/docx.
        truncate_files = truncate_pdf_multiple(pdf_path, output_folder, logger, 'engineering')
    else:
        # Nothing can be split; only the "invalid bid" slot carries the file.
        truncate_files = ['', '', '', '', '', file_path, '']

    # Section meanings below follow the original inline notes.
    notice_path = truncate_files[0]  # tender announcement
    evaluation_method = truncate_files[1]  # bid evaluation method
    qualification = truncate_files[2]  # qualification review
    tobidders_notice_table = truncate_files[3]  # instructions to bidders: front table
    tobidders_notice = truncate_files[4]  # instructions to bidders: main text
    invalid_path = truncate_files[5] if truncate_files[5] != "" else pdf_path  # invalid-bid section
    merged_baseinfo_path = truncate_files[-1]

    more_path = [merged_baseinfo_path, tobidders_notice]
    merged_baseinfo_path_more = os.path.join(output_folder, "merged_baseinfo_path_more.pdf")
    merged_baseinfo_path_more = merge_pdfs(more_path, merged_baseinfo_path_more)
    clause_path = convert_clause_to_json(tobidders_notice, output_folder)  # clause pdf -> json
    truncate_endtime = time.time()

    if not is_pure_image_flag:
        invalid_added_pdf = insert_mark(invalid_path)
        logger.info(f"文件切分CPU耗时{truncate_endtime - start_time:.2f}")
        invalid_added_docx = pdf2docx(invalid_added_pdf)  # docx with page marks
        if not _is_loadable_docx(invalid_added_docx, logger):
            # The marked PDF did not convert cleanly; retry with the unmarked one.
            invalid_added_docx = pdf2docx(invalid_path)
        invalid_deleted_docx = delete_mark(invalid_added_docx)  # docx without page marks
        if not invalid_deleted_docx:
            invalid_deleted_docx = pdf2docx(invalid_path)
    else:
        invalid_deleted_docx = file_path
        invalid_added_docx = ''

    if not _is_loadable_docx(invalid_deleted_docx, logger):
        # Conversion still failed: last resort so that a usable file is returned.
        invalid_deleted_docx = pdf_path if pdf_path != "" else file_path

    end_time = time.time()
    logger.info(f"文件预处理总耗时:{end_time - start_time:.2f}")

    # Paths of every preprocessed artefact, keyed for the downstream tasks.
    return {
        'invalid_deleted_docx': invalid_deleted_docx,
        'invalid_added_docx': invalid_added_docx,
        'notice_path': notice_path,
        'tobidders_notice_table': tobidders_notice_table,
        'evaluation_method': evaluation_method,
        'qualification': qualification,
        'merged_baseinfo_path': merged_baseinfo_path,
        'merged_baseinfo_path_more': merged_baseinfo_path_more,
        'clause_path': clause_path
    }
# 基本信息
def fetch_project_basic_info(invalid_deleted_docx, merged_baseinfo_path, merged_baseinfo_path_more, clause_path, logger):
    """Extract the basic project information section.

    Empty ``merged_baseinfo_path`` / ``merged_baseinfo_path_more`` values are
    replaced by ``invalid_deleted_docx`` before ``combine_basic_info`` is
    called.

    Returns:
        The value returned by ``combine_basic_info``, or ``{"基础信息": {}}``
        if anything inside the guarded section raises.
    """
    logger.info("starting 基础信息...")
    start_time = time.time()
    try:
        # Fall back to the whole document when a dedicated excerpt is missing.
        merged_baseinfo_path = merged_baseinfo_path or invalid_deleted_docx
        merged_baseinfo_path_more = merged_baseinfo_path_more or invalid_deleted_docx
        result = combine_basic_info(merged_baseinfo_path, merged_baseinfo_path_more, clause_path, invalid_deleted_docx)
        logger.info(f"基础信息 done耗时{time.time() - start_time:.2f}")
    except Exception as exc:
        # logger.exception keeps the traceback; the message text is unchanged.
        logger.exception(f"Error in 基础信息: {exc}")
        # Default payload so the caller still receives this section.
        result = {"基础信息": {}}
    return result
def fetch_qualification_review(evaluation_method, qualification, output_folder, tobidders_notice_table, clause_path, invalid_deleted_docx, merged_baseinfo_path, notice_path, logger):
    """Extract the qualification review section.

    Empty ``notice_path``, ``evaluation_method`` and ``merged_baseinfo_path``
    values are replaced by ``invalid_deleted_docx`` before
    ``combine_review_standards`` is called.

    Returns:
        The value returned by ``combine_review_standards``, or
        ``{"资格审查": {}}`` if anything inside the guarded section raises.
    """
    logger.info("starting 资格审查...")
    start_time = time.time()
    try:
        # Fall back to the whole document when a dedicated excerpt is missing.
        notice_path = notice_path or invalid_deleted_docx
        evaluation_method = evaluation_method or invalid_deleted_docx
        merged_baseinfo_path = merged_baseinfo_path or invalid_deleted_docx
        result = combine_review_standards(
            evaluation_method, qualification, output_folder, tobidders_notice_table,
            clause_path, invalid_deleted_docx, merged_baseinfo_path, notice_path)
        logger.info(f"资格审查 done耗时{time.time() - start_time:.2f}")
    except Exception as exc:
        # logger.exception keeps the traceback; the message text is unchanged.
        logger.exception(f"Error in 资格审查: {exc}")
        # Default payload so the caller still receives this section.
        result = {"资格审查": {}}
    return result
# 评分细则 流式
def fetch_evaluation_standards(invalid_deleted_docx, evaluation_method, logger):
    """Extract the scoring rules and split them into technical and commercial parts.

    When ``evaluation_method`` is empty, ``invalid_deleted_docx`` is used as
    the source instead. Exceptions are not handled here and propagate to the
    caller.

    Returns:
        ``{"technical_standards": {"技术评分": ...},
        "commercial_standards": {"商务评分": ...}}`` where each inner value
        defaults to ``{}`` if the key is absent from the extraction result.
    """
    logger.info("starting 商务标和技术标...")
    began_at = time.time()
    scoring_source = evaluation_method if evaluation_method else invalid_deleted_docx
    scoring = combine_evaluation_standards(scoring_source, invalid_deleted_docx, 1)
    split_result = {
        "technical_standards": {"技术评分": scoring.get("技术评分", {})},
        "commercial_standards": {"商务评分": scoring.get("商务评分", {})},
    }
    elapsed = time.time() - began_at
    logger.info(f"商务标和技术标 done耗时{elapsed:.2f}")
    return split_result
# 无效、废标项解析
def fetch_invalid_requirements(invalid_added_docx, output_folder, logger):
    """Extract the invalid-bid / rejected-bid requirements.

    Returns:
        The value returned by ``combine_find_invalid``, or
        ``{"无效标与废标": {}}`` if anything inside the guarded section raises.
    """
    logger.info("starting 无效标与废标...")
    start_time = time.time()
    try:
        result = combine_find_invalid(invalid_added_docx, output_folder)
        logger.info(f"无效标与废标 done耗时{time.time() - start_time:.2f}")
    except Exception as exc:
        # logger.exception keeps the traceback; the message text is unchanged.
        logger.exception(f"Error in 无效标与废标: {exc}")
        # Default payload so the caller still receives this section.
        result = {"无效标与废标": {}}
    return result
# 投标文件要求
def fetch_bidding_documents_requirements(invalid_deleted_docx, merged_baseinfo_path_more, clause_path, logger):
    """Extract the bid-document requirements section.

    An empty ``merged_baseinfo_path_more`` is replaced by
    ``invalid_deleted_docx``; ``extract_from_notice`` is then called with
    selection ``1``.

    Returns:
        ``{"投标文件要求": <extract_from_notice result>}``, or
        ``{"投标文件要求": {}}`` if anything inside the guarded section raises.
    """
    logger.info("starting 投标文件要求...")
    start_time = time.time()
    try:
        # Fall back to the whole document when the merged excerpt is missing.
        merged_baseinfo_path_more = merged_baseinfo_path_more or invalid_deleted_docx
        requirements = extract_from_notice(merged_baseinfo_path_more, clause_path, 1)
        result = {"投标文件要求": requirements}
        logger.info(f"投标文件要求 done耗时{time.time() - start_time:.2f}")
    except Exception as exc:
        # logger.exception keeps the traceback; the message text is unchanged.
        logger.exception(f"Error in 投标文件要求: {exc}")
        # Default payload so the caller still receives this section.
        result = {"投标文件要求": {}}
    return result
# 开评定标流程
def fetch_bid_opening(invalid_deleted_docx, merged_baseinfo_path_more, clause_path, logger):
    """Extract the bid opening / evaluation / award procedure section.

    An empty ``merged_baseinfo_path_more`` is replaced by
    ``invalid_deleted_docx``; ``extract_from_notice`` is then called with
    selection ``2``.

    Returns:
        ``{"开评定标流程": <extract_from_notice result>}``, or
        ``{"开评定标流程": {}}`` if anything inside the guarded section raises.
    """
    logger.info("starting 开评定标流程...")
    start_time = time.time()
    try:
        # Fall back to the whole document when the merged excerpt is missing.
        merged_baseinfo_path_more = merged_baseinfo_path_more or invalid_deleted_docx
        procedure = extract_from_notice(merged_baseinfo_path_more, clause_path, 2)
        result = {"开评定标流程": procedure}
        logger.info(f"开评定标流程 done耗时{time.time() - start_time:.2f}")
    except Exception as exc:
        # logger.exception keeps the traceback; the message text is unchanged.
        logger.exception(f"Error in 开评定标流程: {exc}")
        # Default payload so the caller still receives this section.
        result = {"开评定标流程": {}}
    return result
def engineering_bid_main(output_folder, file_path, file_type, unique_id):
    """Parse an engineering tender document and stream each section as JSON.

    This is a generator. It preprocesses the file, submits the six extraction
    tasks to a ``ProcessPoolExecutor`` and yields one JSON string per result
    in completion order.

    Yields:
        * ``{"error": ...}`` once, if preprocessing returned nothing.
        * ``{"technical_standards": ...}`` then ``{"commercial_standards": ...}``
          for the evaluation-standards task.
        * ``{<task key>: ...}`` for every other task.
        * A single object carrying both default scoring entries if handling
          the evaluation-standards result raised. Failures of other tasks are
          only logged.
    """
    logger = get_global_logger(unique_id)
    # Preprocess the file and collect the paths of the derived artefacts.
    processed_data = preprocess_file_main(output_folder, file_path, file_type, logger)
    if not processed_data:
        error_response = {
            'error': '文件预处理失败。请检查文件类型并重试。'
        }
        yield json.dumps(error_response, ensure_ascii=False)
        return  # nothing further can be processed

    with ProcessPoolExecutor() as executor:
        # Keyed by future so a completed future maps straight to its task name
        # instead of scanning a name->future dict on every completion.
        future_to_key = {
            executor.submit(
                fetch_project_basic_info,
                processed_data['invalid_deleted_docx'],
                processed_data['merged_baseinfo_path'],
                processed_data['merged_baseinfo_path_more'],
                processed_data['clause_path'],
                logger,
            ): 'base_info',
            executor.submit(
                fetch_qualification_review,
                processed_data['evaluation_method'],
                processed_data['qualification'],
                output_folder,
                processed_data['tobidders_notice_table'],
                processed_data['clause_path'],
                processed_data['invalid_deleted_docx'],
                processed_data['merged_baseinfo_path'],
                processed_data['notice_path'],
                logger,
            ): 'qualification_review',
            executor.submit(
                fetch_evaluation_standards,
                processed_data['invalid_deleted_docx'],
                processed_data['evaluation_method'],
                logger,
            ): 'evaluation_standards',
            executor.submit(
                fetch_invalid_requirements,
                processed_data['invalid_added_docx'],
                output_folder,
                logger,
            ): 'invalid_requirements',
            executor.submit(
                fetch_bidding_documents_requirements,
                processed_data['invalid_deleted_docx'],
                processed_data['merged_baseinfo_path_more'],
                processed_data['clause_path'],
                logger,
            ): 'bidding_documents_requirements',
            executor.submit(
                fetch_bid_opening,
                processed_data['invalid_deleted_docx'],
                processed_data['merged_baseinfo_path_more'],
                processed_data['clause_path'],
                logger,
            ): 'opening_bid',
        }

        # Emit results as soon as each task finishes.
        for future in as_completed(future_to_key):
            key = future_to_key[future]
            try:
                result = future.result()
                if key == 'evaluation_standards':
                    # Scoring rules are delivered as two separate messages.
                    technical_standards = result["technical_standards"]
                    commercial_standards = result["commercial_standards"]
                    yield json.dumps({'technical_standards': transform_json_values(technical_standards)}, ensure_ascii=False)
                    yield json.dumps({'commercial_standards': transform_json_values(commercial_standards)}, ensure_ascii=False)
                else:
                    yield json.dumps({key: transform_json_values(result)}, ensure_ascii=False)
            except Exception as exc:
                # logger.exception keeps the traceback; the message text is unchanged.
                logger.exception(f"Error processing {key}: {exc}")
                if key == 'evaluation_standards':
                    # Default technical and commercial scoring entries.
                    default_evaluation = {
                        'technical_standards': {"技术评分": ""},
                        'commercial_standards': {"商务评分": ""}
                    }
                    yield json.dumps(default_evaluation, ensure_ascii=False)
#TODO:基本信息,判断是否这里,打勾逻辑取消了。
if __name__ == "__main__":
    # Manual smoke run against a local sample file; prints every streamed chunk.
    start_time = time.time()
    output_folder = r"C:\Users\Administrator\Desktop\fsdownload\bb2747ad-5578-4b3b-b60f-3a2d1b672b6f"
    input_file = r"C:\Users\Administrator\Desktop\fsdownload\bb2747ad-5578-4b3b-b60f-3a2d1b672b6f\zytest.pdf"
    file_type = 2  # 1: docx, 2: pdf, 3: doc
    print("yes")
    for output in engineering_bid_main(output_folder, input_file, file_type, "zytest"):
        print(output)
    end_time = time.time()
    elapsed_time = end_time - start_time  # wall-clock duration of the whole run
    print(f"Function execution took {elapsed_time} seconds.")