# -*- encoding:utf-8 -*- import json import logging import os import time from flask_app.main.format_change import docx2pdf, pdf2docx from flask_app.main.json_utils import clean_json_string from flask_app.main.判断是否分包等 import merge_json_to_list, read_questions_from_judge from flask_app.main.基础信息整合 import judge_consortium_bidding from flask_app.main.多线程提问 import read_questions_from_file, multi_threading from flask_app.main.通义千问long import upload_file from flask_app.货物标.基础信息解析main import aggregate_basic_info_goods from flask_app.货物标.货物标截取pdf import truncate_pdf_specific_goods from flask_app.main.截取pdf import truncate_pdf_specific_engineering from flask_app.main.post_processing import inner_post_processing from flask_app.main.基础信息整合 import aggregate_basic_info_engineering def get_global_logger(unique_id): if unique_id is None: return logging.getLogger() # 获取默认的日志器 logger = logging.getLogger(unique_id) return logger logger = None #货物标 def little_parse_goods(output_folder, file_path): """ 解析货物相关的基础信息。 参数: output_folder (str): 输出文件夹路径。 file_path (str): 输入文件的路径。 返回: dict: 包含 '基础信息' 的字典。 """ # 截取特定的货物 PDF 文件 selections = [4, 5] # 仅处理 selection 4 和 5 files = truncate_pdf_specific_goods(file_path, output_folder,selections) if not files: raise ValueError("未找到截取后的文件。") # 假设最后一个文件是需要处理的基础信息文件 baseinfo_file_path = files[-1] # 上传文件并获取文件 ID file_id = upload_file(baseinfo_file_path) # 注意:以下路径被硬编码,确保该路径存在并且正确 baseinfo_prompt_file_path='flask_app/static/提示词/小解析基本信息货物标.txt' # baseinfo_prompt_file_path = 'D:\\flask_project\\flask_app\\static\\提示词\\小解析基本信息货物标.txt' # 从提示词文件中读取问题 questions = read_questions_from_file(baseinfo_prompt_file_path) # 多线程处理问题,使用指定的处理模式(2 代表使用 qianwen-long) baseinfo_results = multi_threading(questions, "", file_id, 2) # 清理 JSON 字符串 baseinfo_list = [clean_json_string(res) for _, res in baseinfo_results] if baseinfo_results else [] # 聚合基础信息 aggregated_baseinfo = aggregate_basic_info_goods(baseinfo_list) return {"基础信息": aggregated_baseinfo} def little_parse_engineering(output_folder, file_path): """ 解析工程相关的基础信息。 参数: output_folder (str): 输出文件夹路径。 file_path (str): 输入文件的路径。 返回: dict: 包含 '基础信息' 的字典。 """ # 截取特定的工程 PDF 文件 selections = [5, 1,3] files = truncate_pdf_specific_engineering(file_path, output_folder,selections) if not files: raise ValueError("未找到截取后的文件。") # 假设最后一个文件是需要处理的基础信息文件 baseinfo_file_path = files[-1] # 上传文件并获取文件 ID file_id = upload_file(baseinfo_file_path) # 注意:以下路径被硬编码,确保该路径存在并且正确 baseinfo_prompt_file_path='flask_app/static/提示词/小解析基本信息工程标.txt' # baseinfo_prompt_file_path = 'D:\\flask_project\\flask_app\\static\\提示词\\小解析基本信息工程标.txt' # 从提示词文件中读取问题 questions = read_questions_from_file(baseinfo_prompt_file_path) # 多线程处理问题,使用指定的处理模式(2 代表使用 qianwen-long) baseinfo_results = multi_threading(questions, "", file_id, 2) # 清理 JSON 字符串 baseinfo_list = [clean_json_string(res) for _, res in baseinfo_results] if baseinfo_results else [] # 聚合基础信息 aggregated_baseinfo = aggregate_basic_info_engineering(baseinfo_list) return {"基础信息": aggregated_baseinfo} def little_parse_main(output_folder, file_path, file_type,zb_type,unique_id): """ 主解析函数,根据文件类型和招标类型处理文件,并保存结果为JSON文件。 参数: output_folder (str): 输出文件夹路径。 file_path (str): 待处理文件的路径。 file_type (int): 文件类型(1: docx, 2: pdf, 3: doc)。 zb_type (int): 招标类型(2: 货物标, 其他: 工程标)。 返回: str: 保存的JSON文件的路径。 """ global logger logger = get_global_logger(unique_id) logger.info("zb_type:"+str(zb_type)) # 根据文件类型处理文件路径 if file_type == 1: # docx docx_path = file_path pdf_path = docx2pdf(docx_path) # 将docx转换为pdf以供后续处理 elif file_type == 2: # pdf pdf_path = file_path # docx_path = pdf2docx(pdf_path) # 将pdf转换为docx以供上传到知识库 elif file_type == 3: # doc pdf_path = docx2pdf(file_path) # docx_path = pdf2docx(pdf_path) else: logger.error("Unsupported file type provided. Preprocessing halted.") return None # 根据招标类型调用相应的解析函数 if zb_type == 2: # 货物标 combined_data = little_parse_goods(output_folder, pdf_path) logger.info("Parsed goods data: %s", json.dumps(combined_data, ensure_ascii=False, indent=4)) res = inner_post_processing(combined_data["基础信息"]) else: combined_data = little_parse_engineering(output_folder, pdf_path) logger.info("Parsed engineering data: %s", json.dumps(combined_data, ensure_ascii=False, indent=4)) res = inner_post_processing(combined_data["基础信息"]) # 定义保存JSON文件的路径 json_path = os.path.join(output_folder, "final_result.json") # 将结果保存为JSON文件 with open(json_path, 'w', encoding='utf-8') as json_file: json.dump(res, json_file, ensure_ascii=False, indent=4) logger.info(f"Extraction results saved to {json_path}") # 返回JSON文件的路径 return json_path if __name__ == "__main__": start_time = time.time() file_type = 2 # 1:docx 2:pdf 3:其他 # output_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\little_parse_output" # zb_type=2 #1:工程标 2:货物标 # input_file = "C:\\Users\\Administrator\\Desktop\\货物标\\zbfiles\\ztbfile.pdf" output_folder="C:\\Users\\Administrator\\Desktop\\招标文件\\special_output" zb_type=1 input_file="C:\\Users\\Administrator\\Desktop\\招标文件\\招标test文件夹\\zbtest2.pdf" final_json_path=little_parse_main(output_folder, input_file, file_type, zb_type,"122334") with open(final_json_path, 'r', encoding='utf-8') as f: logger.info('final_json_path:' + final_json_path) zbparse_data = json.load(f) json_str = json.dumps(zbparse_data, ensure_ascii=False,indent=4) print(json_str) end_time = time.time() elapsed_time = end_time - start_time # 计算耗时 print(f"Function execution took {elapsed_time} seconds.")