# -*- encoding:utf-8 -*- import json import multiprocessing import os import time from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed from docx import Document from flask_app.general.insert_del_pagemark import insert_mark, delete_mark from flask_app.general.截取pdf_main import truncate_pdf_multiple from flask_app.general.merge_pdfs import merge_pdfs from flask_app.general.读取文件.clean_pdf import is_pure_image from flask_app.general.通用功能函数 import get_global_logger from flask_app.general.投标人须知正文条款提取成json文件 import convert_clause_to_json from flask_app.general.json_utils import transform_json_values from flask_app.general.无效标和废标公共代码 import combine_find_invalid from flask_app.general.投标人须知正文提取指定内容 import extract_from_notice import concurrent.futures from flask_app.工程标.基础信息整合工程标 import combine_basic_info from flask_app.工程标.资格审查模块main import combine_review_standards from flask_app.general.商务技术评分提取 import combine_evaluation_standards from flask_app.general.format_change import pdf2docx, docx2pdf def engineering_bid_main_process(output_folder, file_path, file_type, logger): # 这里是你原本处理请求的地方 with multiprocessing.Pool(processes=1) as pool: result = pool.apply( preprocess_files, # 你的实际执行函数 args=(output_folder, file_path, file_type, logger) ) return result def preprocess_files(output_folder, file_path, file_type, logger): logger.info("starting 文件预处理...") start_time = time.time() is_pure_image_flag = False pdf_path = "" # 根据文件类型处理文件路径 if file_type == 1: # docx # docx_path = file_path if is_pure_image(file_path): is_pure_image_flag = True else: pdf_path = docx2pdf(file_path) # 将docx转换为pdf以供后续处理 elif file_type == 2: # pdf pdf_path = file_path # docx_path = pdf2docx(pdf_path) # 将pdf转换为docx以供上传到知识库 elif file_type == 3: # doc # docx_path=doc2docx(downloaded_file_path) pdf_path = docx2pdf(file_path) # 将docx转换为pdf以供后续处理 else: logger.error("Unsupported file type provided. Preprocessing halted.") return None if not is_pure_image_flag: # 大多数情况 不是纯图片doc/docx # 调用截取PDF多次 truncate_files = truncate_pdf_multiple(pdf_path, output_folder, logger, 'engineering') else: truncate_files = ['', '', '', '', '', file_path, ''] # print("切割出的文件:"+str(truncate_files)) # 处理各个部分 notice_path = truncate_files[0] # 招标公告 evaluation_method = truncate_files[1] # 评标方法 qualification = truncate_files[2] # 资格审查 tobidders_notice_table = truncate_files[3] # 投标人须知前附表 tobidders_notice = truncate_files[4] # 投标人须知正文 invalid_path = truncate_files[5] if truncate_files[5] != "" else pdf_path # 无效标 merged_baseinfo_path = truncate_files[-1] more_path = [merged_baseinfo_path, tobidders_notice] merged_baseinfo_path_more = os.path.join(output_folder, "merged_baseinfo_path_more.pdf") merged_baseinfo_path_more = merge_pdfs(more_path, merged_baseinfo_path_more) clause_path = convert_clause_to_json(tobidders_notice, output_folder) # 投标人须知正文条款pdf->json # invalid_docpath = copy_docx(docx_path) # docx截取无效标部分 # invalid_docpath=pdf2docx(invalid_path) truncate_endtime = time.time() if not is_pure_image_flag: invalid_added_pdf = insert_mark(invalid_path) logger.info(f"文件切分CPU耗时:{truncate_endtime - start_time:.2f} 秒") invalid_added_docx = pdf2docx(invalid_added_pdf) # 有标记的invalid_path try: # 尝试加载 .docx 文件 doc = Document(invalid_added_docx) print("yes") except Exception as e: # 捕获异常并打印错误信息 invalid_added_docx = pdf2docx(invalid_path) invalid_deleted_docx = delete_mark(invalid_added_docx) # 无标记的invalid_path if not invalid_deleted_docx: invalid_deleted_docx = pdf2docx(invalid_path) else: invalid_deleted_docx = file_path invalid_added_docx = '' try: # 尝试加载 .docx 文件 doc = Document(invalid_deleted_docx) except Exception as e: invalid_deleted_docx = pdf_path if pdf_path != "" else file_path # 文档转换还是失败!最后的手段保证文件可用! end_time = time.time() logger.info(f"文件预处理总耗时:{end_time - start_time:.2f} 秒") # 返回包含预处理后文件路径的字典 return { 'invalid_deleted_docx': invalid_deleted_docx, 'invalid_added_docx': invalid_added_docx, 'notice_path': notice_path, 'tobidders_notice_table': tobidders_notice_table, 'evaluation_method': evaluation_method, 'qualification': qualification, 'merged_baseinfo_path': merged_baseinfo_path, 'merged_baseinfo_path_more': merged_baseinfo_path_more, 'clause_path': clause_path } # 基本信息 def fetch_project_basic_info(invalid_deleted_docx, merged_baseinfo_path, merged_baseinfo_path_more, clause_path, logger): logger.info("starting 基础信息...") start_time = time.time() try: if not merged_baseinfo_path: merged_baseinfo_path = invalid_deleted_docx if not merged_baseinfo_path_more: merged_baseinfo_path_more = invalid_deleted_docx basic_res = combine_basic_info(merged_baseinfo_path, merged_baseinfo_path_more, clause_path, invalid_deleted_docx) result = basic_res end_time = time.time() logger.info(f"基础信息 done,耗时:{end_time - start_time:.2f} 秒") except Exception as exc: logger.error(f"Error in 基础信息: {exc}") # 返回默认值 result = {"基础信息": {}} return result def fetch_qualification_review(evaluation_method, qualification, output_folder, tobidders_notice_table, clause_path, invalid_deleted_docx, merged_baseinfo_path, notice_path, logger): logger.info("starting 资格审查...") start_time = time.time() try: if not notice_path: notice_path = invalid_deleted_docx if not evaluation_method: evaluation_method = invalid_deleted_docx if not merged_baseinfo_path: merged_baseinfo_path = invalid_deleted_docx review_standards_res = combine_review_standards( evaluation_method, qualification, output_folder, tobidders_notice_table, clause_path, invalid_deleted_docx, merged_baseinfo_path, notice_path) result = review_standards_res end_time = time.time() logger.info(f"资格审查 done,耗时:{end_time - start_time:.2f} 秒") except Exception as exc: logger.error(f"Error in 资格审查: {exc}") # 返回默认值 result = {"资格审查": {}} return result # 评分细则 流式 def fetch_evaluation_standards(invalid_deleted_docx, evaluation_method, logger): logger.info("starting 商务标和技术标...") start_time = time.time() if not evaluation_method: evaluation_method = invalid_deleted_docx evaluation_standards_res = combine_evaluation_standards(evaluation_method, invalid_deleted_docx, 1) technical_standards = {"技术评分": evaluation_standards_res.get("技术评分", {})} commercial_standards = {"商务评分": evaluation_standards_res.get("商务评分", {})} end_time = time.time() logger.info(f"商务标和技术标 done,耗时:{end_time - start_time:.2f} 秒") return { "technical_standards": technical_standards, "commercial_standards": commercial_standards } # 无效、废标项解析 def fetch_invalid_requirements(invalid_added_docx, output_folder, logger): logger.info("starting 无效标与废标...") start_time = time.time() try: find_invalid_res = combine_find_invalid(invalid_added_docx, output_folder) result = find_invalid_res end_time = time.time() logger.info(f"无效标与废标 done,耗时:{end_time - start_time:.2f} 秒") except Exception as exc: logger.error(f"Error in 无效标与废标: {exc}") # 返回默认值 result = {"无效标与废标": {}} return result # 投标文件要求 def fetch_bidding_documents_requirements(invalid_deleted_docx, merged_baseinfo_path_more, clause_path, logger): logger.info("starting 投标文件要求...") start_time = time.time() try: if not merged_baseinfo_path_more: merged_baseinfo_path_more = invalid_deleted_docx selection = 1 fetch_bidding_documents_requirements_json = extract_from_notice(merged_baseinfo_path_more, clause_path, selection) result = {"投标文件要求": fetch_bidding_documents_requirements_json} end_time = time.time() logger.info(f"投标文件要求 done,耗时:{end_time - start_time:.2f} 秒") except Exception as exc: logger.error(f"Error in 投标文件要求: {exc}") # 返回默认值 result = {"投标文件要求": {}} return result # 开评定标流程 def fetch_bid_opening(invalid_deleted_docx, merged_baseinfo_path_more, clause_path, logger): logger.info("starting 开评定标流程...") start_time = time.time() try: if not merged_baseinfo_path_more: merged_baseinfo_path_more = invalid_deleted_docx selection = 2 fetch_bid_opening_json = extract_from_notice(merged_baseinfo_path_more, clause_path, selection) result = {"开评定标流程": fetch_bid_opening_json} end_time = time.time() logger.info(f"开评定标流程 done,耗时:{end_time - start_time:.2f} 秒") except Exception as exc: logger.error(f"Error in 开评定标流程: {exc}") # 返回默认值 result = {"开评定标流程": {}} return result def engineering_bid_main(output_folder, file_path, file_type, unique_id): logger = get_global_logger(unique_id) # 预处理文件,获取处理后的数据 processed_data = preprocess_files(output_folder, file_path, file_type, logger) if not processed_data: error_response = { 'error': '文件预处理失败。请检查文件类型并重试。' } yield json.dumps(error_response, ensure_ascii=False) return # 停止进一步处理 #with ProcessPoolExecutor() as executor: with ThreadPoolExecutor() as executor: # 立即启动不依赖 knowledge_name 和 index 的任务 futures = { 'base_info': executor.submit(fetch_project_basic_info, processed_data['invalid_deleted_docx'], processed_data['merged_baseinfo_path'], processed_data['merged_baseinfo_path_more'], processed_data['clause_path'], logger), 'qualification_review': executor.submit(fetch_qualification_review, processed_data['evaluation_method'], processed_data['qualification'], output_folder, processed_data['tobidders_notice_table'], processed_data['clause_path'], processed_data['invalid_deleted_docx'], processed_data['merged_baseinfo_path'], processed_data['notice_path'], logger), 'evaluation_standards': executor.submit(fetch_evaluation_standards, processed_data['invalid_deleted_docx'], processed_data['evaluation_method'], logger), 'invalid_requirements': executor.submit(fetch_invalid_requirements, processed_data['invalid_added_docx'], output_folder, logger), 'bidding_documents_requirements': executor.submit(fetch_bidding_documents_requirements, processed_data['invalid_deleted_docx'], processed_data['merged_baseinfo_path_more'], processed_data['clause_path'], logger), 'opening_bid': executor.submit(fetch_bid_opening, processed_data['invalid_deleted_docx'], processed_data['merged_baseinfo_path_more'], processed_data['clause_path'], logger) } # 提前处理这些不依赖的任务,按完成顺序返回 for future in as_completed(futures.values()): key = next(k for k, v in futures.items() if v == future) try: result = future.result() # 如果是 evaluation_standards,拆分技术标和商务标 if key == 'evaluation_standards': technical_standards = result["technical_standards"] commercial_standards = result["commercial_standards"] # 分别返回技术标和商务标 yield json.dumps({'technical_standards': transform_json_values(technical_standards)}, ensure_ascii=False) yield json.dumps({'commercial_standards': transform_json_values(commercial_standards)}, ensure_ascii=False) else: # 处理其他任务的结果 yield json.dumps({key: transform_json_values(result)}, ensure_ascii=False) except Exception as exc: logger.error(f"Error processing {key}: {exc}") if key == 'evaluation_standards': # 返回默认的商务评分和技术评分 default_evaluation = { 'technical_standards': {"技术评分": ""}, 'commercial_standards': {"商务评分": ""} } yield json.dumps(default_evaluation, ensure_ascii=False) # yield json.dumps({'error': f'Error processing {key}: {str(exc)}'}, ensure_ascii=False) # TODO:基本信息,判断是否这里,打勾逻辑取消了。 if __name__ == "__main__": start_time = time.time() output_folder = r"C:\Users\Administrator\Desktop\fsdownload\bb2747ad-5578-4b3b-b60f-3a2d1b672b6f" file_type = 2 # 1:docx 2:pdf 3:其他 input_file = r"C:\Users\Administrator\Desktop\fsdownload\bb2747ad-5578-4b3b-b60f-3a2d1b672b6f\zytest.pdf" print("yes") for output in engineering_bid_main(output_folder, input_file, file_type, "zytest"): print(output) # preprocess_files(output_folder,input_file,2,"121") # engineering_bid_main(output_folder,input_file,2,"111") end_time = time.time() elapsed_time = end_time - start_time # 计算耗时 print(f"Function execution took {elapsed_time} seconds.")