# zbparse/flask_app/routes/货物标解析main.py
#
# 329 lines
# 17 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import multiprocessing
import time
from docx import Document
from flask_app.general.format_change import docx2pdf, pdf2docx
from flask_app.general.insert_del_pagemark import insert_mark, delete_mark
from flask_app.general.json_utils import transform_json_values
from flask_app.general.读取文件.clean_pdf import is_pure_image
from flask_app.general.通用功能函数 import get_global_logger
from flask_app.货物标.基础信息解析货物标版 import combine_basic_info
from flask_app.general.投标人须知正文提取指定内容 import extract_from_notice
from flask_app.general.截取pdf_main import truncate_pdf_multiple
from concurrent.futures import as_completed, ProcessPoolExecutor
from flask_app.general.投标人须知正文条款提取成json文件 import convert_clause_to_json
from flask_app.general.无效标和废标公共代码 import combine_find_invalid
from flask_app.货物标.资格审查main import combine_qualification_review
from flask_app.general.商务技术评分提取 import combine_evaluation_standards
from concurrent.futures import ThreadPoolExecutor
def preprocess_file_main(output_folder, file_path, file_type, logger):
    """Run ``preprocess_files`` in a dedicated worker process and return its result.

    A one-process ``multiprocessing.Pool`` is created just for this call and is
    released when the ``with`` block exits.

    Args:
        output_folder: Forwarded unchanged to ``preprocess_files``.
        file_path: Forwarded unchanged to ``preprocess_files``.
        file_type: Forwarded unchanged to ``preprocess_files``.
        logger: Forwarded unchanged to ``preprocess_files``.

    Returns:
        Whatever ``preprocess_files`` returns for the same arguments.
    """
    call_args = (output_folder, file_path, file_type, logger)
    with multiprocessing.Pool(processes=1) as worker_pool:
        # ``apply`` blocks until the worker has finished the call.
        outcome = worker_pool.apply(preprocess_files, args=call_args)
    return outcome
def preprocess_files(output_folder, file_path, file_type, logger):
    """Convert, slice and mark an uploaded tender file for the later extraction steps.

    Steps, in order:
      1. Obtain a PDF for the input (``docx2pdf`` for docx/doc, the file itself
         for pdf), unless the input is a docx that ``is_pure_image`` flags as
         image-only.
      2. Slice the PDF with ``truncate_pdf_multiple`` (skipped for image-only
         docx, where only the "invalid" slot is filled with the original file).
      3. Convert the bidder-notice slice to a clause JSON file.
      4. Build two docx variants of the "invalid" part: one with page marks
         (``invalid_added_docx``) and one without (``invalid_deleted_docx``).
      5. Check that the unmarked docx can be opened; otherwise fall back to the
         PDF (or the original file when no PDF exists).

    Args:
        output_folder: Directory handed to the slicing and clause-conversion helpers.
        file_path: Path of the uploaded file.
        file_type: 1 for docx, 2 for pdf, 3 for doc. Any other value is rejected.
        logger: Logger used for progress and error messages.

    Returns:
        ``None`` when ``file_type`` is unsupported; otherwise a dict with the keys
        ``invalid_deleted_docx``, ``invalid_added_docx``, ``output_folder``,
        ``procurement_path``, ``evaluation_method_path``, ``qualification_path``,
        ``notice_path``, ``clause_path`` and ``merged_baseinfo_path``.
    """
    logger.info("starting 文件预处理...")
    start_time = time.time()
    is_pure_image_flag = False  # True only for a docx that consists purely of images
    pdf_path = ""

    # Resolve the PDF that all later slicing works on.
    if file_type == 1:  # docx
        if is_pure_image(file_path):
            is_pure_image_flag = True
        else:
            pdf_path = docx2pdf(file_path)
    elif file_type == 2:  # pdf
        pdf_path = file_path
    elif file_type == 3:  # doc
        pdf_path = docx2pdf(file_path)
    else:
        logger.error("Unsupported file type provided. Preprocessing halted.")
        return None

    if not is_pure_image_flag:  # the common case
        truncate_files = truncate_pdf_multiple(pdf_path, output_folder, logger, 'goods')
    else:
        # Image-only docx: nothing to slice, only the "invalid" slot is populated.
        truncate_files = ['', '', '', '', '', '', file_path, '']

    procurement_path = truncate_files[5]        # procurement requirements
    evaluation_method_path = truncate_files[1]  # evaluation method
    qualification_path = truncate_files[2]      # qualification review
    tobidders_notice_path = truncate_files[4]   # bidder notice, main text
    notice_path = truncate_files[0]             # tender announcement
    merged_baseinfo_path = truncate_files[7]    # cover + announcement + notice table + notice text
    clause_path = convert_clause_to_json(tobidders_notice_path, output_folder)
    # Part used for invalid-bid detection; falls back to the whole PDF when the slice is empty.
    invalid_path = truncate_files[6] if truncate_files[6] != "" else pdf_path
    truncate_endtime = time.time()
    logger.info(f"文件切分CPU耗时{truncate_endtime - start_time:.2f}")

    if not is_pure_image_flag:
        invalid_added_pdf = insert_mark(invalid_path)
        change_start = time.time()
        # The marked docx is the one the regex-based invalid-bid extraction reads.
        invalid_added_docx = pdf2docx(invalid_added_pdf)
        change_end = time.time()
        logger.info(f"文档转换p2d耗时{change_end - change_start:.2f}")
        try:
            # Opening the file is only a validity check; the document object is not needed.
            Document(invalid_added_docx)
        except Exception as exc:
            logger.error(
                f"Marked docx could not be opened ({exc}); "
                f"converting the unmarked PDF instead."
            )
            invalid_added_docx = pdf2docx(invalid_path)
        invalid_deleted_docx = delete_mark(invalid_added_docx)
        if not invalid_deleted_docx:
            invalid_deleted_docx = pdf2docx(invalid_path)
    else:
        # Reuse the uploaded docx directly; per the original note this mainly
        # saves the cost of one pdf2docx conversion.
        invalid_deleted_docx = file_path
        # Regexes cannot extract anything from an image-only docx, so no marked
        # variant is produced (the original note says an LLM is used instead).
        invalid_added_docx = ''

    try:
        Document(invalid_deleted_docx)
    except Exception as exc:
        # Last resort so that downstream steps always receive some usable file.
        logger.error(
            f"Unmarked docx could not be opened ({exc}); "
            f"falling back to the PDF or the original file."
        )
        invalid_deleted_docx = pdf_path if pdf_path != "" else file_path

    end_time = time.time()
    logger.info(f"文件预处理总耗时:{end_time - start_time:.2f}")
    return {
        'invalid_deleted_docx': invalid_deleted_docx,
        'invalid_added_docx': invalid_added_docx,
        'output_folder': output_folder,
        'procurement_path': procurement_path,
        'evaluation_method_path': evaluation_method_path,
        'qualification_path': qualification_path,
        'notice_path': notice_path,
        'clause_path': clause_path,
        'merged_baseinfo_path': merged_baseinfo_path
    }
def fetch_project_basic_info(invalid_deleted_docx, merged_baseinfo_path, procurement_path, clause_path, logger):
    """Extract the basic project information and the goods list.

    Empty ``merged_baseinfo_path`` / ``procurement_path`` values are replaced by
    ``invalid_deleted_docx`` before ``combine_basic_info`` is called. The result
    is then split by ``post_process_baseinfo``.

    Args:
        invalid_deleted_docx: Fallback document, also passed on to ``combine_basic_info``.
        merged_baseinfo_path: Merged basic-info slice, may be empty.
        procurement_path: Procurement-requirements slice, may be empty.
        clause_path: Clause JSON path passed on to ``combine_basic_info``.
        logger: Logger used for progress and error messages.

    Returns:
        tuple: ``(base_info, good_list)``; ``({"基础信息": {}}, [])`` when any
        step raises.
    """
    logger.info("starting 基础信息...")
    began = time.time()
    try:
        baseinfo_source = merged_baseinfo_path or invalid_deleted_docx
        procurement_source = procurement_path or invalid_deleted_docx
        combined = combine_basic_info(baseinfo_source, procurement_source, clause_path, invalid_deleted_docx)
        base_info, good_list = post_process_baseinfo(combined, logger)
        outcome = (base_info, good_list)
        logger.info(f"基础信息 done耗时{time.time() - began:.2f}")
        return outcome
    except Exception as exc:
        logger.error(f"Error in 基础信息: {exc}")
        # Default payload so the caller always gets the same shape.
        return {"基础信息": {}}, []
def fetch_qualification_review(invalid_deleted_docx, qualification_path, notice_path, logger):
    """Run the qualification-review extraction.

    An empty ``notice_path`` is replaced by ``invalid_deleted_docx`` before
    ``combine_qualification_review`` is called.

    Args:
        invalid_deleted_docx: Fallback document, also passed on to the extractor.
        qualification_path: Qualification-review slice.
        notice_path: Tender-announcement slice, may be empty.
        logger: Logger used for progress and error messages.

    Returns:
        The value returned by ``combine_qualification_review``, or
        ``{"资格审查": {}}`` when it raises.
    """
    logger.info("starting 资格审查...")
    began = time.time()
    try:
        notice_source = notice_path or invalid_deleted_docx
        review = combine_qualification_review(invalid_deleted_docx, qualification_path, notice_source)
        logger.info(f"资格审查 done耗时{time.time() - began:.2f}")
        return review
    except Exception as exc:
        logger.error(f"Error in 资格审查: {exc}")
        # Default payload so the caller always gets the same shape.
        return {"资格审查": {}}
def fetch_evaluation_standards(invalid_deleted_docx, evaluation_method_path, logger):
    """Extract the technical and commercial scoring standards.

    An empty ``evaluation_method_path`` is replaced by ``invalid_deleted_docx``.
    Unlike the sibling ``fetch_*`` helpers, exceptions raised by
    ``combine_evaluation_standards`` are not caught here and propagate to the
    caller.

    Args:
        invalid_deleted_docx: Fallback document, also passed on to the extractor.
        evaluation_method_path: Evaluation-method slice, may be empty.
        logger: Logger used for progress messages.

    Returns:
        dict: ``{"technical_standards": {"技术评分": ...},
        "commercial_standards": {"商务评分": ...}}``; a missing section
        defaults to ``{}``.
    """
    logger.info("starting 商务评分和技术评分...")
    began = time.time()
    method_source = evaluation_method_path if evaluation_method_path else invalid_deleted_docx
    # The third argument (2) is passed through unchanged; its meaning is defined by the helper.
    standards = combine_evaluation_standards(method_source, invalid_deleted_docx, 2)
    payload = {
        "technical_standards": {"技术评分": standards.get("技术评分", {})},
        "commercial_standards": {"商务评分": standards.get("商务评分", {})}
    }
    logger.info(f"商务评分和技术评分 done耗时{time.time() - began:.2f}")
    return payload
def fetch_invalid_requirements(invalid_added_docx, output_folder, logger):
    """Extract the invalid-bid / rejected-bid clauses.

    Args:
        invalid_added_docx: Document passed to ``combine_find_invalid``.
        output_folder: Output directory passed to ``combine_find_invalid``.
        logger: Logger used for progress and error messages.

    Returns:
        The value returned by ``combine_find_invalid``, or
        ``{"无效标与废标": {}}`` when it raises.
    """
    logger.info("starting 无效标与废标...")
    began = time.time()
    try:
        findings = combine_find_invalid(invalid_added_docx, output_folder)
        logger.info(f"无效标与废标 done耗时{time.time() - began:.2f}")
        return findings
    except Exception as exc:
        logger.error(f"Error in 无效标与废标: {exc}")
        # Default payload so the caller always gets the same shape.
        return {"无效标与废标": {}}
#投标文件要求
def fetch_bidding_documents_requirements(invalid_deleted_docx, merged_baseinfo_path, clause_path, logger):
    """Extract the bidding-document requirements from the bidder notice.

    An empty ``merged_baseinfo_path`` is replaced by ``invalid_deleted_docx``.
    ``extract_from_notice`` is called with selection ``1``.

    Args:
        invalid_deleted_docx: Fallback document.
        merged_baseinfo_path: Merged basic-info slice, may be empty.
        clause_path: Clause JSON path passed on to ``extract_from_notice``.
        logger: Logger used for progress and error messages.

    Returns:
        dict: ``{"投标文件要求": <extracted data>}``, with an empty dict as the
        value when the extraction raises.
    """
    logger.info("starting 投标文件要求...")
    notice_source = merged_baseinfo_path if merged_baseinfo_path else invalid_deleted_docx
    began = time.time()
    try:
        extracted = extract_from_notice(notice_source, clause_path, 1)
        payload = {"投标文件要求": extracted}
        logger.info(f"投标文件要求 done耗时{time.time() - began:.2f}")
        return payload
    except Exception as exc:
        logger.error(f"Error in 投标文件要求: {exc}")
        # Default payload: same key, empty dict.
        return {"投标文件要求": {}}
# 开评定标流程
def fetch_bid_opening(invalid_deleted_docx, merged_baseinfo_path, clause_path, logger):
    """Extract the bid opening / evaluation / award procedure from the bidder notice.

    An empty ``merged_baseinfo_path`` is replaced by ``invalid_deleted_docx``.
    ``extract_from_notice`` is called with selection ``2``.

    Args:
        invalid_deleted_docx: Fallback document.
        merged_baseinfo_path: Merged basic-info slice, may be empty.
        clause_path: Clause JSON path passed on to ``extract_from_notice``.
        logger: Logger used for progress and error messages.

    Returns:
        dict: ``{"开评定标流程": <extracted data>}``, with an empty dict as the
        value when the extraction raises.
    """
    logger.info("starting 开评定标流程...")
    notice_source = merged_baseinfo_path if merged_baseinfo_path else invalid_deleted_docx
    began = time.time()
    try:
        extracted = extract_from_notice(notice_source, clause_path, 2)
        payload = {"开评定标流程": extracted}
        logger.info(f"开评定标流程 done耗时{time.time() - began:.2f}")
        return payload
    except Exception as exc:
        logger.error(f"Error in 开评定标流程: {exc}")
        # Default payload: same key, empty dict.
        return {"开评定标流程": {}}
def post_process_baseinfo(base_info, logger):
    """Detach the goods list from the basic-information result.

    Looks up ``base_info["基础信息"]["采购要求"]["采购需求"]["货物列表"]``,
    removes that entry in place and returns it separately. Missing levels are
    treated as empty dicts, so the goods list defaults to ``[]``. ``base_info``
    itself is mutated: the ``"基础信息"`` key is (re)assigned after the removal.

    Args:
        base_info (dict): Raw basic-information result.
        logger: Logger used to report unexpected structures.

    Returns:
        tuple: ``(base_info, good_list)``. If anything raises (for example a
        level that is not a dict), the error is logged and
        ``(base_info, [])`` is returned.
    """
    try:
        basic_section = base_info.get("基础信息", {})
        demand_section = basic_section.get('采购要求', {}).get('采购需求', {})
        # pop() both reads and removes the list; [] when the key is absent.
        good_list = demand_section.pop('货物列表', [])
        # Write the (possibly freshly created) section back onto the input dict.
        base_info['基础信息'] = basic_section
    except Exception as e:
        logger.error(f"Error in post_process_baseinfo: {e}")
        return base_info, []
    return base_info, good_list
def goods_bid_main(output_folder, file_path, file_type, unique_id):
    """Parse a goods-bid tender file and stream the results as JSON strings.

    This is a generator. It preprocesses the file, runs the extraction tasks on
    a thread pool and yields one JSON string per finished task, in completion
    order, followed by a final ``good_list`` message.

    Args:
        output_folder: Directory for intermediate files.
        file_path: Path of the uploaded file.
        file_type: File type code forwarded to ``preprocess_file_main``.
        unique_id: Identifier used to obtain the logger.

    Yields:
        str: JSON documents (``ensure_ascii=False``). If preprocessing fails, a
        single ``{"error": ...}`` document is yielded and the generator stops.
    """
    logger = get_global_logger(unique_id)
    processed_data = preprocess_file_main(output_folder, file_path, file_type, logger)
    if not processed_data:
        # Preprocessing failed: report once and stop.
        yield json.dumps({'error': '文件预处理失败。请检查文件类型并重试。'}, ensure_ascii=False)
        return

    # Worker threads; leaving the ``with`` block waits for them and frees the pool.
    with ThreadPoolExecutor() as executor:
        # Map each future back to its task name so a finished future can be
        # identified directly.
        task_names = {
            # Invalid / rejected bid clauses
            executor.submit(fetch_invalid_requirements, processed_data['invalid_added_docx'],
                            output_folder, logger): 'invalid_requirements',
            # Bidding-document requirements
            executor.submit(fetch_bidding_documents_requirements, processed_data['invalid_deleted_docx'],
                            processed_data['merged_baseinfo_path'],
                            processed_data['clause_path'], logger): 'bidding_documents_requirements',
            # Bid opening / evaluation / award procedure
            executor.submit(fetch_bid_opening, processed_data['invalid_deleted_docx'],
                            processed_data['merged_baseinfo_path'],
                            processed_data['clause_path'], logger): 'opening_bid',
            # Currently disabled tasks (their result handling below is kept):
            # executor.submit(fetch_evaluation_standards, processed_data['invalid_deleted_docx'],
            #                 processed_data['evaluation_method_path'], logger): 'evaluation_standards',
            # executor.submit(fetch_project_basic_info, processed_data['invalid_deleted_docx'],
            #                 processed_data['merged_baseinfo_path'], processed_data['procurement_path'],
            #                 processed_data['clause_path'], logger): 'base_info',
            # executor.submit(fetch_qualification_review, processed_data['invalid_deleted_docx'],
            #                 processed_data['qualification_path'],
            #                 processed_data['notice_path'], logger): 'qualification_review',
        }
        collected_good_list = []
        # as_completed hands futures back as soon as each one finishes.
        for finished in as_completed(task_names):
            key = task_names[finished]
            try:
                result = finished.result()
                if key == 'base_info':
                    base_info, good_list = result
                    collected_good_list = good_list or []  # kept for the final message
                    yield json.dumps({'base_info': transform_json_values(base_info)}, ensure_ascii=False)
                elif key == 'evaluation_standards':
                    # Technical and commercial scoring go out as two separate messages.
                    technical_standards = result["technical_standards"]
                    commercial_standards = result["commercial_standards"]
                    yield json.dumps({'technical_standards': transform_json_values(technical_standards)},
                                     ensure_ascii=False)
                    yield json.dumps({'commercial_standards': transform_json_values(commercial_standards)},
                                     ensure_ascii=False)
                else:
                    yield json.dumps({key: transform_json_values(result)}, ensure_ascii=False)
            except Exception as exc:
                logger.error(f"Error processing {key}: {exc}")
                if key == 'evaluation_standards':
                    # Only the scoring task sends a default payload on failure;
                    # other failed tasks are logged and emit nothing.
                    default_evaluation = {
                        'technical_standards': {"技术评分": ""},
                        'commercial_standards': {"商务评分": ""}
                    }
                    yield json.dumps(default_evaluation, ensure_ascii=False)
        # The goods list is always sent, even when it is empty.
        yield json.dumps({'good_list': transform_json_values(collected_good_list)}, ensure_ascii=False)
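# TODO: speed up the small parse: (1) convert the PDF straight to text and split that instead. To be considered later.
# TODO:
#   Fix the bugs reported by testing in ZenTao (禅道).
#   Merge the qualification review of goods bids (货物标) and engineering bids (工程标).
# TODO: 陕西省公安厅交通警察总队高速公路交通安全智能感知巡查系统项目(1)_tobidders_notice_part2.pdf and 唐山市公安交通警察支队机动车查验机构视频存储回放系统竞争性谈判-招标文件正文(1)_tobidders_notice_part1.pdf are hard to handle.
#   There is no way yet to tell whether an uploaded file is garbled; consider calling the LLM in parallel and returning None right away for garbled files.
#   国道107: when extracting to the JSON file, the text '湖北众恒永业工程项目管理有限公司广水分公司编' interferes; try to strip it.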
if __name__ == "__main__":
    # Manual run against a local sample file: times one preprocessing pass.
    unique_id = "uuidzyzy11"
    logger = get_global_logger(unique_id)
    output_folder = r"C:\Users\Administrator\Desktop\货物标\little_parse"
    input_file = r"C:\Users\Administrator\Desktop\货物标\little_parse\ztbfile.pdf"
    file_type = 2  # 1: docx, 2: pdf, 3: other
    start_time = time.time()
    preprocess_files(output_folder, input_file, file_type, logger)
    # To exercise the full streaming pipeline instead, iterate the generator:
    # generator = goods_bid_main(output_folder, input_file, file_type, unique_id)
    # for output in generator:
    #     print(output)
    end_time = time.time()
    elapsed_time = end_time - start_time  # wall-clock seconds for the run above
    print(f"Function execution took {elapsed_time:.2f} seconds.")