# zbparse/flask_app/old_version/解析old_old.py
# -*- encoding:utf-8 -*-
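"""Legacy parsing pipeline for engineering tenders (工程标), streamed.

preprocess_files normalizes the uploaded file into pdf and docx copies,
truncates the pdf into the sections the extractors need, and converts key
sections to docx/json. engineering_bid_main then runs the extraction tasks
concurrently and yields each result as a JSON string, so callers can stream
partial results as tasks complete.
"""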
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from flask_app.工程标.截取pdf工程标版 import truncate_pdf_multiple
from flask_app.general.table_content_extraction import extract_tables_main
from flask_app.old_version.提取json工程标版_old import convert_clause_to_json
from flask_app.general.json_utils import transform_json_values
from flask_app.工程标.无效标和废标和禁止投标整合 import combine_find_invalid
from flask_app.工程标.投标人须知正文提取指定内容工程标 import extract_from_notice
import concurrent.futures
from flask_app.工程标.基础信息整合工程标 import combine_basic_info
from flask_app.工程标.资格审查模块main import combine_review_standards
from flask_app.old_version.商务评分技术评分整合old_version import combine_evaluation_standards
from flask_app.general.format_change import pdf2docx, docx2pdf, doc2docx
from flask_app.general.docx截取docx import copy_docx
def get_global_logger(unique_id):
    if unique_id is None:
        return logging.getLogger()  # fall back to the default logger
    logger = logging.getLogger(unique_id)
    return logger


logger = None

# Create a global thread pool (not actually used below; engineering_bid_main builds its own)
executor = ThreadPoolExecutor()

def preprocess_files(output_folder, downloaded_file_path, file_type, unique_id):
    logger.info("starting 文件预处理...")
    logger.info("output_folder..." + output_folder)
    # Normalize the input by file type so both pdf and docx copies exist
    if file_type == 1:  # docx
        docx_path = downloaded_file_path
        pdf_path = docx2pdf(docx_path)  # convert docx to pdf for downstream processing
    elif file_type == 2:  # pdf
        pdf_path = downloaded_file_path
        docx_path = pdf2docx(pdf_path)  # convert pdf to docx for upload to the knowledge base
    elif file_type == 3:  # doc
        pdf_path = docx2pdf(downloaded_file_path)
        docx_path = doc2docx(downloaded_file_path)
    else:
        logger.error("Unsupported file type provided. Preprocessing halted.")
        return None

    # Truncate the pdf into sections; the roles noted below are inferred from
    # how each index is consumed in this module (index 4 is not used here)
    truncate_files = truncate_pdf_multiple(pdf_path, output_folder, unique_id)

    # Process the individual sections
    truncate0_docpath = pdf2docx(truncate_files[0])  # bidder-notice front table (投标人须知前附表) -> docx
    invalid_path = truncate_files[5]
    invalid_docpath = copy_docx(docx_path)  # docx excerpt for the invalid-bid section
    # invalid_docpath = pdf2docx(invalid_path)
    truncate_jsonpath = extract_tables_main(truncate0_docpath, output_folder)  # front table docx -> json
    truncate0 = truncate_files[0]
    truncate1 = truncate_files[1]
    truncate2 = truncate_files[2]
    truncate3 = truncate_files[3]
    merged_baseinfo_path = truncate_files[-1]
    clause_path = convert_clause_to_json(truncate_files[2], output_folder)  # notice body clauses (投标人须知正文) pdf -> json
    logger.info("文件预处理done")

    # Return a dict of the preprocessed file paths
    return {
        'file_path': downloaded_file_path,
        'output_folder': output_folder,
        'invalid_path': invalid_path,
        'truncate0': truncate0,
        'truncate1': truncate1,
        'truncate2': truncate2,
        'truncate3': truncate3,
        'truncate0_jsonpath': truncate_jsonpath,
        'merged_baseinfo_path': merged_baseinfo_path,
        'clause_path': clause_path,
        'invalid_docpath': invalid_docpath
    }

def post_processing(data, includes):
    # Initialize the result with an empty "其他" (other) bucket
    result = {"其他": {}}
    # Keep whitelisted keys at the top level; everything else goes under "其他"
    for key, value in data.items():
        if key in includes:
            result[key] = value
        else:
            result["其他"][key] = value
    # Drop the "其他" bucket if nothing landed in it
    if not result["其他"]:
        del result["其他"]
    return result
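
# A minimal illustration of post_processing (the keys here are hypothetical and
# the values elided; only the bucketing behavior is shown):
#   post_processing({"基础信息": {...}, "附加说明": {...}}, includes=["基础信息"])
#   -> {"基础信息": {...}, "其他": {"附加说明": {...}}}
# If includes covers every key, the empty "其他" bucket is removed from the result.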

# Basic project information
def fetch_project_basic_info(merged_baseinfo_path, truncate0, truncate2, clause_path):  # bidder-notice front table
    logger.info("starting基础信息...")
    basic_res = combine_basic_info(merged_baseinfo_path, truncate0, truncate2, clause_path)
    logger.info("基础信息done")
    return basic_res


# Formal, responsiveness, and qualification review
def fetch_qualification_review(truncate1, truncate3, output_folder, truncate0_jsonpath, clause_path, invalid_path, merged_baseinfo_path):
    logger.info("starting资格审查...")
    review_standards_res = combine_review_standards(truncate1, truncate3, output_folder, truncate0_jsonpath,
                                                    clause_path, invalid_path, merged_baseinfo_path)
    logger.info("资格审查done")
    return review_standards_res


# Scoring criteria, streamed
def fetch_evaluation_standards(truncate1):  # evaluation-method front table (评标办法前附表)
    logger.info("starting 商务标和技术标...")
    # Extract the dict from the evaluation-method front table
    evaluation_standards_res = combine_evaluation_standards(truncate1)
    # Split out the technical and commercial scoring sections
    technical_standards = {"技术评分": evaluation_standards_res.get("技术评分", {})}
    commercial_standards = {"商务评分": evaluation_standards_res.get("商务评分", {})}
    logger.info("商务标和技术标 done")
    # Return the two sections under their own top-level keys
    return {
        "technical_standards": technical_standards,
        "commercial_standards": commercial_standards
    }
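
# Shape of the value returned above (illustrative; the inner dicts are whatever
# combine_evaluation_standards produced, passed through unchanged):
#   {
#       "technical_standards": {"技术评分": {...}},
#       "commercial_standards": {"商务评分": {...}}
#   }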

# Invalid-bid and rejected-bid items
def fetch_invalid_requirements(invalid_docpath, output_folder, truncate0_jsonpath, clause_path, truncate3):
    # Rejected-bid requirements: extracted via Qwen (千问)
    logger.info("starting无效标与废标...")
    find_invalid_res = combine_find_invalid(invalid_docpath, output_folder, truncate0_jsonpath, clause_path, truncate3)
    logger.info("无效标与废标done...")
    return find_invalid_res


# Bid document requirements
def fetch_bidding_documents_requirements(clause_path):
    logger.info("starting投标文件要求...")
    fetch_bidding_documents_requirements_json = extract_from_notice(clause_path, 1)
    logger.info("投标文件要求done...")
    return {"投标文件要求": fetch_bidding_documents_requirements_json}


# Bid opening, evaluation, and award process
def fetch_bid_opening(clause_path):
    logger.info("starting开评定标流程...")
    fetch_bid_opening_json = extract_from_notice(clause_path, 2)
    logger.info("开评定标流程done...")
    return {"开评定标流程": fetch_bid_opening_json}

# Returns results in segments as they complete
def engineering_bid_main(output_folder, downloaded_file_path, file_type, unique_id):
    global logger
    logger = get_global_logger(unique_id)
    # Preprocess the files and collect the resulting paths
    processed_data = preprocess_files(output_folder, downloaded_file_path, file_type, unique_id)
    if not processed_data:
        yield json.dumps({})  # preprocessing failed: emit an empty JSON chunk and stop
        return
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Immediately start the tasks that do not depend on knowledge_name or index
        futures = {
            'base_info': executor.submit(fetch_project_basic_info, processed_data['merged_baseinfo_path'],
                                         processed_data['truncate0'],
                                         processed_data['truncate2'], processed_data['clause_path']),
            'qualification_review': executor.submit(fetch_qualification_review, processed_data['truncate1'],
                                                    processed_data['truncate3'], output_folder,
                                                    processed_data['truncate0_jsonpath'],
                                                    processed_data['clause_path'], processed_data['invalid_path'],
                                                    processed_data['merged_baseinfo_path']),
            'evaluation_standards': executor.submit(fetch_evaluation_standards, processed_data['truncate1']),
            'invalid_requirements': executor.submit(fetch_invalid_requirements, processed_data['invalid_docpath'],
                                                    output_folder, processed_data['truncate0_jsonpath'],
                                                    processed_data['clause_path'], processed_data['truncate3']),
            'bidding_documents_requirements': executor.submit(fetch_bidding_documents_requirements,
                                                              processed_data['clause_path']),
            'opening_bid': executor.submit(fetch_bid_opening, processed_data['clause_path'])
        }
        # Yield each result in completion order
        for future in concurrent.futures.as_completed(futures.values()):
            key = next(k for k, v in futures.items() if v is future)
            try:
                result = future.result()
                # evaluation_standards is split into technical and commercial chunks
                if key == 'evaluation_standards':
                    technical_standards = result["technical_standards"]
                    commercial_standards = result["commercial_standards"]
                    # Emit the technical and commercial sections separately
                    yield json.dumps({'technical_standards': transform_json_values(technical_standards)}, ensure_ascii=False)
                    yield json.dumps({'commercial_standards': transform_json_values(commercial_standards)}, ensure_ascii=False)
                else:
                    # Every other task is emitted as a single chunk
                    yield json.dumps({key: transform_json_values(result)}, ensure_ascii=False)
            except Exception as exc:
                logger.error(f"Error processing {key}: {exc}")
                yield json.dumps({'error': f'Error processing {key}: {str(exc)}'}, ensure_ascii=False)
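
# A minimal consumption sketch (an assumption, not part of this module: the
# route, paths, and unique_id below are hypothetical). It shows how the JSON
# chunks yielded above could be streamed over HTTP with Flask:
#
#   from flask import Flask, Response
#
#   app = Flask(__name__)
#
#   @app.route('/parse')
#   def parse():
#       chunks = engineering_bid_main('/tmp/out', '/tmp/in.pdf', 2, 'req-1')
#       return Response((chunk + '\n' for chunk in chunks), mimetype='application/json')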

if __name__ == "__main__":
    start_time = time.time()
    output_folder = "C:\\Users\\Administrator\\Desktop\\招标文件\\new_test1"
    file_type = 2  # 1: docx, 2: pdf, 3: doc
    input_file = "C:\\Users\\Administrator\\Desktop\\招标文件\\招标test文件夹\\zbtest2.pdf"
    print("yes")
    for output in engineering_bid_main(output_folder, input_file, file_type, "zytest"):
        print(output)
    end_time = time.time()
    elapsed_time = end_time - start_time  # total runtime in seconds
    print(f"Function execution took {elapsed_time} seconds.")