zbparse/flask_app/main/招标文件解析.py
2024-09-25 10:45:32 +08:00

293 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- encoding:utf-8 -*-
import json
import logging
import os
import time
from flask_app.main.截取pdf import truncate_pdf_multiple
from flask_app.main.table_content_extraction import extract_tables_main
from flask_app.main.知识库操作 import addfileToKnowledge, deleteKnowledge
from flask_app.main.投标人须知正文条款提取成json文件 import convert_clause_to_json
from flask_app.main.json_utils import nest_json_under_key, transform_json_values, combine_json_results
from flask_app.main.无效标和废标和禁止投标整合 import combine_find_invalid
from flask_app.main.投标人须知正文提取指定内容 import extract_from_notice
import concurrent.futures
from flask_app.main.基础信息整合 import project_basic_info
from flask_app.main.资格审查模块 import combine_review_standards
from flask_app.main.商务标技术标整合 import combine_evaluation_standards
from flask_app.main.format_change import pdf2docx, docx2pdf
from flask_app.main.docx截取docx import copy_docx
def get_global_logger(unique_id):
    """Return the logger registered under *unique_id*.

    Falls back to the root logger when no request id is supplied.
    """
    if unique_id is None:
        # No per-request id available: use the default (root) logger.
        return logging.getLogger()
    return logging.getLogger(unique_id)


# Module-level logger; rebound per request at the top of main_processing().
logger = None


# NOTE(review): pdf -> docx conversion may drop check-mark glyphs — confirm.
def preprocess_files(output_folder, downloaded_file_path, file_type, unique_id):
    """Normalise the uploaded tender file and build every artefact the
    downstream analysis tasks consume.

    Args:
        output_folder: directory receiving all intermediate files.
        downloaded_file_path: path of the uploaded source file.
        file_type: 1 for a docx upload, 2 for a pdf upload.
        unique_id: request id; used to name the per-request knowledge base.

    Returns:
        A dict of derived paths and knowledge-base handles, or None when
        the file type is unsupported.
    """
    logger.info("starting 文件预处理...")
    logger.info("output_folder..." + output_folder)
    # Make sure both a pdf and a docx version of the input exist.
    if file_type == 1:  # docx upload
        docx_path = downloaded_file_path
        pdf_path = docx2pdf(docx_path)  # pdf version feeds the truncation steps
    elif file_type == 2:  # pdf upload
        pdf_path = downloaded_file_path
        docx_path = pdf2docx(pdf_path)  # docx version is uploaded to the knowledge base
    else:
        logger.error("Unsupported file type provided. Preprocessing halted.")
        return None
    # Register the docx in the knowledge base under a per-request name.
    knowledge_name = "招标解析" + unique_id
    index = addfileToKnowledge(docx_path, knowledge_name)
    # Split the pdf into the sections later stages need:
    # [0] bidder-notice front table, [1] evaluation method,
    # [2] notice body, [3] qualification-review conditions.
    truncate_files = truncate_pdf_multiple(pdf_path, output_folder)
    truncate0_docpath = pdf2docx(truncate_files[0])  # front table as docx for table extraction
    invalid_docpath = copy_docx(docx_path)  # docx copy used by invalid-bid detection
    truncate_jsonpath = extract_tables_main(truncate0_docpath, output_folder)  # front-table docx -> json
    clause_path = convert_clause_to_json(truncate_files[2], output_folder)  # notice body pdf -> clause json
    logger.info("文件预处理done")
    return {
        'input_file_path': downloaded_file_path,
        'output_folder': output_folder,
        'truncate0': truncate_files[0],  # bidder-notice front table (pdf)
        'truncate1': truncate_files[1],  # evaluation-method front table (pdf)
        'truncate3': truncate_files[3],  # qualification-review table (pdf)
        'knowledge_index': index,
        'knowledge_name': knowledge_name,
        'truncate0_jsonpath': truncate_jsonpath,
        'clause_path': clause_path,
        'invalid_docpath': invalid_docpath,
    }
def post_processing(data, includes):
    """Partition *data*: whitelisted keys stay top-level, everything else
    is folded into an '其他' (other) bucket.

    Args:
        data: flat mapping of section name -> section content.
        includes: iterable of section names to keep at the top level.

    Returns:
        A dict with the whitelisted entries plus an '其他' sub-dict holding
        the rest; the '其他' key is omitted when the bucket is empty.
    """
    grouped = {"其他": {}}
    for name, payload in data.items():
        # Route each entry either to the top level or into the bucket.
        target = grouped if name in includes else grouped["其他"]
        target[name] = payload
    if not grouped["其他"]:
        # Nothing fell into the bucket — drop the empty key entirely.
        grouped.pop("其他")
    return grouped
# 基本信息
def fetch_project_basic_info(knowledge_name, truncate0, output_folder, clause_path):
    """Extract the project's basic information.

    Thin logging wrapper around project_basic_info(); *truncate0* is the
    bidder-notice front-table pdf.
    """
    logger.info("starting基础信息...")
    result = project_basic_info(knowledge_name, truncate0, output_folder, clause_path)
    logger.info("基础信息done")
    return result
# 形式、响应、资格评审
def fetch_qualification_review(truncate1, truncate3, knowledge_name, truncate0_jsonpath,
                               clause_path, input_file, output_folder):
    """Run the formal/responsiveness/qualification review.

    Thin logging wrapper around combine_review_standards().
    """
    logger.info("starting资格审查...")
    result = combine_review_standards(
        truncate1, truncate3, knowledge_name, truncate0_jsonpath,
        clause_path, input_file, output_folder)
    logger.info("资格审查done")
    return result
# 评分细则 流式
def fetch_evaluation_standards(truncate1):
    """Run the evaluation-method analysis and split the result into its
    technical ('技术标') and commercial ('商务标') parts.

    Returns:
        {"technical_standards": ..., "commercial_standards": ...}, each
        defaulting to {} when the section is absent.
    """
    logger.info("starting 商务标和技术标...")
    combined = combine_evaluation_standards(truncate1)
    logger.info("商务标和技术标 done")
    return {
        "technical_standards": combined.get("技术标", {}),
        "commercial_standards": combined.get("商务标", {}),
    }
# def fetch_evaluation_standards(truncate1): # 评标办法前附表
# logger.info("starting商务标技术标...")
# evaluation_standards_res = combine_evaluation_standards(truncate1)
# logger.info("商务标技术标done")
# return evaluation_standards_res
# 无效、废标项解析
def fetch_invalid_requirements(invalid_docpath, output_folder, truncate0_jsonpath,
                               clause_path, truncate3):
    """Detect invalid-bid / rejected-bid clauses.

    Thin logging wrapper around combine_find_invalid() (Qwen-backed).
    """
    logger.info("starting无效标与废标...")
    result = combine_find_invalid(
        invalid_docpath, output_folder, truncate0_jsonpath, clause_path, truncate3)
    logger.info("无效标与废标done...")
    return result
# 投标文件要求
def fetch_bidding_documents_requirements(clause_path):
    """Extract bid-document requirements (notice section 1), nested under
    the '投标文件要求' key."""
    logger.info("starting投标文件要求...")
    extracted = extract_from_notice(clause_path, 1)
    nested = nest_json_under_key(extracted, "投标文件要求")
    logger.info("投标文件要求done...")
    return nested
# 开评定标流程
def fetch_bid_opening(clause_path):
    """Extract the bid opening/evaluation/award procedure (notice section 2),
    nested under the '开评定标流程' key."""
    logger.info("starting开评定标流程...")
    extracted = extract_from_notice(clause_path, 2)
    nested = nest_json_under_key(extracted, "开评定标流程")
    logger.info("开评定标流程done...")
    return nested
# def main_processing(output_folder, downloaded_file_path, file_type, unique_id): # file_type=1->docx file_type=2->pdf
# global logger
# logger = get_global_logger(unique_id)
# # Preprocess files and get necessary data paths and knowledge index
# processed_data = preprocess_files(output_folder, downloaded_file_path, file_type, unique_id)
# if not processed_data:
# return ""
#
# with concurrent.futures.ThreadPoolExecutor() as executor:
# # Submit all tasks to the executor
# futures = {
# 'base_info': executor.submit(fetch_project_basic_info, processed_data['knowledge_name'],
# processed_data['truncate0'], output_folder,
# processed_data['clause_path']),
# 'qualification_review': executor.submit(fetch_qualification_review, processed_data['truncate1'],
# processed_data['truncate3'],
# processed_data['knowledge_name'], processed_data['truncate0_jsonpath'],
# processed_data['clause_path'],processed_data['input_file_path'],processed_data['output_folder']),
# 'evaluation_standards': executor.submit(fetch_evaluation_standards, processed_data['truncate1']),
# 'invalid_requirements': executor.submit(fetch_invalid_requirements, processed_data['invalid_docpath'],
# output_folder, processed_data['truncate0_jsonpath'],
# processed_data['clause_path'], processed_data['truncate3']),
# 'bidding_documents_requirements': executor.submit(fetch_bidding_documents_requirements,
# processed_data['clause_path']),
# 'opening_bid': executor.submit(fetch_bid_opening, processed_data['clause_path'])
# }
#
# comprehensive_responses = []
# # Collect results in the defined order
# for key in ['base_info', 'qualification_review', 'evaluation_standards', 'invalid_requirements',
# 'bidding_documents_requirements', 'opening_bid']:
# try:
# # Wait for the future to complete and get the result
# result = futures[key].result()
# comprehensive_responses.append(result)
# except Exception as exc:
# logger.error(f"Error processing {key}: {exc}")
# # 合并 JSON 结果
# combined_final_result = combine_json_results(comprehensive_responses)
# includes = ["基础信息", "资格审查", "商务标", "技术标", "无效标与废标项", "投标文件要求", "开评定标流程"]
# result = post_processing(combined_final_result, includes)
# modified_json = transform_json_values(result)
#
# final_result_path = os.path.join(output_folder, "final_result.json")
# with open(final_result_path, 'w', encoding='utf-8') as file:
# json.dump(modified_json, file, ensure_ascii=False, indent=2)
# logger.info("final_result.json has been saved")
# deleteKnowledge(processed_data['knowledge_index'])
# return final_result_path
#分段返回
def main_processing(output_folder, downloaded_file_path, file_type, unique_id):
    """Analyse a tender document and stream each section's result as SSE events.

    Args:
        output_folder: directory for intermediate artefacts.
        downloaded_file_path: path of the uploaded file.
        file_type: 1 for docx, 2 for pdf.
        unique_id: request id; selects the per-request logger and names the
            temporary knowledge base.

    Yields:
        "data: <json>\\n\\n" SSE lines, one payload per completed analysis
        task (evaluation standards are emitted as two separate events:
        technical then commercial). On preprocessing failure a single empty
        "data: {}" event is emitted.
    """
    global logger
    logger = get_global_logger(unique_id)
    processed_data = preprocess_files(output_folder, downloaded_file_path, file_type, unique_id)
    if not processed_data:
        yield "data: {}\n\n"
        # BUG FIX: without this return, execution fell through and crashed
        # with a TypeError when indexing processed_data (None) below.
        return
    try:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Submit every analysis task; keys are the SSE event names.
            futures = {
                'base_info': executor.submit(
                    fetch_project_basic_info,
                    processed_data['knowledge_name'], processed_data['truncate0'],
                    output_folder, processed_data['clause_path']),
                'qualification_review': executor.submit(
                    fetch_qualification_review,
                    processed_data['truncate1'], processed_data['truncate3'],
                    processed_data['knowledge_name'], processed_data['truncate0_jsonpath'],
                    processed_data['clause_path'], processed_data['input_file_path'],
                    processed_data['output_folder']),
                # Technical and commercial standards are fetched together.
                'evaluation_standards': executor.submit(
                    fetch_evaluation_standards, processed_data['truncate1']),
                'invalid_requirements': executor.submit(
                    fetch_invalid_requirements,
                    processed_data['invalid_docpath'], output_folder,
                    processed_data['truncate0_jsonpath'], processed_data['clause_path'],
                    processed_data['truncate3']),
                'bidding_documents_requirements': executor.submit(
                    fetch_bidding_documents_requirements, processed_data['clause_path']),
                'opening_bid': executor.submit(fetch_bid_opening, processed_data['clause_path']),
            }
            # Reverse map for O(1) future -> task-name resolution (the old
            # code did a linear scan of futures.items() per completion).
            # The previous per-task sleep(1) loop was removed: it ran AFTER
            # all futures were submitted, so it only added a fixed delay.
            future_to_key = {future: key for key, future in futures.items()}
            # Stream results in completion order.
            for future in concurrent.futures.as_completed(future_to_key):
                key = future_to_key[future]
                try:
                    result = future.result()
                    if key == 'evaluation_standards':
                        # Emit technical and commercial standards as two events.
                        for part in ('technical_standards', 'commercial_standards'):
                            payload = transform_json_values({part: result[part]})
                            yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
                    else:
                        payload = transform_json_values({key: result})
                        yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
                except Exception as exc:
                    logger.error(f"Error processing {key}: {exc}")
                    # Surface the failure to the client as an error event.
                    yield f"data: {json.dumps({'error': f'Error processing {key}: {str(exc)}'})}\n\n"
    finally:
        # Always drop the per-request knowledge base, even if the client
        # disconnects and the generator is closed early (GeneratorExit).
        deleteKnowledge(processed_data['knowledge_index'])
#TODO:流式输出要改商务标技术标整合
if __name__ == "__main__":
    # Ad-hoc manual run: preprocess one local tender file and report timing.
    output_folder = "flask_app/static/output/zytest1"
    file_type = 1  # 1: docx, 2: pdf, 3: other
    # NOTE(review): file_type=1 (docx) but input_file has a .pdf extension — confirm intended.
    input_file = "C:\\Users\\Administrator\\Desktop\\招标文件\\招标test文件夹\\zbtest11.pdf"
    start_time = time.time()
    preprocess_files(output_folder, input_file, file_type, "zytest1")
    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Function execution took {elapsed_time} seconds.")