# -*- encoding:utf-8 -*- import json import logging import time from concurrent.futures import ThreadPoolExecutor from flask_app.main.截取pdf import truncate_pdf_multiple from flask_app.main.table_content_extraction import extract_tables_main from flask_app.main.投标人须知正文条款提取成json文件 import convert_clause_to_json from flask_app.general.json_utils import transform_json_values from flask_app.main.无效标和废标和禁止投标整合 import combine_find_invalid from flask_app.main.投标人须知正文提取指定内容 import extract_from_notice import concurrent.futures from flask_app.main.基础信息整合快速版 import combine_basic_info from flask_app.main.资格审查模块 import combine_review_standards from flask_app.main.商务评分技术评分整合 import combine_evaluation_standards from flask_app.general.format_change import pdf2docx, docx2pdf,doc2docx from flask_app.general.docx截取docx import copy_docx from flask_app.general.通义千问long import upload_file,qianwen_long from flask_app.general.json_utils import clean_json_string def get_global_logger(unique_id): if unique_id is None: return logging.getLogger() # 获取默认的日志器 logger = logging.getLogger(unique_id) return logger logger=None # 创建全局线程池 executor = ThreadPoolExecutor() def preprocess_files(output_folder, downloaded_file_path, file_type,unique_id): logger.info("starting 文件预处理...") logger.info("output_folder..." + output_folder) # 根据文件类型处理文件路径 if file_type == 1: # docx docx_path = downloaded_file_path pdf_path = docx2pdf(docx_path) # 将docx转换为pdf以供后续处理 elif file_type == 2: # pdf pdf_path = downloaded_file_path docx_path = pdf2docx(pdf_path) # 将pdf转换为docx以供上传到知识库 elif file_type == 3: #doc pdf_path=docx2pdf(downloaded_file_path) docx_path=doc2docx(downloaded_file_path) else: logger.error("Unsupported file type provided. Preprocessing halted.") return None # 调用截取PDF多次 truncate_files = truncate_pdf_multiple(pdf_path, output_folder,unique_id) # 处理各个部分 tobidders_notice_table=truncate_files[0] truncate0_docpath = pdf2docx(tobidders_notice_table) # 投标人须知前附表转docx truncate_jsonpath = extract_tables_main(truncate0_docpath, output_folder) # 投标人须知前附表docx->json tobidders_notice = truncate_files[1] #投标人须知正文 evaluation_method = truncate_files[2] #评标方法 qualification = truncate_files[3] #资格审查 invalid_path=truncate_files[5] invalid_docpath = copy_docx(docx_path) # docx截取无效标部分 # invalid_docpath=pdf2docx(invalid_path) merged_baseinfo_path=truncate_files[-1] clause_path = convert_clause_to_json(tobidders_notice, output_folder) # 投标人须知正文条款pdf->json logger.info("文件预处理done") # 返回包含预处理后文件路径的字典 return { 'file_path': downloaded_file_path, 'output_folder': output_folder, 'invalid_path':invalid_path, 'tobidders_notice_table': tobidders_notice_table, 'tobidders_notice': tobidders_notice, 'evaluation_method':evaluation_method, 'qualification': qualification, 'truncate0_jsonpath': truncate_jsonpath, 'merged_baseinfo_path':merged_baseinfo_path, 'clause_path': clause_path, 'invalid_docpath': invalid_docpath } def post_processing(data,includes): # 初始化结果字典,预设'其他'分类为空字典 result = {"其他": {}} # 遍历原始字典的每一个键值对 for key, value in data.items(): if key in includes: # 如果键在includes列表中,直接保留这个键值对 result[key] = value else: # 如果键不在includes列表中,将这个键值对加入到'其他'分类中 result["其他"][key] = value # 如果'其他'分类没有任何内容,可以选择删除这个键 if not result["其他"]: del result["其他"] return result # 基本信息 def fetch_project_basic_info(invalid_path,merged_baseinfo_path, tobidders_notice_table, tobidders_notice, clause_path): # 投标人须知前附表 logger.info("starting基础信息...") if not merged_baseinfo_path: merged_baseinfo_path=invalid_path if not tobidders_notice_table: tobidders_notice_table=invalid_path if not tobidders_notice: tobidders_notice=invalid_path basic_res = combine_basic_info(invalid_path,merged_baseinfo_path, tobidders_notice_table, tobidders_notice, clause_path) logger.info("基础信息done") return basic_res # 形式、响应、资格评审 def fetch_qualification_review(evaluation_method, qualification,output_folder, truncate0_jsonpath, clause_path,invalid_path,merged_baseinfo_path): logger.info("starting资格审查...") if not evaluation_method: evaluation_method=invalid_path if not merged_baseinfo_path: merged_baseinfo_path=invalid_path review_standards_res = combine_review_standards(evaluation_method, qualification,output_folder, truncate0_jsonpath, clause_path,invalid_path,merged_baseinfo_path) logger.info("资格审查done") return review_standards_res # 评分细则 流式 def fetch_evaluation_standards(invalid_path,evaluation_method): # 评标办法前附表 logger.info("starting 商务标和技术标...") if not evaluation_method: evaluation_method=invalid_path # 获取评标办法前附表的字典结果 evaluation_standards_res = combine_evaluation_standards(evaluation_method) # 获取技术标和商务标 technical_standards = {"技术评分": evaluation_standards_res.get("技术评分", {})} commercial_standards = {"商务评分": evaluation_standards_res.get("商务评分", {})} logger.info("商务标和技术标 done") # 返回将 "技术标" 和 "商务标" 包含在新的键中 return { "technical_standards": technical_standards, "commercial_standards": commercial_standards } # 无效、废标项解析 def fetch_invalid_requirements(invalid_docpath, output_folder, truncate0_jsonpath, clause_path, qualification): # 废标项要求:千问 logger.info("starting无效标与废标...") find_invalid_res = combine_find_invalid(invalid_docpath, output_folder, truncate0_jsonpath, clause_path, qualification) logger.info("无效标与废标done...") return find_invalid_res # 投标文件要求 def fetch_bidding_documents_requirements(invalid_path,clause_path): logger.info("starting投标文件要求...") fetch_bidding_documents_requirements_json = extract_from_notice(clause_path, 1) if not fetch_bidding_documents_requirements_json: file_id=upload_file(invalid_path) user_query=""" 该招标文件中对投标文件的要求是什么?你需要从'编写要求'、'格式'、'承诺书要求'、'递交要求'四个角度来回答,其中'格式'可以从投标文件格式要求、标记要求、装订要求、文件数量要求角度说明,'递交要求'可以从投标地点、投标文件交标方式、投标文件的修改与撤回角度说明,请以json格式返回给我结果,外层键名分别为'编写要求','格式','承诺书要求','递交要求',你可以用嵌套键值对组织回答,嵌套键名为你对相关子要求的总结,而嵌套键名应该完全与原文内容保持一致,不得擅自总结删减,如果原文中未提及相关内容,在键值中填'未知'。输出格式示例如下: { "编写要求":"投标函的编写要求xxx;法定代表人身份证明要求xx", "格式":{ "投标文件格式要求":"投标文件格式要求", "标记要求":"投标文件标记要求", "装订要求":"投标文件装订要求", "文件数量":"投标文件文件数量要求" }, "承诺书要求":"未知", "递交要求":{ "投标地点":"使用加密其投标文件的CA数字证书(企业锁)登录“电子交易系统”,进入“开标大厅”选择所投标段进行签到,并实时在线关注招标人的操作情况。", "投标文件交标方式":"线上开标", "投标文件的修改与撤回":"在投标人须知前附表规定的投标有效期内,投标人不得要求撤销或修改其投标文件。出现特殊情况需要延长投标有效期的,招标人以书面形式通知所有投标人延长投标有效期。投标人同意延长的,应相应延长其投标保证金的有效期,但不得要求或被允许修改或撤销其投标文件;投标人拒绝延长的,其投标失效,但投标人有权收回其投标保证金。" } } """ res=qianwen_long(file_id, user_query) fetch_bidding_documents_requirements_json=clean_json_string(res) logger.info("投标文件要求done...") return {"投标文件要求":fetch_bidding_documents_requirements_json} # 开评定标流程 def fetch_bid_opening(invalid_path,clause_path): logger.info("starting开评定标流程...") fetch_bid_opening_json = extract_from_notice(clause_path, 2) if not fetch_bid_opening_json: user_query = """ 该招标文件中开评定标要求是什么?你需要从'开标'、'开标异议'、'评标'、'定标'四个角度回答,其中'评标'可以从特殊情况的处置、评标办法及流程、评标委员会的组建角度来说明,'定标'可以从定标流程、履约能力的审查角度来说明,请以json格式返回给我结果,外层键名分别为'开标'、'开标异议'、'评标'、'定标',你可以用嵌套键值对组织回答,嵌套键名为你对相关子要求的总结,而嵌套键名应该完全与原文内容保持一致,不得擅自总结删减,如果原文中未提及相关内容,在键值中填'未知'。输出格式示例如下: { "开标":"招标文件关于项目开标的要求", "开标异议":"招标文件中关于开标异议的项", "评标":{ "特殊情况的处置":"因“电子交易系统”系统故障导致无法投标的,交易中心及时通知招标人,招标人视情况决定是否顺延投标截止时间。因投标人自身原因导致无法完成投标的,由投标人自行承担后果。" "评标办法及流程":"评标流程", "评标委员会的组建":"评标由招标人依法组建的评标委员会负责。评标委员会由招标人或其委托的招标代理机构熟悉相关业务的代表,以及有关技术、经济等方面的专家组成。" } "定标":{ "定标流程":"定标流程", "履约能力的审查":"履约能力的审查" } } """ file_id = upload_file(invalid_path) res = clean_json_string(qianwen_long(file_id, user_query)) fetch_bid_opening_json=clean_json_string(res) logger.info("开评定标流程done...") return {"开评定标流程":fetch_bid_opening_json} #分段返回 def engineering_bid_main(output_folder, downloaded_file_path, file_type, unique_id): global logger logger = get_global_logger(unique_id) # 预处理文件,获取处理后的数据 processed_data = preprocess_files(output_folder, downloaded_file_path, file_type,unique_id) if not processed_data: yield json.dumps({}) # 如果处理数据失败,返回空的 JSON with concurrent.futures.ThreadPoolExecutor() as executor: # 立即启动不依赖 knowledge_name 和 index 的任务 futures = { 'base_info': executor.submit(fetch_project_basic_info,processed_data['invalid_path'] ,processed_data['merged_baseinfo_path'], processed_data['tobidders_notice_table'], processed_data['tobidders_notice'], processed_data['clause_path']), 'qualification_review': executor.submit(fetch_qualification_review, processed_data['truncate1'], processed_data['qualification'], output_folder, processed_data['truncate0_jsonpath'], processed_data['clause_path'], processed_data['invalid_path'], processed_data['merged_baseinfo_path']), 'evaluation_standards': executor.submit(fetch_evaluation_standards, processed_data['invalid_path'],processed_data['evaluation_method']), 'invalid_requirements': executor.submit(fetch_invalid_requirements, processed_data['invalid_docpath'], output_folder, processed_data['truncate0_jsonpath'], processed_data['clause_path'], processed_data['qualification']), 'bidding_documents_requirements': executor.submit(fetch_bidding_documents_requirements,processed_data['invalid_path'], processed_data['clause_path']), 'opening_bid': executor.submit(fetch_bid_opening,processed_data['invalid_path'], processed_data['clause_path']) } # 提前处理这些不依赖的任务,按完成顺序返回 for future in concurrent.futures.as_completed(futures.values()): key = next(k for k, v in futures.items() if v == future) try: result = future.result() # 如果是 evaluation_standards,拆分技术标和商务标 if key == 'evaluation_standards': technical_standards = result["technical_standards"] commercial_standards = result["commercial_standards"] # 分别返回技术标和商务标 yield json.dumps({'technical_standards': transform_json_values(technical_standards)}, ensure_ascii=False) yield json.dumps({'commercial_standards': transform_json_values(commercial_standards)}, ensure_ascii=False) else: # 处理其他任务的结果 yield json.dumps({key: transform_json_values(result)}, ensure_ascii=False) except Exception as exc: logger.error(f"Error processing {key}: {exc}") yield json.dumps({'error': f'Error processing {key}: {str(exc)}'}, ensure_ascii=False) if __name__ == "__main__": start_time = time.time() output_folder = "C:\\Users\\Administrator\\Desktop\\招标文件\\new_test1" file_type = 2 #1:docx 2:pdf 3:其他 input_file = "C:\\Users\\Administrator\\Desktop\\招标文件\\招标test文件夹\\zbtest2.pdf" print("yes") for output in engineering_bid_main(output_folder, input_file, file_type, "zytest"): print(output) end_time = time.time() elapsed_time = end_time - start_time # 计算耗时 print(f"Function execution took {elapsed_time} seconds.")