From 89d665adae6132144c13bdebccc30ada6410b4a6 Mon Sep 17 00:00:00 2001 From: zy123 <646228430@qq.com> Date: Mon, 25 Nov 2024 10:13:39 +0800 Subject: [PATCH] =?UTF-8?q?11.24=20=E9=80=9F=E7=8E=87=E9=99=90=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- flask_app/ConnectionLimiter.py | 26 +++---- flask_app/main/工程标解析main.py | 108 +++++++++++++++++++---------- flask_app/routes/upload.py | 21 +++--- flask_app/货物标/货物标解析main.py | 88 ++++++++++++++++------- 4 files changed, 152 insertions(+), 91 deletions(-) diff --git a/flask_app/ConnectionLimiter.py b/flask_app/ConnectionLimiter.py index c7ddfb8..e4d77ff 100644 --- a/flask_app/ConnectionLimiter.py +++ b/flask_app/ConnectionLimiter.py @@ -1,31 +1,21 @@ -from threading import Semaphore +# flask_app/ConnectionLimiter.py +import threading from functools import wraps -from flask import jsonify, current_app class ConnectionLimiter: - _instance = None - - def __new__(cls, *args, **kwargs): - if cls._instance is None: - cls._instance = super().__new__(cls) - return cls._instance - def __init__(self, max_connections=10): - if not hasattr(self, 'semaphore'): - self.semaphore = Semaphore(max_connections) + self.semaphore = threading.Semaphore(max_connections) def limit_connections(self, f): + """装饰器:限制并发连接数""" + @wraps(f) - def decorated_function(*args, **kwargs): - if not self.semaphore.acquire(blocking=False): - return jsonify({ - 'error': 'Server is busy. Maximum number of concurrent connections reached.', - 'code': 429 - }), 429 + def wrapped(*args, **kwargs): + self.semaphore.acquire() try: return f(*args, **kwargs) finally: self.semaphore.release() - return decorated_function \ No newline at end of file + return wrapped diff --git a/flask_app/main/工程标解析main.py b/flask_app/main/工程标解析main.py index 1a630f1..275c43a 100644 --- a/flask_app/main/工程标解析main.py +++ b/flask_app/main/工程标解析main.py @@ -98,35 +98,48 @@ def preprocess_files(output_folder, downloaded_file_path, file_type,unique_id,lo # 基本信息 -def fetch_project_basic_info(invalid_path, merged_baseinfo_path, merged_baseinfo_path_more,tobidders_notice, clause_path, logger): +def fetch_project_basic_info(invalid_path, merged_baseinfo_path, merged_baseinfo_path_more, tobidders_notice, clause_path, logger): logger.info("starting 基础信息...") start_time = time.time() - if not merged_baseinfo_path: - merged_baseinfo_path = invalid_path - if not merged_baseinfo_path_more: - merged_baseinfo_path_more=invalid_path - if not tobidders_notice: - tobidders_notice = invalid_path - basic_res = combine_basic_info(merged_baseinfo_path,merged_baseinfo_path_more,tobidders_notice, clause_path) + try: + if not merged_baseinfo_path: + merged_baseinfo_path = invalid_path + if not merged_baseinfo_path_more: + merged_baseinfo_path_more = invalid_path + if not tobidders_notice: + tobidders_notice = invalid_path + basic_res = combine_basic_info(merged_baseinfo_path, merged_baseinfo_path_more, tobidders_notice, clause_path) + result = basic_res + except Exception as exc: + logger.error(f"Error in 基础信息: {exc}") + # 返回默认值 + result = {"基础信息": {}} end_time = time.time() logger.info(f"基础信息 done,耗时:{end_time - start_time:.2f} 秒") - return basic_res + return result -# 形式、响应、资格评审 -def fetch_qualification_review(evaluation_method, qualification, output_folder, tobidders_notice_table, clause_path, invalid_path, merged_baseinfo_path,notice_path,logger): +def fetch_qualification_review(evaluation_method, qualification, output_folder, tobidders_notice_table, clause_path, invalid_path, merged_baseinfo_path, notice_path, logger): logger.info("starting 资格审查...") start_time = time.time() - if not notice_path: - notice_path=invalid_path - if not evaluation_method: - evaluation_method = invalid_path - if not merged_baseinfo_path: - merged_baseinfo_path = invalid_path - review_standards_res = combine_review_standards(evaluation_method, qualification, output_folder, tobidders_notice_table, clause_path, invalid_path, merged_baseinfo_path,notice_path) + try: + if not notice_path: + notice_path = invalid_path + if not evaluation_method: + evaluation_method = invalid_path + if not merged_baseinfo_path: + merged_baseinfo_path = invalid_path + review_standards_res = combine_review_standards( + evaluation_method, qualification, output_folder, tobidders_notice_table, clause_path, invalid_path, merged_baseinfo_path, notice_path + ) + result = {"资格审查": review_standards_res} + except Exception as exc: + logger.error(f"Error in 资格审查: {exc}") + # 返回默认值 + result = {"资格审查": {}} end_time = time.time() logger.info(f"资格审查 done,耗时:{end_time - start_time:.2f} 秒") - return review_standards_res + return result # 评分细则 流式 @@ -147,38 +160,56 @@ def fetch_evaluation_standards(invalid_path, evaluation_method,logger): # 无效、废标项解析 -def fetch_invalid_requirements(invalid_docpath, output_folder,logger): +def fetch_invalid_requirements(invalid_docpath, output_folder, logger): logger.info("starting 无效标与废标...") start_time = time.time() - find_invalid_res = combine_find_invalid(invalid_docpath, output_folder) + try: + find_invalid_res = combine_find_invalid(invalid_docpath, output_folder) + result = find_invalid_res + except Exception as exc: + logger.error(f"Error in 无效标与废标: {exc}") + # 返回默认值 + result = {"无效标与废标": {}} end_time = time.time() logger.info(f"无效标与废标 done,耗时:{end_time - start_time:.2f} 秒") - return find_invalid_res + return result # 投标文件要求 -def fetch_bidding_documents_requirements(invalid_path, merged_baseinfo_path_more,clause_path,logger): +def fetch_bidding_documents_requirements(invalid_path, merged_baseinfo_path_more, clause_path, logger): logger.info("starting 投标文件要求...") start_time = time.time() - if not merged_baseinfo_path_more: - merged_baseinfo_path_more=invalid_path - selection = 1 - fetch_bidding_documents_requirements_json = extract_from_notice(merged_baseinfo_path_more, clause_path, selection) + try: + if not merged_baseinfo_path_more: + merged_baseinfo_path_more = invalid_path + selection = 1 + fetch_bidding_documents_requirements_json = extract_from_notice(merged_baseinfo_path_more, clause_path, selection) + result = {"投标文件要求": fetch_bidding_documents_requirements_json} + except Exception as exc: + logger.error(f"Error in 投标文件要求: {exc}") + # 返回默认值 + result = {"投标文件要求": {}} end_time = time.time() logger.info(f"投标文件要求 done,耗时:{end_time - start_time:.2f} 秒") - return {"投标文件要求": fetch_bidding_documents_requirements_json} + return result # 开评定标流程 -def fetch_bid_opening(invalid_path, merged_baseinfo_path_more,clause_path,logger): +def fetch_bid_opening(invalid_path, merged_baseinfo_path_more, clause_path, logger): logger.info("starting 开评定标流程...") start_time = time.time() - if not merged_baseinfo_path_more: - merged_baseinfo_path_more=invalid_path - selection = 2 - fetch_bid_opening_json = extract_from_notice(merged_baseinfo_path_more, clause_path, selection) + try: + if not merged_baseinfo_path_more: + merged_baseinfo_path_more = invalid_path + selection = 2 + fetch_bid_opening_json = extract_from_notice(merged_baseinfo_path_more, clause_path, selection) + result = {"开评定标流程": fetch_bid_opening_json} + except Exception as exc: + logger.error(f"Error in 开评定标流程: {exc}") + # 返回默认值 + result = {"开评定标流程": {}} end_time = time.time() logger.info(f"开评定标流程 done,耗时:{end_time - start_time:.2f} 秒") - return {"开评定标流程": fetch_bid_opening_json} + return result #分段返回 @@ -226,7 +257,14 @@ def engineering_bid_main(output_folder, downloaded_file_path, file_type, unique_ except Exception as exc: logger.error(f"Error processing {key}: {exc}") - yield json.dumps({'error': f'Error processing {key}: {str(exc)}'}, ensure_ascii=False) + if key == 'evaluation_standards': + # 返回默认的商务评分和技术评分 + default_evaluation = { + 'technical_standards': {"技术评分": ""}, + 'commercial_standards': {"商务评分": ""} + } + yield json.dumps(default_evaluation, ensure_ascii=False) + # yield json.dumps({'error': f'Error processing {key}: {str(exc)}'}, ensure_ascii=False) #TODO:基本信息,判断是否这里,打勾逻辑取消了。 if __name__ == "__main__": diff --git a/flask_app/routes/upload.py b/flask_app/routes/upload.py index c1c9ab8..d5ec922 100644 --- a/flask_app/routes/upload.py +++ b/flask_app/routes/upload.py @@ -22,20 +22,16 @@ def require_connection_limit(): @wraps(f) def wrapped(*args, **kwargs): limiter = current_app.connection_limiter - return limiter.limit_connections(f)(*args, **kwargs) + limiter.semaphore.acquire() # 阻塞式获取信号量 + try: + return f(*args, **kwargs) + finally: + limiter.semaphore.release() return wrapped return decorator @upload_bp.route('/upload', methods=['POST']) +@require_connection_limit() def zbparse(): - # 获取当前应用的 connection_limiter - limiter = current_app.connection_limiter - - if not limiter.semaphore.acquire(blocking=False): - return jsonify({ - 'error': 'Server is busy. Maximum number of concurrent connections reached.', - 'code': 429 - }), 429 - try: logger = g.logger logger.info("zbparse start!!!") @@ -55,8 +51,9 @@ def zbparse(): except Exception as e: logger.error('Exception occurred: ' + str(e)) return jsonify({'error': str(e)}), 500 - finally: - limiter.semaphore.release() + except Exception as e: + logger.error('Unexpected exception: ' + str(e)) + return jsonify({'error': 'Internal server error'}), 500 def process_and_stream(file_url, zb_type): """ diff --git a/flask_app/货物标/货物标解析main.py b/flask_app/货物标/货物标解析main.py index 3a670e7..ac689ea 100644 --- a/flask_app/货物标/货物标解析main.py +++ b/flask_app/货物标/货物标解析main.py @@ -78,28 +78,39 @@ def preprocess_files(output_folder, file_path, file_type,logger): 'merged_baseinfo_path': merged_baseinfo_path } -def fetch_project_basic_info(invalid_path,merged_baseinfo_path, procurement_path, clause_path,logger): +def fetch_project_basic_info(invalid_path, merged_baseinfo_path, procurement_path, clause_path, logger): logger.info("starting 基础信息...") start_time = time.time() - if not merged_baseinfo_path: - merged_baseinfo_path = invalid_path - if not procurement_path: - procurement_path=invalid_path - basic_res = combine_basic_info(merged_baseinfo_path, procurement_path,clause_path,invalid_path) - base_info, good_list = post_process_baseinfo(basic_res,logger) + try: + if not merged_baseinfo_path: + merged_baseinfo_path = invalid_path + if not procurement_path: + procurement_path = invalid_path + basic_res = combine_basic_info(merged_baseinfo_path, procurement_path, clause_path, invalid_path) + base_info, good_list = post_process_baseinfo(basic_res, logger) + result = base_info, good_list + except Exception as exc: + logger.error(f"Error in 基础信息: {exc}") + # 返回默认值 + result = {"基础信息": {}}, [] end_time = time.time() logger.info(f"基础信息 done,耗时:{end_time - start_time:.2f} 秒") - return base_info, good_list + return result -def fetch_qualification_review(invalid_path,qualification_path, notice_path,logger): +def fetch_qualification_review(invalid_path, qualification_path, notice_path, logger): logger.info("starting 资格审查...") start_time = time.time() - review_standards_res = combine_qualification_review(invalid_path,qualification_path, notice_path) + try: + review_standards_res = combine_qualification_review(invalid_path, qualification_path, notice_path) + result = {"资格审查": review_standards_res} + except Exception as exc: + logger.error(f"Error in 资格审查: {exc}") + # 返回默认值 + result = {"资格审查": {}} end_time = time.time() logger.info(f"资格审查 done,耗时:{end_time - start_time:.2f} 秒") - return review_standards_res - + return result def fetch_evaluation_standards(invalid_path, evaluation_method_path,logger): logger.info("starting 商务评分和技术评分...") @@ -116,37 +127,55 @@ def fetch_evaluation_standards(invalid_path, evaluation_method_path,logger): "commercial_standards": commercial_standards } -def fetch_invalid_requirements(invalid_docpath, output_folder,logger): +def fetch_invalid_requirements(invalid_docpath, output_folder, logger): logger.info("starting 无效标与废标...") start_time = time.time() - find_invalid_res = combine_find_invalid(invalid_docpath, output_folder) + try: + find_invalid_res = combine_find_invalid(invalid_docpath, output_folder) + result = find_invalid_res + except Exception as exc: + logger.error(f"Error in 无效标与废标: {exc}") + # 返回默认值 + result = {"无效标与废标": {}} end_time = time.time() logger.info(f"无效标与废标 done,耗时:{end_time - start_time:.2f} 秒") - return find_invalid_res + return result -def fetch_bidding_documents_requirements(invalid_path,merged_baseinfo_path,clause_path,logger): +def fetch_bidding_documents_requirements(invalid_path, merged_baseinfo_path, clause_path, logger): logger.info("starting 投标文件要求...") if not merged_baseinfo_path: - merged_baseinfo_path=invalid_path + merged_baseinfo_path = invalid_path start_time = time.time() - selection=1 - fetch_bidding_documents_requirements_json = extract_from_notice(merged_baseinfo_path,clause_path, selection) + selection = 1 + try: + fetch_bidding_documents_requirements_json = extract_from_notice(merged_baseinfo_path, clause_path, selection) + result = {"投标文件要求": fetch_bidding_documents_requirements_json} + except Exception as exc: + logger.error(f"Error in 投标文件要求: {exc}") + # 返回默认值,假设默认值为一个空字典 + result = {"投标文件要求": {}} end_time = time.time() logger.info(f"投标文件要求 done,耗时:{end_time - start_time:.2f} 秒") - return {"投标文件要求": fetch_bidding_documents_requirements_json} + return result # 开评定标流程 -def fetch_bid_opening(invalid_path,merged_baseinfo_path,clause_path,logger): +def fetch_bid_opening(invalid_path, merged_baseinfo_path, clause_path, logger): logger.info("starting 开评定标流程...") if not merged_baseinfo_path: - merged_baseinfo_path=invalid_path + merged_baseinfo_path = invalid_path start_time = time.time() - selection=2 - fetch_bid_opening_json = extract_from_notice(merged_baseinfo_path,clause_path, selection) + selection = 2 + try: + fetch_bid_opening_json = extract_from_notice(merged_baseinfo_path, clause_path, selection) + result = {"开评定标流程": fetch_bid_opening_json} + except Exception as exc: + logger.error(f"Error in 开评定标流程: {exc}") + # 返回默认值,假设默认值为一个空字典 + result = {"开评定标流程": {}} end_time = time.time() logger.info(f"开评定标流程 done,耗时:{end_time - start_time:.2f} 秒") - return {"开评定标流程": fetch_bid_opening_json} + return result def post_process_baseinfo(base_info,logger): @@ -230,7 +259,14 @@ def goods_bid_main(output_folder, file_path, file_type, unique_id): yield json.dumps({key: transform_json_values(result)}, ensure_ascii=False) except Exception as exc: logger.error(f"Error processing {key}: {exc}") - yield json.dumps({'error': f'Error processing {key}: {str(exc)}'}, ensure_ascii=False) + if key == 'evaluation_standards': + # 返回默认的商务评分和技术评分 + default_evaluation = { + 'technical_standards': {"技术评分": ""}, + 'commercial_standards': {"商务评分": ""} + } + yield json.dumps(default_evaluation, ensure_ascii=False) + # yield json.dumps({'error': f'Error processing {key}: {str(exc)}'}, ensure_ascii=False) if collected_good_list is not None: yield json.dumps({'good_list': transform_json_values(collected_good_list)}, ensure_ascii=False)