# 竞磋 竞谈 磋商 询价 邀请 单一来源 import json import time from docx import Document from flask_app.general.format_change import docx2pdf, pdf2docx,doc2docx from flask_app.general.insert_del_pagemark import insert_mark, delete_mark from flask_app.general.json_utils import transform_json_values from flask_app.general.通用功能函数 import get_global_logger from flask_app.货物标.基础信息解析main import combine_basic_info from flask_app.货物标.投标人须知正文提取指定内容货物标版 import extract_from_notice from flask_app.general.截取pdf_main import truncate_pdf_multiple from concurrent.futures import ThreadPoolExecutor import concurrent.futures from flask_app.货物标.提取json货物标版 import convert_clause_to_json from flask_app.general.无效标和废标公共代码 import combine_find_invalid from flask_app.货物标.资格审查main import combine_qualification_review from flask_app.general.商务技术评分提取 import combine_evaluation_standards # 创建全局线程池 executor = ThreadPoolExecutor() def preprocess_files(output_folder, file_path, file_type,logger): logger.info("starting 文件预处理...") start_time = time.time() logger.info("output_folder..." + output_folder) # 根据文件类型处理文件路径 if file_type == 1: # docx # docx_path = file_path pdf_path = docx2pdf(file_path) # 将docx转换为pdf以供后续处理 elif file_type == 2: # pdf pdf_path = file_path # docx_path = pdf2docx(pdf_path) elif file_type == 3: # doc pdf_path = docx2pdf(file_path) # docx_path = doc2docx(file_path) else: logger.error("Unsupported file type provided. Preprocessing halted.") return None # 调用截取PDF多次 truncate_files = truncate_pdf_multiple(pdf_path, output_folder,logger,'goods') # index: 0->商务技术服务要求 1->评标办法 2->资格审查 3->投标人须知前附表 4->投标人须知正文 # 处理各个部分 invalid_path=pdf_path invalid_added_pdf = insert_mark(invalid_path) invalid_added_docx = pdf2docx(invalid_added_pdf) # 有标记的invalid_path try: # 尝试加载 .docx 文件 doc = Document(invalid_added_docx) print("yes") except Exception as e: # 捕获异常并打印错误信息 invalid_added_docx=pdf2docx(pdf_path) invalid_deleted_docx = delete_mark(invalid_added_docx) # 无标记的invalid_path # invalid_docpath = invalid_added_docx # docx截取无效标部分 procurement_path = truncate_files[5] # 采购需求 evaluation_method_path = truncate_files[1] # 评标办法 qualification_path = truncate_files[2] # 资格审查 tobidders_notice_path = truncate_files[4] # 投标人须知正文 notice_path = truncate_files[0] #招标公告 merged_baseinfo_path = truncate_files[6] # 合并封面+招标公告+投标人须知前附表+须知正文 clause_path = convert_clause_to_json(tobidders_notice_path, output_folder) # 投标人须知正文条款pdf->json end_time = time.time() logger.info(f"文件预处理 done,耗时:{end_time - start_time:.2f} 秒") # 提前返回,不等待 future_knowledge 完成,返回包含 Future 对象 return { 'invalid_deleted_docx': invalid_deleted_docx, 'invalid_added_docx': invalid_added_docx, 'output_folder': output_folder, 'procurement_path': procurement_path, 'evaluation_method_path': evaluation_method_path, 'qualification_path': qualification_path, 'notice_path': notice_path, 'clause_path': clause_path, 'merged_baseinfo_path': merged_baseinfo_path } def fetch_project_basic_info(invalid_deleted_docx, merged_baseinfo_path, procurement_path, clause_path, logger): logger.info("starting 基础信息...") start_time = time.time() try: if not merged_baseinfo_path: merged_baseinfo_path = invalid_deleted_docx if not procurement_path: procurement_path = invalid_deleted_docx basic_res = combine_basic_info(merged_baseinfo_path, procurement_path, clause_path, invalid_deleted_docx) print(json.dumps(basic_res,ensure_ascii=False,indent=4)) base_info, good_list = post_process_baseinfo(basic_res, logger) result = base_info, good_list end_time = time.time() logger.info(f"基础信息 done,耗时:{end_time - start_time:.2f} 秒") except Exception as exc: logger.error(f"Error in 基础信息: {exc}") # 返回默认值 result = {"基础信息": {}}, [] return result def fetch_qualification_review(invalid_deleted_docx, qualification_path, notice_path, logger): logger.info("starting 资格审查...") start_time = time.time() try: if not notice_path: notice_path=invalid_deleted_docx review_standards_res = combine_qualification_review(invalid_deleted_docx, qualification_path, notice_path) result = review_standards_res end_time = time.time() logger.info(f"资格审查 done,耗时:{end_time - start_time:.2f} 秒") except Exception as exc: logger.error(f"Error in 资格审查: {exc}") # 返回默认值 result = {"资格审查": {}} return result def fetch_evaluation_standards(invalid_deleted_docx, evaluation_method_path,logger): logger.info("starting 商务评分和技术评分...") start_time = time.time() if not evaluation_method_path: evaluation_method_path = invalid_deleted_docx evaluation_standards_res = combine_evaluation_standards(evaluation_method_path,invalid_deleted_docx,2) technical_standards = {"技术评分": evaluation_standards_res.get("技术评分", {})} commercial_standards = {"商务评分": evaluation_standards_res.get("商务评分", {})} end_time = time.time() logger.info(f"商务评分和技术评分 done,耗时:{end_time - start_time:.2f} 秒") return { "technical_standards": technical_standards, "commercial_standards": commercial_standards } def fetch_invalid_requirements(invalid_added_docx, output_folder, logger): logger.info("starting 无效标与废标...") start_time = time.time() try: find_invalid_res = combine_find_invalid(invalid_added_docx, output_folder) result = find_invalid_res end_time = time.time() logger.info(f"无效标与废标 done,耗时:{end_time - start_time:.2f} 秒") except Exception as exc: logger.error(f"Error in 无效标与废标: {exc}") # 返回默认值 result = {"无效标与废标": {}} return result def fetch_bidding_documents_requirements(invalid_deleted_docx, merged_baseinfo_path, clause_path, logger): logger.info("starting 投标文件要求...") if not merged_baseinfo_path: merged_baseinfo_path = invalid_deleted_docx start_time = time.time() selection = 1 try: fetch_bidding_documents_requirements_json = extract_from_notice(merged_baseinfo_path, clause_path, selection) result = {"投标文件要求": fetch_bidding_documents_requirements_json} end_time = time.time() logger.info(f"投标文件要求 done,耗时:{end_time - start_time:.2f} 秒") except Exception as exc: logger.error(f"Error in 投标文件要求: {exc}") # 返回默认值,假设默认值为一个空字典 result = {"投标文件要求": {}} return result # 开评定标流程 def fetch_bid_opening(invalid_deleted_docx, merged_baseinfo_path, clause_path, logger): logger.info("starting 开评定标流程...") if not merged_baseinfo_path: merged_baseinfo_path = invalid_deleted_docx start_time = time.time() selection = 2 try: fetch_bid_opening_json = extract_from_notice(merged_baseinfo_path, clause_path, selection) result = {"开评定标流程": fetch_bid_opening_json} end_time = time.time() logger.info(f"开评定标流程 done,耗时:{end_time - start_time:.2f} 秒") except Exception as exc: logger.error(f"Error in 开评定标流程: {exc}") # 返回默认值,假设默认值为一个空字典 result = {"开评定标流程": {}} return result def post_process_baseinfo(base_info,logger): """ 在 'base_info' 任务完成后执行的函数。 确保在缺少某些键时,返回 good_list=[]。 参数: - base_info (dict): 原始的 base_info 数据。 返回: - tuple: (处理后的 base_info, good_list) """ try: pure_base_info = base_info.get("基础信息", {}) # 尝试提取 '货物列表',若中间某个键不存在,返回 good_list=[] procurement_reqs = pure_base_info.get('采购要求', {}) technical_requirements = procurement_reqs.get('采购需求', {}) good_list = technical_requirements.pop('货物列表', []) # 如果 '货物列表' 不存在,返回 [] # 删除 '货物列表' 后更新原始的 base_info if '采购需求' in procurement_reqs and not technical_requirements: # 若技术要求为空,删除该键 procurement_reqs.pop('采购需求') if '采购要求' in pure_base_info and not procurement_reqs: # 若采购要求为空,删除该键 pure_base_info.pop('采购要求') # 更新基础信息 base_info['基础信息'] = pure_base_info # logger.info(f"Extracted good_list: {good_list}") return base_info, good_list except Exception as e: logger.error(f"Error in post_process_baseinfo: {e}") return base_info, [] # 返回空列表 #TODO:错误处理,通过返回值completion来错误处理,而不是正则表达 学习装饰器、 整体后处理 def goods_bid_main(output_folder, file_path, file_type, unique_id): logger = get_global_logger(unique_id) # 预处理文件,获取处理后的数据 processed_data = preprocess_files(output_folder, file_path, file_type,logger) if not processed_data: yield json.dumps({}) # 如果处理数据失败,返回空的 JSON with concurrent.futures.ThreadPoolExecutor() as executor: # 立即启动不依赖 knowledge_name 和 index 的任务 futures = { 'evaluation_standards': executor.submit(fetch_evaluation_standards,processed_data['invalid_deleted_docx'], processed_data['evaluation_method_path'],logger), 'invalid_requirements': executor.submit(fetch_invalid_requirements, processed_data['invalid_added_docx'], output_folder,logger), 'bidding_documents_requirements': executor.submit(fetch_bidding_documents_requirements,processed_data['invalid_deleted_docx'],processed_data['merged_baseinfo_path'], processed_data['clause_path'],logger), 'opening_bid': executor.submit(fetch_bid_opening, processed_data['invalid_deleted_docx'],processed_data['merged_baseinfo_path'],processed_data['clause_path'],logger), 'base_info': executor.submit(fetch_project_basic_info, processed_data['invalid_deleted_docx'],processed_data['merged_baseinfo_path'], processed_data['procurement_path'],processed_data['clause_path'],logger), 'qualification_review': executor.submit(fetch_qualification_review, processed_data['invalid_deleted_docx'], processed_data['qualification_path'], processed_data['notice_path'],logger), } # 提前处理这些不依赖的任务,按完成顺序返回 for future in concurrent.futures.as_completed(futures.values()): key = next(k for k, v in futures.items() if v == future) try: result = future.result() if key == 'base_info': base_info, good_list = result collected_good_list = good_list # Store good_list for later use yield json.dumps({'base_info': transform_json_values(base_info)}, ensure_ascii=False) # 如果是 evaluation_standards,拆分技术标和商务标 elif key == 'evaluation_standards': technical_standards = result["technical_standards"] commercial_standards = result["commercial_standards"] # 分别返回技术标和商务标 yield json.dumps({'technical_standards': transform_json_values(technical_standards)},ensure_ascii=False) yield json.dumps({'commercial_standards': transform_json_values(commercial_standards)},ensure_ascii=False) else: # 处理其他任务的结果 yield json.dumps({key: transform_json_values(result)}, ensure_ascii=False) except Exception as exc: logger.error(f"Error processing {key}: {exc}") if key == 'evaluation_standards': # 返回默认的商务评分和技术评分 default_evaluation = { 'technical_standards': {"技术评分": ""}, 'commercial_standards': {"商务评分": ""} } yield json.dumps(default_evaluation, ensure_ascii=False) # yield json.dumps({'error': f'Error processing {key}: {str(exc)}'}, ensure_ascii=False) if collected_good_list is not None: yield json.dumps({'good_list': transform_json_values(collected_good_list)}, ensure_ascii=False) #广水市 2022 年义务教育学校多媒体补充采购项目 资格审查有问题 #TODO:把所有未知都删掉。 #TODO:考虑把解析失败的调用豆包,全文上传。 #TODO:重置一下投标文件格式提取那部分的代码 #TODO:小解析考虑提速:1:直接pdf转文本,再切分。后期考虑。 #TODO: ec7d5328-9c57-450f-baf4-2e5a6f90ed1d #TODO:1.先截取合同、投标文件格式之前的页码 即 invalid 如果页码小于50页,那么剩下的不切了直接仍。 # 2.废标项这边,考虑大模型+正则并用 # 3.限制评分项的因素。 #商务标这里改为列表最里层 #good_list 金额 截取上下文 if __name__ == "__main__": # 配置日志器 unique_id = "uuidzyzy11" # logger = get_global_logger(unique_id) output_folder = "flask_app/static/output/zytest1" file_type = 1 # 1:docx 2:pdf 3:其他 input_file = "C:\\Users\\Administrator\\Desktop\\货物标\\zbfiles\\6.2定版视频会议磋商文件.pdf" start_time = time.time() # 创建生成器 generator = goods_bid_main(output_folder, input_file, file_type, unique_id) # 迭代生成器,逐步获取和处理结果 for output in generator: print(output) end_time = time.time() elapsed_time = end_time - start_time # 计算耗时 print(f"Function execution took {elapsed_time:.2f} seconds.")