diff --git a/.idea/encodings.xml b/.idea/encodings.xml index 28b03da..dce1f12 100644 --- a/.idea/encodings.xml +++ b/.idea/encodings.xml @@ -1,6 +1,9 @@ + + + diff --git a/flask_app/main/start_up.py b/flask_app/main/start_up.py index 6448458..d70df61 100644 --- a/flask_app/main/start_up.py +++ b/flask_app/main/start_up.py @@ -1,9 +1,9 @@ import logging import shutil +import sys import time import uuid from datetime import datetime, timedelta - from flask import Flask, request, jsonify, Response, stream_with_context, g import json import os @@ -117,32 +117,42 @@ def process_and_stream(file_url): logger.info("Local file path: " + downloaded_filepath) combined_data = {} + + # 从 main_processing 获取数据 for data in main_processing(output_folder, downloaded_filepath, file_type, unique_id): response = { 'message': 'Processing', 'filename': os.path.basename(downloaded_filepath), 'data': data } - yield f"data: {json.dumps(response)}\n\n" + # 日志记录和流式响应 + yield f"data: {json.dumps(response, ensure_ascii=False)}\n\n" - # 解析数据并添加到 combined_data - parsed_data = json.loads(data.split('data: ')[1]) + if not data: + logger.error(f"Empty data received: {data}") + continue + + # 解析 data 作为 JSON 格式数据 + if not data.strip(): + logger.error("Received empty data, skipping JSON parsing.") + else: + try: + parsed_data = json.loads(data) + except json.JSONDecodeError as e: + logger.error(f"Failed to decode JSON: {e}") + logger.error(f"Data received: {data}") + continue # 跳过该数据处理 + + # 遍历 parsed_data 只提取内层内容进行合并 for outer_key, inner_dict in parsed_data.items(): if isinstance(inner_dict, dict): - # 获取内层字典的第一个(也是唯一的)键值对 - inner_key, inner_value = next(iter(inner_dict.items())) - if outer_key not in combined_data: - combined_data[outer_key] = {} - combined_data[outer_key][inner_key] = inner_value - else: - # 处理 evaluation_standards 的特殊情况 - combined_data[outer_key] = inner_dict + combined_data.update(inner_dict) - # 发送整合后的完整数据 + # 等待所有数据都处理完后,发送整合后的完整数据 complete_response = { 'message': 'Combined data', 'filename': os.path.basename(downloaded_filepath), - 'data': combined_data + 'data': json.dumps(combined_data, ensure_ascii=False) } yield f"data: {json.dumps(complete_response, ensure_ascii=False)}\n\n" @@ -152,7 +162,7 @@ def process_and_stream(file_url): 'filename': os.path.basename(downloaded_filepath), 'data': 'END' } - yield f"data: {json.dumps(final_response)}\n\n" + yield f"data: {json.dumps(final_response, ensure_ascii=False)}\n\n" def validate_request(): diff --git a/flask_app/main/test.py b/flask_app/main/test.py index 26daceb..7434b80 100644 --- a/flask_app/main/test.py +++ b/flask_app/main/test.py @@ -1,91 +1,124 @@ -# -*- encoding:utf-8 -*- +import concurrent.futures import json +import time +import sys -def find_keys_by_value(target_value, json_data): - # 查找完全匹配目标值的键,如果没有找到,检查字符串值是否包含目标值 - matched_keys = [k for k, v in json_data.items() if v == target_value] - if not matched_keys: - matched_keys = [k for k, v in json_data.items() if isinstance(v, str) and v.startswith(target_value)] - print(matched_keys) - return matched_keys +def fetch_project_basic_info(knowledge_name, truncate0, output_folder, clause_path): + time.sleep(2) # 模拟任务耗时5秒 + return {"basic_info": "Project basic info"} -#若match_keys中有3.1,那么以3.1为前缀的键都会被找出来,如3.1.1 3.1.2... -def find_keys_with_prefix(key_prefix, json_data): - # 查找以特定前缀开始的所有键 - subheadings = [k for k in json_data if k.startswith(key_prefix)] - return subheadings +def fetch_qualification_review(truncate1, truncate3, knowledge_name, truncate0_jsonpath, clause_path, input_file_path, output_folder): + time.sleep(4) # 模拟任务耗时10秒 + return {"qualification_review": "Qualification review"} -def extract_json(data, target_values): - results = {} - for target_value in target_values: - matched_keys = find_keys_by_value(target_value, data) - for key in matched_keys: - key_and_subheadings = find_keys_with_prefix(key, data) - for subkey in key_and_subheadings: - if "." in subkey: - parent_key = subkey.rsplit('.', 1)[0] - top_level_key = parent_key.split('.')[0] + '.' - # 特别处理定标相关的顶级键,确保不会重复添加其他键 - if top_level_key not in results: - results[top_level_key] = target_value - # 添加或更新父级键 - if parent_key not in results: - if parent_key in data: - results[parent_key] = data[parent_key] - # 添加当前键 - results[subkey] = data[subkey] - return results +def fetch_evaluation_standards(truncate1): + time.sleep(6) # 模拟任务耗时15秒 + return {"technical_standards": "Technical standards", "commercial_standards": "Commercial standards"} -# 示例 JSON 数据 -data = { - "3.1": "投标文件的组成", - "3.1.1": "投标文件应包括下列内容:(1)投标函及投标函附录;(2)法定代表人身份证明;(3)联合体协议书(如有);(4)投标保证金;(5)监理服务费投标报价表;(6)监理大纲;(7)监理机构;(8)资格审查资料;(9)投标人须知前附表规定的其他材料。", - "3.1.2": "招标公告规定不接受联合体投标的,或投标人没有组成联合体的,投标文件不包括本章第3.1.1项第(3)目所指的联合体协议书。", - "3.1.3": "投标人须知前附表规定不允许分包的,投标文件不包括本章第3.1.1项第(8)目所指的拟分包项目情况表。", - "3.2": "投标报价", - "3.2.1": "投标报价是投标人按照招标文件的要求完成投标人须知前附表规定监理服务阶段监理工作所需的费用。", - "3.2.2": "招标人是否设置最高投标限价见投标人须知前附表,如采用设置了最高投标限价,投标人在投标书中的报价应在最高投标限价范围内,最高投标限价在招标文件中或最迟在投标截止时间前15日,通过“电子交易平台”以书面形式发给所有下载招标文件的投标人。", - "3.2.3": "投标人应按照招标文件规定的格式和内容计算投标报价。", - "3.2.4": "其他报价规定见投标人须知前附表。", - "3.3": "投标有效期", - "3.3.1": "在投标人须知前附表规定的投标有效期内,投标人不得要求撤销或修改其投标文件。", - "3.3.2": "出现特殊情况需要延长投标有效期的,招标人以书面形式通知所有投标人延长投标有效期。投标人同意延长的,应相应延长其投标保证金的有效期,但不得要求或被允许修改或撤销其投标文件;投标人拒绝延长的,其投标失效,但投标人有权收回其投标保证金。", - "3.4": "投标保证金", - "3.4.1": "投标人须知前附表规定提交投标保证金的,投标人在递交投标文件的同时,应按投标人须知前附表规定的形式、金额、递交截止时间、递交方式提交投标保证金,并作为其投标文件的组成部分。联合体投标的,其投标保证金由牵头人递交,并应符合投标人须知前附表的规定。联合体其他成员提交保证金的,保证金无效。投标保证金有效期均应与投标有效期一致。招标人如果按本章第3.3.2项的规定延长了投标有效期,则投标保证金有效期也相应延长。", - "3.4.2": "招标人与中标人签订监理合同后5日内,向中标人和未中标的投标人退还投标保证金及银行同期存款利息;如果要求中标人提供履约担保的,中标人的投标保证金在中标人提交了履约担保并签订监理合同后5日内退还。", - "3.4.3": "投标保证金有效期与投标文件有效期一致,招标人按规定延长投标文件有效期的,投标保证金按本章投标人须知第3.3款的规定执行。", - "3.4.4": "有下列情形之一的,投标保证金将不予退还:(1)投标人在规定的投标有效期内撤销或修改其投标文件;(2)中标人在收到中标通知书后,无正当理由拒签合同协议书或未按招标文件规定提交履约担保;(3)中标人在签订合同时向招标人提出附加条件;(4)推荐的中标候选人在中标通知书发出前放弃中标;(5)投标人不接受依据评标办法的规定对其投标文件中细微偏差进行澄清和补正;(6)投标人存在以他人名义投标、与他人串通投标、以行贿手段谋取中标、弄虚作假等行为。", - "3.5": "资格审查资料", - "3.5.1": "投标人应按招标文件第八章“投标文件格式”中规定的表格内容填写资格审查表,并按各资格审查表的具体要求提供相关证件及证明材料。招标文件中提到的“近3年”除有特别说明外,指从投标截止日往前推算的3年,如投标截止日为2018年2月1日,则近3年是指2015年2月1日至2018年1月31日。其他情况依此类推。", - "3.5.2": "“投标人近年财务状况表”具体年份要求见投标人须知前附表。财务状况的年份要求指的是年度。如投标截止日如在当年6月30日以前,则近年指上上个年度往前推算的3年,例如投标截止日为2018年5月30日,近三年是指2014年度、2015年度、2016年度。投标截止日如在当年6月30日以后,则近三年是指上个年度往前推算的3年,例如投标截止日为2018年7月30日,近三年是指2015年度、2016年度、2017年度。", - "3.5.3": "“投标人近年已完工的类似工程一览表”具体年份要求见投标人须知前附表。", - "3.5.4": "招标公告规定接受联合体投标的,本章第3.5.1项规定的表格和资料应包括联合体各方相关情况。", - "3.6": "投标文件的编制", - "3.6.1": "投标文件应按“投标文件格式”进行编写,如有必要,可以增加附页,作为投标文件的组成部分。其中,投标函附录在满足招标文件实质性要求的基础上,可以提出比招标文件要求更有利于招标人的承诺。", - "3.6.2": "投标文件应当对招标文件有关工期、投标有效期、质量要求、技术标准和要求、招标范围等实质性内容作出响应。", - "3.6.3": "投标文件制作(1)投标文件由投标人使用“市电子交易平台”自带的“投标文件制作工具”制作生成。(2)投标人在编制投标文件时应当建立分级目录,并按照标签提示导入相关内容。(3)投标文件中证明资料的“复印件”均为“原件的扫描件”,(4)投标文件中的已标价工程量清单数据文件应与招标人提供的工程量清单数据文件格式一致。(5)第八章投标文件格式文件要求盖单位章和(或)签字的地方,投标人均应使用CA数字证书加盖投标人的单位电子印章和(或)法定代表人的个人电子印章或电子签名章。联合体投标的,投标文件由联合体牵头人按上述规定加盖联合体牵头人单位电子印章和(或)法定代表人的个人电子印章或电子签名章。(6)投标文件制作完成后,将生成一份加密的电子投标文件和一份不加密的电子投标文件。(7)投标人将不加密的电子投标文件复制到一张U盘中,U盘表面粘贴“标签贴”,并将项目名称、招标编号、投标人名称等信息填写在“标签贴”上。(8)投标文件制作的具体方法详见“投标文件制作工具”中的帮助文档。", - "3.6.4": "投标文件份数投标文件包括加密的电子投标文件和不加密的电子投标文件(U盘备份)各一份。投标人中标后向招标人另行提供与投标时相同的纸质版投标文件份数见投标人须知前附表。", - "4.": "投标", - "4.1": "投标文件的密封与标记", - "4.1.1": "投标文件的加密加密的电子投标文件应按照本章第3.6.3项要求制作并加密,未按要求加密的电子投标文件,招标人(“市电子交易平台”)将拒收并提示。", - "4.1.2": "不加密的电子投标文件的密封和标识(1)不加密的电子投标文件(U盘备份)应单独密封包装,并在封套的封口处加贴封条。(2)不加密的电子投标文件(U盘备份)封套上应写明的其他内容见投标人须知前附表。3未按本章第4.1.2项要求密封和加写标记的投标文件,招标人将拒收。", - "4.2": "投标文件的递交", - "4.2.1": "在招标公告规定的投标截止时间前,投标人可以修改或撤回已递交的投标文件。", - "4.2.2": "投标人对加密的电子投标文件进行撤回的,在“市电子交易平台”直接进行撤回操作;投标人对不加密的电子投标文件(U盘备份)进行撤回的,应以书面形式通知招标人,撤回的书面通知应加盖投标人的单位章或由法定代表人或其委托代理人签字(指亲笔签名),招标人收到书面通知后,向投标人出具签收凭证。", - "4.2.3": "投标人撤回投标文件的,招标人自收到投标人书面撤回通知之日按公共资源交易中心退还已收取的投标保证金。", - "4.2.4": "投标人修改投标文件的,应当先按本章第4.2项的规定撤回投标文件,再使用“投标文件制作工具”制作成完整的投标文件,并按照本章第3条、第4条规定进行编制、密封、标记和递交。", - "4.2.5": "任何情况下,投标人都有义务保证其递交的加密的电子投标文件和不加密的电子投标文件(U盘备份)内容一致,否则造成的后果由投标人自行承担。", - "4.3": "投标文件的修改与撤回", - "4.3.1": "在送交投标文件截止期以前,投标人可以更改或撤回投标文件,并按本章第项的规定操作。", - "4.3.2": "送交投标文件截止期以后,投标文件不得更改。需对投标文件做出澄清时,必须按照本须知第23条的规定办理。", - "4.3.4": "如果在送交投标文件截止期以后且投标文件有效期内撤回投标文件,则按本须知第3.4.4款的规定不予退还其投标担保。", -} +def fetch_invalid_requirements(invalid_docpath, output_folder, truncate0_jsonpath, clause_path, truncate3): + time.sleep(8) # 模拟任务耗时20秒 + return {"invalid_requirements": "Invalid requirements"} -# 目标值 -target_values = ["投标文件","投标"] +def fetch_bidding_documents_requirements(clause_path): + time.sleep(10) # 模拟任务耗时25秒 + return {"bidding_documents_requirements": "Bidding documents requirements"} -# 提取数据 -extracted_data = extract_json(data, target_values) +def fetch_bid_opening(clause_path): + time.sleep(12) # 模拟任务耗时30秒 + return {"opening_bid": "Opening bid"} -# 打印结果 -print(json.dumps(extracted_data, indent=4, ensure_ascii=False)) +def transform_json_values(data): + # 假设这个函数对数据进行某些转换,简单返回数据 + return data + +def get_global_logger(unique_id): + import logging + logger = logging.getLogger(unique_id) + return logger + +def preprocess_files(output_folder, downloaded_file_path, file_type, unique_id): + # 模拟返回处理后的数据 + return { + 'knowledge_name': 'KnowledgeName_' + unique_id, + 'truncate0': 'truncate0', + 'truncate1': 'truncate1', + 'truncate3': 'truncate3', + 'truncate0_jsonpath': 'truncate0_jsonpath', + 'clause_path': 'clause_path', + 'invalid_docpath': 'invalid_docpath', + 'input_file_path': 'input_file_path', + 'output_folder': output_folder + } + +def deleteKnowledge(knowledge_index): + # 模拟删除操作 + pass + +def main_process(output_folder, downloaded_file_path, file_type, unique_id): + global logger + logger = get_global_logger(unique_id) + + # 预处理文件,获取处理后的数据 + processed_data = preprocess_files(output_folder, downloaded_file_path, file_type, unique_id) + if not processed_data: + yield json.dumps({}) # 如果处理数据失败,返回空的 JSON + + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = { + 'base_info': executor.submit(fetch_project_basic_info, processed_data['knowledge_name'], + processed_data['truncate0'], output_folder, + processed_data['clause_path']), + 'qualification_review': executor.submit(fetch_qualification_review, processed_data['truncate1'], + processed_data['truncate3'], + processed_data['knowledge_name'], + processed_data['truncate0_jsonpath'], + processed_data['clause_path'], processed_data['input_file_path'], + processed_data['output_folder']), + 'evaluation_standards': executor.submit(fetch_evaluation_standards, processed_data['truncate1']), + 'invalid_requirements': executor.submit(fetch_invalid_requirements, processed_data['invalid_docpath'], + output_folder, processed_data['truncate0_jsonpath'], + processed_data['clause_path'], processed_data['truncate3']), + 'bidding_documents_requirements': executor.submit(fetch_bidding_documents_requirements, + processed_data['clause_path']), + 'opening_bid': executor.submit(fetch_bid_opening, processed_data['clause_path']) + } + + # 按照任务完成的顺序返回结果 + for future in concurrent.futures.as_completed(futures.values()): + key = next(k for k, v in futures.items() if v == future) + try: + result = future.result() + + # 记录哪个任务完成了 + logger.info(f"Task {key} completed.") + + # 处理 evaluation_standards 返回的技术标和商务标分别作为两个结果 + if key == 'evaluation_standards': + technical_standards = result["technical_standards"] + commercial_standards = result["commercial_standards"] + + # 分别发送技术标和商务标 + modified_technical_result = transform_json_values({"technical_standards": technical_standards}) + modified_commercial_result = transform_json_values({"commercial_standards": commercial_standards}) + + yield json.dumps(modified_technical_result, ensure_ascii=False) # 直接返回 JSON 数据 + yield json.dumps(modified_commercial_result, ensure_ascii=False) # 直接返回 JSON 数据 + + sys.stdout.flush() # 强制刷新流式数据 + + else: + modified_result = transform_json_values({key: result}) + yield json.dumps(modified_result, ensure_ascii=False) # 直接返回 JSON 数据 + + sys.stdout.flush() # 强制刷新流式数据 + + except Exception as exc: + logger.error(f"Error processing {key}: {exc}") + yield json.dumps({'error': f'Error processing {key}: {str(exc)}'}, ensure_ascii=False) + sys.stdout.flush() + + # 删除知识索引 + deleteKnowledge(processed_data['knowledge_name']) + + +# 调用主函数,模拟执行 diff --git a/flask_app/main/商务标技术标整合.py b/flask_app/main/商务标技术标整合.py index 618a963..c979c66 100644 --- a/flask_app/main/商务标技术标整合.py +++ b/flask_app/main/商务标技术标整合.py @@ -106,8 +106,8 @@ def combine_evaluation_standards(truncate2): # update_json=combine_technical_and_business(clean_json_string(evaluation_res),target_values1,target_values2) update_json = combine_technical_and_business(clean_json_string(evaluation_res), target_values1) evaluation_combined_res = json.dumps(update_json,ensure_ascii=False,indent=4) - return evaluation_combined_res - # return update_json #商务标技术标整合 + # return evaluation_combined_res + return update_json #商务标技术标整合 if __name__ == "__main__": truncate2="C:\\Users\\Administrator\\Desktop\\招标文件\\招标test文件夹\\zbtest20_evaluation_method.pdf" res=combine_evaluation_standards(truncate2) diff --git a/flask_app/main/投标人须知正文提取指定内容.py b/flask_app/main/投标人须知正文提取指定内容.py index 131fc2b..39c5a3a 100644 --- a/flask_app/main/投标人须知正文提取指定内容.py +++ b/flask_app/main/投标人须知正文提取指定内容.py @@ -145,10 +145,10 @@ def extract_from_notice(clause_path, type): # 假设原始数据文件路径 if __name__ == "__main__": - file_path = 'D:\\flask_project\\flask_app\\static\\output\\cfd4959d-5ea9-4112-8b50-9e543803f029\\clause1.json' + file_path = 'C:\\Users\\Administrator\\Desktop\\招标文件\\招标test文件夹\\tmp\\clause1.json' # file_path='C:\\Users\\Administrator\\Desktop\\招标文件\\招标test文件夹\\clause1.json' try: - res = extract_from_notice(file_path, 3) # 可以改变此处的 type 参数测试不同的场景 + res = extract_from_notice(file_path, 2) # 可以改变此处的 type 参数测试不同的场景 res2=json.dumps(res,ensure_ascii=False,indent=4) print(res2) except ValueError as e: diff --git a/flask_app/main/投标人须知正文条款提取成json文件.py b/flask_app/main/投标人须知正文条款提取成json文件.py index 7d86235..ac36918 100644 --- a/flask_app/main/投标人须知正文条款提取成json文件.py +++ b/flask_app/main/投标人须知正文条款提取成json文件.py @@ -80,8 +80,6 @@ def parse_text_by_heading(text): current_key = None current_content = [] append_newline = False - sub_key_count = 0 # 用于生成二级标题的计数器 - third_key_count = 0 # 用于生成三级标题的计数器 lines = text.split('\n') for i, line in enumerate(lines): @@ -98,48 +96,26 @@ def parse_text_by_heading(text): if current_key is None or (compare_headings(current_key, new_key) and ( len(current_content) == 0 or current_content[-1][-1] != '第')): if current_key is not None: - # 将之前的内容保存到 data 中 + # 将之前的内容保存到data中,保留第一个换行符,后续的换行符转为空字符 content_string = ''.join(current_content).strip() data[current_key] = content_string.replace(' ', '') current_key = new_key current_content = [line_content] - # 初始化二级和三级标题的计数器 - sub_key_count = 0 - third_key_count = 0 + # 只有当标题级别为两级(如 1.1)时,才设置 append_newline 为 True append_newline = len(new_key.split('.')) == 2 else: append_newline = handle_content_append(current_content, line_content, append_newline, keywords) else: - # 匹配子序号 '1. 2.' 生成三级标题 - sub_match = re.match(r'^(\d+)[\.\、]\s*(.*)$', line_stripped) - if sub_match: - sub_number, sub_content = sub_match.groups() - # 生成三级标题 key,如 5.0.1, 5.0.2 等 - if current_key: - if sub_key_count == 0: - # 生成二级标题,如 5.0 - sub_key_count += 1 - sub_key = f"{current_key}.0" - data[sub_key] = ''.join(current_content).strip() - current_content = [] # 重置内容以生成新的内容 - third_key_count += 1 - sub_key_with_third = f"{current_key}.0.{third_key_count}" - data[sub_key_with_third] = sub_content.strip() - continue - else: - # 处理一般行内容 - if line_stripped: - append_newline = handle_content_append(current_content, line_stripped, append_newline, keywords) + if line_stripped: + append_newline = handle_content_append(current_content, line_stripped, append_newline, keywords) if current_key is not None: # 保存最后一部分内容 content_string = ''.join(current_content).strip() - if content_string: - data[current_key] = content_string.replace(' ', '') + data[current_key] = content_string.replace(' ', '') return data - def convert_to_json(file_path, start_word, end_phrases): if file_path.endswith('.docx'): text = extract_text_from_docx(file_path) @@ -171,9 +147,53 @@ def convert_clause_to_json(input_path,output_folder,type=1): output_path = os.path.join(output_folder, file_name) with open(output_path, 'w', encoding='utf-8') as f: json.dump(result, f, indent=4, ensure_ascii=False) + post_process_json(output_path) print(f"投标人须知正文条款提取成json文件: The data has been processed and saved to '{output_path}'.") return output_path +def post_process_json(json_file_path): #处理一级标题如'5.1'过长的内容 zbtest20 + # 读取 JSON 文件 + with open(json_file_path, 'r', encoding='utf-8') as file: + data = json.load(file) + + processed_data = {} + + for key, value in data.items(): + # 检查是否是一级标题(如 '5.'),并且其值包含 '\n' + if re.match(r'^\d+\.\s*$', key) and '\n' in value: + # 分割标题和正文 + title, content = value.split('\n', 1) + + # 添加原来的标题作为 '5.0',其值为原来标题的内容(即 title) + processed_data[key] = title.strip() + sub_key = f"{key.rstrip('.')}." + "0" # 自动生成 '5.0',与 '5.' 一致,保证点号的存在 + + processed_data[sub_key] = title.strip() + + # 初始化计数器 + sub_count = 1 + + # 根据子序号 '1.' 或 '1、' 进行分割 + sub_sections = re.split(r'(\d+[\.\、])\s*', content) + + current_sub_content = "" + for i in range(1, len(sub_sections), 2): + sub_number = sub_sections[i].strip() # 获取子序号 + sub_content = sub_sections[i + 1].strip() # 获取内容 + + # 生成三级标题,如 '5.0.1', '5.0.2' + sub_key_with_number = f"{sub_key}.{sub_count}" + processed_data[sub_key_with_number] = sub_content + sub_count += 1 + + else: + # 如果没有分割需求,保留原数据 + processed_data[key] = value + + # 将修改后的数据重新写入到原来的 JSON 文件中 + with open(json_file_path, 'w', encoding='utf-8') as file: + json.dump(processed_data, file, ensure_ascii=False, indent=4) + if __name__ == "__main__": # file_path = 'D:\\flask_project\\flask_app\\static\\output\\cfd4959d-5ea9-4112-8b50-9e543803f029\\ztbfile_tobidders_notice.pdf' file_path='C:\\Users\\Administrator\\Desktop\\招标文件\\招标test文件夹\\zbtest20_tobidders_notice.pdf' diff --git a/flask_app/main/招标文件解析.py b/flask_app/main/招标文件解析.py index 58c1bc5..6a04761 100644 --- a/flask_app/main/招标文件解析.py +++ b/flask_app/main/招标文件解析.py @@ -2,8 +2,9 @@ import json import logging import os +import sys import time - +from concurrent.futures import ThreadPoolExecutor from flask_app.main.截取pdf import truncate_pdf_multiple from flask_app.main.table_content_extraction import extract_tables_main from flask_app.main.知识库操作 import addfileToKnowledge, deleteKnowledge @@ -27,9 +28,12 @@ def get_global_logger(unique_id): logger=None # 可能有问题:pdf转docx导致打勾符号消失 +# 创建全局线程池 +executor = ThreadPoolExecutor() def preprocess_files(output_folder, downloaded_file_path, file_type, unique_id): logger.info("starting 文件预处理...") - logger.info("output_folder..."+output_folder) + logger.info("output_folder..." + output_folder) + # 根据文件类型处理文件路径 if file_type == 1: # docx docx_path = downloaded_file_path @@ -38,42 +42,40 @@ def preprocess_files(output_folder, downloaded_file_path, file_type, unique_id): pdf_path = downloaded_file_path docx_path = pdf2docx(pdf_path) # 将pdf转换为docx以供上传到知识库 else: - # 如果文件类型不是预期中的1或2,记录错误并返回None logger.error("Unsupported file type provided. Preprocessing halted.") return None - # 上传知识库 - knowledge_name = "招标解析" + unique_id - index = addfileToKnowledge(docx_path, knowledge_name) + # 异步上传知识库 + future_knowledge = executor.submit(addfileToKnowledge, docx_path, "招标解析" + unique_id) # 调用截取PDF多次 - truncate_files = truncate_pdf_multiple(pdf_path, output_folder) # [前附表, 评标办法, 须知正文, 资格审查条件] + truncate_files = truncate_pdf_multiple(pdf_path, output_folder) # 处理各个部分 truncate0_docpath = pdf2docx(truncate_files[0]) # 投标人须知前附表转docx - - invalid_docpath = copy_docx(docx_path) #docx截取无效标部分 - - truncate_jsonpath=extract_tables_main(truncate0_docpath, output_folder) # 投标人须知前附表docx->json,从表格提取数据 - truncate0 = truncate_files[0] #投标人须知前附表 - truncate1 = truncate_files[1] #评标办法前附表 - truncate3 = truncate_files[3] #资格审查表 + invalid_docpath = copy_docx(docx_path) # docx截取无效标部分 + truncate_jsonpath = extract_tables_main(truncate0_docpath, output_folder) # 投标人须知前附表docx->json + truncate0 = truncate_files[0] + truncate1 = truncate_files[1] + truncate3 = truncate_files[3] clause_path = convert_clause_to_json(truncate_files[2], output_folder) # 投标人须知正文条款pdf->json + logger.info("文件预处理done") + # 提前返回,不等待 future_knowledge 完成,返回包含 Future 对象 return { - 'input_file_path':downloaded_file_path, - 'output_folder':output_folder, + 'input_file_path': downloaded_file_path, + 'output_folder': output_folder, 'truncate0': truncate0, 'truncate1': truncate1, 'truncate3': truncate3, - 'knowledge_index': index, - 'knowledge_name': knowledge_name, + 'knowledge_future': future_knowledge, # 返回 Future 对象 'truncate0_jsonpath': truncate_jsonpath, 'clause_path': clause_path, 'invalid_docpath': invalid_docpath } + def post_processing(data,includes): # 初始化结果字典,预设'其他'分类为空字典 result = {"其他": {}} @@ -216,61 +218,76 @@ def main_processing(output_folder, downloaded_file_path, file_type, unique_id): # 预处理文件,获取处理后的数据 processed_data = preprocess_files(output_folder, downloaded_file_path, file_type, unique_id) if not processed_data: - yield "data: {}\n\n" # 如果处理数据失败,返回空 + yield json.dumps({}) # 如果处理数据失败,返回空的 JSON with concurrent.futures.ThreadPoolExecutor() as executor: - # 创建任务字典 + # 立即启动不依赖 knowledge_name 和 index 的任务 futures = { - 'base_info': executor.submit(fetch_project_basic_info, processed_data['knowledge_name'], - processed_data['truncate0'], output_folder, - processed_data['clause_path']), - 'qualification_review': executor.submit(fetch_qualification_review, processed_data['truncate1'], - processed_data['truncate3'], - processed_data['knowledge_name'], processed_data['truncate0_jsonpath'], - processed_data['clause_path'], processed_data['input_file_path'], - processed_data['output_folder']), - 'evaluation_standards': executor.submit(fetch_evaluation_standards, processed_data['truncate1']), # 并行获取技术标和商务标 + 'evaluation_standards': executor.submit(fetch_evaluation_standards, processed_data['truncate1']), 'invalid_requirements': executor.submit(fetch_invalid_requirements, processed_data['invalid_docpath'], output_folder, processed_data['truncate0_jsonpath'], processed_data['clause_path'], processed_data['truncate3']), - 'bidding_documents_requirements': executor.submit(fetch_bidding_documents_requirements, - processed_data['clause_path']), + 'bidding_documents_requirements': executor.submit(fetch_bidding_documents_requirements, processed_data['clause_path']), 'opening_bid': executor.submit(fetch_bid_opening, processed_data['clause_path']) } - # 每提交一个任务暂停1秒,确保任务逐个提交 - for task_name, future in futures.items(): - time.sleep(1) # 可以保留这个暂停 - - # 按照任务完成的顺序返回结果 + # 提前处理这些不依赖的任务,按完成顺序返回 for future in concurrent.futures.as_completed(futures.values()): key = next(k for k, v in futures.items() if v == future) try: - # 获取任务执行结果 result = future.result() - # 处理 evaluation_standards 返回的技术标和商务标分别作为两个结果 + # 如果是 evaluation_standards,拆分技术标和商务标 if key == 'evaluation_standards': technical_standards = result["technical_standards"] commercial_standards = result["commercial_standards"] - # 分别发送技术标和商务标 - modified_technical_result = transform_json_values({"technical_standards": technical_standards}) - modified_commercial_result = transform_json_values({"commercial_standards": commercial_standards}) + # 分别返回技术标和商务标 + yield json.dumps(transform_json_values({'technical_standards': technical_standards}), ensure_ascii=False) + yield json.dumps(transform_json_values({'commercial_standards': commercial_standards}), ensure_ascii=False) - yield f"data: {json.dumps(modified_technical_result, ensure_ascii=False)}\n\n" - yield f"data: {json.dumps(modified_commercial_result, ensure_ascii=False)}\n\n" else: - # 对非 evaluation_standards 的结果进行 JSON 转换和修改 - modified_result = transform_json_values({key: result}) - yield f"data: {json.dumps(modified_result, ensure_ascii=False)}\n\n" + # 处理其他任务的结果 + yield json.dumps(transform_json_values({key: result}), ensure_ascii=False) + except Exception as exc: logger.error(f"Error processing {key}: {exc}") - # 返回错误信息 - yield f"data: {json.dumps({'error': f'Error processing {key}: {str(exc)}'})}\n\n" + yield json.dumps({'error': f'Error processing {key}: {str(exc)}'}, ensure_ascii=False) + + # 只有在需要 knowledge_name 和 index 时才等待 future_knowledge 完成 + try: + knowledge_name = "招标解析" + unique_id + index = processed_data['knowledge_future'].result() # 阻塞等待知识库上传任务完成 + + # 提交依赖 knowledge_name 和 index 的任务 + future_dependencies = { + 'base_info': executor.submit(fetch_project_basic_info, knowledge_name, processed_data['truncate0'], + output_folder, processed_data['clause_path']), + 'qualification_review': executor.submit(fetch_qualification_review, processed_data['truncate1'], + processed_data['truncate3'], knowledge_name, + processed_data['truncate0_jsonpath'], + processed_data['clause_path'], processed_data['input_file_path'], + processed_data['output_folder']), + } + + # 按完成顺序返回依赖任务的结果 + for future in concurrent.futures.as_completed(future_dependencies.values()): + key = next(k for k, v in future_dependencies.items() if v == future) + try: + result = future.result() + yield json.dumps(transform_json_values({key: result}), ensure_ascii=False) + except Exception as exc: + logger.error(f"Error processing {key}: {exc}") + yield json.dumps({'error': f'Error processing {key}: {str(exc)}'}, ensure_ascii=False) + + except Exception as e: + logger.error(f"Error uploading to knowledge base: {e}") + yield json.dumps({'error': f'Knowledge upload failed: {str(e)}'}, ensure_ascii=False) # 删除知识索引 - deleteKnowledge(processed_data['knowledge_index']) + deleteKnowledge(index) + + #TODO:流式输出要改商务标技术标整合 if __name__ == "__main__":