# -*- encoding:utf-8 -*- import json import logging import time from concurrent.futures import ThreadPoolExecutor from flask_app.main.截取pdf import truncate_pdf_multiple from flask_app.main.table_content_extraction import extract_tables_main from flask_app.main.知识库操作 import addfileToKnowledge, deleteKnowledge from flask_app.main.投标人须知正文条款提取成json文件 import convert_clause_to_json from flask_app.main.json_utils import transform_json_values, combine_json_results from flask_app.main.无效标和废标和禁止投标整合 import combine_find_invalid from flask_app.main.投标人须知正文提取指定内容 import extract_from_notice import concurrent.futures from flask_app.main.基础信息整合 import combine_basic_info from flask_app.main.资格审查模块 import combine_review_standards from flask_app.main.商务标技术标整合 import combine_evaluation_standards from flask_app.main.format_change import pdf2docx, docx2pdf from flask_app.main.docx截取docx import copy_docx def get_global_logger(unique_id): if unique_id is None: return logging.getLogger() # 获取默认的日志器 logger = logging.getLogger(unique_id) return logger logger=None # 创建全局线程池 executor = ThreadPoolExecutor() def preprocess_files(output_folder, downloaded_file_path, file_type, unique_id): logger.info("starting 文件预处理...") logger.info("output_folder..." + output_folder) # 根据文件类型处理文件路径 if file_type == 1: # docx docx_path = downloaded_file_path pdf_path = docx2pdf(docx_path) # 将docx转换为pdf以供后续处理 elif file_type == 2: # pdf pdf_path = downloaded_file_path docx_path = pdf2docx(pdf_path) # 将pdf转换为docx以供上传到知识库 else: logger.error("Unsupported file type provided. Preprocessing halted.") return None # 异步上传知识库 future_knowledge = executor.submit(addfileToKnowledge, docx_path, "招标解析" + unique_id) # 调用截取PDF多次 truncate_files = truncate_pdf_multiple(pdf_path, output_folder) # 处理各个部分 truncate0_docpath = pdf2docx(truncate_files[0]) # 投标人须知前附表转docx invalid_docpath = copy_docx(docx_path) # docx截取无效标部分 truncate_jsonpath = extract_tables_main(truncate0_docpath, output_folder) # 投标人须知前附表docx->json truncate0 = truncate_files[0] truncate1 = truncate_files[1] truncate3 = truncate_files[3] clause_path = convert_clause_to_json(truncate_files[2], output_folder) # 投标人须知正文条款pdf->json logger.info("文件预处理done") # 提前返回,不等待 future_knowledge 完成,返回包含 Future 对象 return { 'file_path': downloaded_file_path, 'output_folder': output_folder, 'truncate0': truncate0, 'truncate1': truncate1, 'truncate3': truncate3, 'knowledge_future': future_knowledge, # 返回 Future 对象 'truncate0_jsonpath': truncate_jsonpath, 'clause_path': clause_path, 'invalid_docpath': invalid_docpath } def post_processing(data,includes): # 初始化结果字典,预设'其他'分类为空字典 result = {"其他": {}} # 遍历原始字典的每一个键值对 for key, value in data.items(): if key in includes: # 如果键在includes列表中,直接保留这个键值对 result[key] = value else: # 如果键不在includes列表中,将这个键值对加入到'其他'分类中 result["其他"][key] = value # 如果'其他'分类没有任何内容,可以选择删除这个键 if not result["其他"]: del result["其他"] return result # 基本信息 def fetch_project_basic_info(knowledge_name, truncate0, output_folder, clause_path): # 投标人须知前附表 logger.info("starting基础信息...") basic_res = combine_basic_info(knowledge_name, truncate0, output_folder, clause_path) logger.info("基础信息done") return basic_res # 形式、响应、资格评审 def fetch_qualification_review(truncate1, truncate3, knowledge_name, truncate0_jsonpath, clause_path,input_file,output_folder): logger.info("starting资格审查...") review_standards_res = combine_review_standards(truncate1, truncate3, knowledge_name, truncate0_jsonpath, clause_path,input_file,output_folder) logger.info("资格审查done") return review_standards_res # 评分细则 流式 def fetch_evaluation_standards(truncate1): # 评标办法前附表 logger.info("starting 商务标和技术标...") # 获取评标办法前附表的字典结果 evaluation_standards_res = combine_evaluation_standards(truncate1) # 获取技术标和商务标 technical_standards = {"技术评分": evaluation_standards_res.get("技术评分", {})} commercial_standards = {"商务评分": evaluation_standards_res.get("商务评分", {})} logger.info("商务标和技术标 done") # 返回将 "技术标" 和 "商务标" 包含在新的键中 return { "technical_standards": technical_standards, "commercial_standards": commercial_standards } # def fetch_evaluation_standards(truncate1): # 评标办法前附表 # logger.info("starting商务标技术标...") # evaluation_standards_res = combine_evaluation_standards(truncate1) # logger.info("商务标技术标done") # return evaluation_standards_res # 无效、废标项解析 def fetch_invalid_requirements(invalid_docpath, output_folder, truncate0_jsonpath, clause_path, truncate3): # 废标项要求:千问 logger.info("starting无效标与废标...") find_invalid_res = combine_find_invalid(invalid_docpath, output_folder, truncate0_jsonpath, clause_path, truncate3) logger.info("无效标与废标done...") return find_invalid_res # 投标文件要求 def fetch_bidding_documents_requirements(clause_path): logger.info("starting投标文件要求...") fetch_bidding_documents_requirements_json = extract_from_notice(clause_path, 1) logger.info("投标文件要求done...") return {"投标文件要求":fetch_bidding_documents_requirements_json} # 开评定标流程 def fetch_bid_opening(clause_path): logger.info("starting开评定标流程...") fetch_bid_opening_json = extract_from_notice(clause_path, 2) logger.info("开评定标流程done...") return {"开评定标流程":fetch_bid_opening_json} # def main_processing(output_folder, downloaded_file_path, file_type, unique_id): # file_type=1->docx file_type=2->pdf # global logger # logger = get_global_logger(unique_id) # # Preprocess files and get necessary data paths and knowledge index # processed_data = preprocess_files(output_folder, downloaded_file_path, file_type, unique_id) # if not processed_data: # return "" # # with concurrent.futures.ThreadPoolExecutor() as executor: # # Submit all tasks to the executor # futures = { # 'base_info': executor.submit(fetch_project_basic_info, processed_data['knowledge_name'], # processed_data['truncate0'], output_folder, # processed_data['clause_path']), # 'qualification_review': executor.submit(fetch_qualification_review, processed_data['truncate1'], # processed_data['truncate3'], # processed_data['knowledge_name'], processed_data['truncate0_jsonpath'], # processed_data['clause_path'],processed_data['file_path'],processed_data['output_folder']), # 'evaluation_standards': executor.submit(fetch_evaluation_standards, processed_data['truncate1']), # 'invalid_requirements': executor.submit(fetch_invalid_requirements, processed_data['invalid_docpath'], # output_folder, processed_data['truncate0_jsonpath'], # processed_data['clause_path'], processed_data['truncate3']), # 'bidding_documents_requirements': executor.submit(fetch_bidding_documents_requirements, # processed_data['clause_path']), # 'opening_bid': executor.submit(fetch_bid_opening, processed_data['clause_path']) # } # # comprehensive_responses = [] # # Collect results in the defined order # for key in ['base_info', 'qualification_review', 'evaluation_standards', 'invalid_requirements', # 'bidding_documents_requirements', 'opening_bid']: # try: # # Wait for the future to complete and get the result # result = futures[key].result() # comprehensive_responses.append(result) # except Exception as exc: # logger.error(f"Error processing {key}: {exc}") # # 合并 JSON 结果 # combined_final_result = combine_json_results(comprehensive_responses) # includes = ["基础信息", "资格审查", "商务标", "技术标", "无效标与废标项", "投标文件要求", "开评定标流程"] # result = post_processing(combined_final_result, includes) # modified_json = transform_json_values(result) # # final_result_path = os.path.join(output_folder, "final_result.json") # with open(final_result_path, 'w', encoding='utf-8') as file: # json.dump(modified_json, file, ensure_ascii=False, indent=2) # logger.info("final_result.json has been saved") # deleteKnowledge(processed_data['knowledge_index']) # return final_result_path #分段返回 def engineering_bid_main(output_folder, downloaded_file_path, file_type, unique_id): global logger logger = get_global_logger(unique_id) # 预处理文件,获取处理后的数据 processed_data = preprocess_files(output_folder, downloaded_file_path, file_type, unique_id) if not processed_data: yield json.dumps({}) # 如果处理数据失败,返回空的 JSON with concurrent.futures.ThreadPoolExecutor() as executor: # 立即启动不依赖 knowledge_name 和 index 的任务 futures = { 'evaluation_standards': executor.submit(fetch_evaluation_standards, processed_data['truncate1']), 'invalid_requirements': executor.submit(fetch_invalid_requirements, processed_data['invalid_docpath'], output_folder, processed_data['truncate0_jsonpath'], processed_data['clause_path'], processed_data['truncate3']), 'bidding_documents_requirements': executor.submit(fetch_bidding_documents_requirements, processed_data['clause_path']), 'opening_bid': executor.submit(fetch_bid_opening, processed_data['clause_path']) } # 提前处理这些不依赖的任务,按完成顺序返回 for future in concurrent.futures.as_completed(futures.values()): key = next(k for k, v in futures.items() if v == future) try: result = future.result() # 如果是 evaluation_standards,拆分技术标和商务标 if key == 'evaluation_standards': technical_standards = result["technical_standards"] commercial_standards = result["commercial_standards"] # 分别返回技术标和商务标 yield json.dumps({'technical_standards': transform_json_values(technical_standards)}, ensure_ascii=False) yield json.dumps({'commercial_standards': transform_json_values(commercial_standards)}, ensure_ascii=False) else: # 处理其他任务的结果 yield json.dumps({key: transform_json_values(result)}, ensure_ascii=False) except Exception as exc: logger.error(f"Error processing {key}: {exc}") yield json.dumps({'error': f'Error processing {key}: {str(exc)}'}, ensure_ascii=False) # 只有在需要 knowledge_name 和 index 时才等待 future_knowledge 完成 try: knowledge_name = "招标解析" + unique_id index = processed_data['knowledge_future'].result() # 阻塞等待知识库上传任务完成 # 提交依赖 knowledge_name 和 index 的任务 future_dependencies = { 'base_info': executor.submit(fetch_project_basic_info, knowledge_name, processed_data['truncate0'], output_folder, processed_data['clause_path']), 'qualification_review': executor.submit(fetch_qualification_review, processed_data['truncate1'], processed_data['truncate3'], knowledge_name, processed_data['truncate0_jsonpath'], processed_data['clause_path'], processed_data['file_path'], processed_data['output_folder']), } # 按完成顺序返回依赖任务的结果 for future in concurrent.futures.as_completed(future_dependencies.values()): key = next(k for k, v in future_dependencies.items() if v == future) try: result = future.result() yield json.dumps({key: transform_json_values(result)}, ensure_ascii=False) except Exception as exc: logger.error(f"Error processing {key}: {exc}") yield json.dumps({'error': f'Error processing {key}: {str(exc)}'}, ensure_ascii=False) except Exception as e: logger.error(f"Error uploading to knowledge base: {e}") yield json.dumps({'error': f'Knowledge upload failed: {str(e)}'}, ensure_ascii=False) # 删除知识索引 deleteKnowledge(index) if __name__ == "__main__": output_folder = "flask_app/static/output/zytest1" # truncate0 = os.path.join(output_folder, "ztb_tobidders_notice_table.pdf") # truncate1=os.path.join(output_folder,"ztb_evaluation_method.pdf") # clause_path = convert_clause_to_json(truncate1, output_folder) # truncate0_jsonpath = os.path.join(output_folder, "truncate_output3.json") start_time = time.time() file_type = 1 #1:docx 2:pdf 3:其他 input_file = "C:\\Users\\Administrator\\Desktop\\招标文件\\招标test文件夹\\zbtest11.pdf" # file_path = main_processing(output_folder, input_file, file_type, "uuidzyzy11") preprocess_files(output_folder, input_file, file_type, "zytest1") end_time = time.time() elapsed_time = end_time - start_time # 计算耗时 print(f"Function execution took {elapsed_time} seconds.")