From d11e4886407585630ed494edaed78cfe038818af Mon Sep 17 00:00:00 2001 From: zy123 <646228430@qq.com> Date: Mon, 2 Dec 2024 11:10:49 +0800 Subject: [PATCH] =?UTF-8?q?12.2=20=E6=B8=85=E7=90=86bug=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- flask_app/ConnectionLimiter.py | 76 ++++++++++++++++++++---------- flask_app/routes/get_deviation.py | 11 ++--- flask_app/routes/little_zbparse.py | 8 ++-- flask_app/routes/upload.py | 9 ++-- flask_app/routes/utils.py | 6 ++- flask_app/start_up.py | 5 +- 6 files changed, 66 insertions(+), 49 deletions(-) diff --git a/flask_app/ConnectionLimiter.py b/flask_app/ConnectionLimiter.py index 0833ebc..ffb28e4 100644 --- a/flask_app/ConnectionLimiter.py +++ b/flask_app/ConnectionLimiter.py @@ -1,31 +1,40 @@ # flask_app/ConnectionLimiter.py import threading -import time +import logging from functools import wraps -from flask import current_app, jsonify, g, request, Response, stream_with_context +from flask import jsonify, request, Response, stream_with_context, current_app, g + +# 配置模块级别的日志记录器 +logger = logging.getLogger(__name__) class ExecutionTimeoutMonitor: """监控请求执行时间,超时后释放信号量""" - def __init__(self, timeout, semaphore): + def __init__(self, timeout, semaphore, logger): self.timeout = timeout self.semaphore = semaphore self.is_timeout = False + self.stopped = threading.Event() + self.logger = logger self.thread = threading.Thread(target=self._monitor) self.thread.daemon = True def _monitor(self): """等待指定时间后标记为超时并释放信号量""" - time.sleep(self.timeout) - self.is_timeout = True - self.semaphore.release() # 超时后释放信号量 - current_app.logger.error(f"请求执行时间超过 {self.timeout} 秒并被终止。") + if not self.stopped.wait(self.timeout): + self.is_timeout = True + self.semaphore.release() # 超时后释放信号量 + self.logger.error(f"请求执行时间超过 {self.timeout} 秒并被终止。") def start(self): self.thread.start() + def stop(self): + self.stopped.set() + self.thread.join() + class ConnectionLimiter: - def __init__(self, max_connections=1): + def __init__(self, max_connections=10): self.semaphore = threading.Semaphore(max_connections) def require_connection_limit(timeout=900): @@ -33,55 +42,72 @@ def require_connection_limit(timeout=900): def decorator(f): @wraps(f) def wrapped(*args, **kwargs): + # 统一获取 logger 对象 + logger_to_use = getattr(g, 'logger', logger) + blueprint = request.blueprint - limiter = current_app.connection_limiters.get(blueprint, current_app.connection_limiters.get('default')) + limiter = current_app.connection_limiters.get( + blueprint, current_app.connection_limiters.get('default')) if limiter is None: - current_app.logger.error(f"未为蓝图 '{blueprint}' 配置 ConnectionLimiter,并且未设置默认限流器。") + logger_to_use.error( + f"未为蓝图 '{blueprint}' 配置 ConnectionLimiter,并且未设置默认限流器。") return jsonify({'error': '服务器配置错误'}), 500 acquired = limiter.semaphore.acquire(blocking=True) if not acquired: - current_app.logger.warning("并发连接过多") + logger_to_use.warning("并发连接过多") return jsonify({'error': '并发连接过多'}), 429 # 启动执行超时监控器 - monitor = ExecutionTimeoutMonitor(timeout, limiter.semaphore) + monitor = ExecutionTimeoutMonitor(timeout, limiter.semaphore, logger_to_use) monitor.start() - # 将限流器和监控器存储在 g 中,以便在 teardown_request 中释放信号量 - g.limiter = limiter - g.monitor = monitor - try: result = f(*args, **kwargs) if isinstance(result, Response): - # 对于普通的 Response 对象,不需要额外处理 + # 对于 Response 对象,使用 call_on_close 释放信号量 + def release_semaphore(): + if not monitor.is_timeout: + limiter.semaphore.release() + monitor.stop() + logger_to_use.info("Response Object 正确释放") + + result.call_on_close(release_semaphore) return result - elif hasattr(result, '__iter__'): - # 如果返回的是生成器,按原有逻辑处理 + elif hasattr(result, '__iter__') and not isinstance(result, (str, bytes)): + # 处理生成器响应 @stream_with_context def generator_wrapper(): try: for item in result: if monitor.is_timeout: - current_app.logger.error("请求执行时间超过限制并被终止。") + logger_to_use.error("请求执行时间超过限制并被终止。") break yield item + except GeneratorExit: + # 客户端断开连接 + logger_to_use.info("客户端断开连接") finally: if not monitor.is_timeout: limiter.semaphore.release() - + monitor.stop() + logger_to_use.info("在生成器的 finally 中释放信号量并停止监控器") return Response(generator_wrapper(), mimetype='text/event-stream') - else: - # 对于其他类型的响应,直接返回 + # 对于其他类型的响应,返回前释放信号量 + if not monitor.is_timeout: + limiter.semaphore.release() + monitor.stop() + logger_to_use.info("Semaphore released for non-Response, non-generator result") return result except Exception as e: - limiter.semaphore.release() - current_app.logger.error(f"路由处理异常: {e}") + if not monitor.is_timeout: + limiter.semaphore.release() + monitor.stop() + logger_to_use.error(f"路由处理异常: {e}") return jsonify({'error': '内部服务器错误'}), 500 return wrapped diff --git a/flask_app/routes/get_deviation.py b/flask_app/routes/get_deviation.py index 2fbaba2..629f09b 100644 --- a/flask_app/routes/get_deviation.py +++ b/flask_app/routes/get_deviation.py @@ -5,23 +5,18 @@ from flask import Blueprint, request, jsonify, Response, stream_with_context, g import json import os from flask_app.main.download import download_file -from flask_app.general.post_processing import outer_post_processing from flask_app.general.接口_技术偏离表 import get_tech_and_business_deviation from flask_app.routes.utils import generate_deviation_response, validate_request, validate_and_setup_logger from flask_app.ConnectionLimiter import require_connection_limit get_deviation_bp = Blueprint('get_deviation', __name__) @get_deviation_bp.route('/get_deviation', methods=['POST']) -@require_connection_limit(timeout=700) +@require_connection_limit(timeout=720) @validate_and_setup_logger def get_deviation(): logger = g.logger unique_id = g.unique_id - validation = validate_request() - - if isinstance(validation, tuple) and len(validation) == 2 and isinstance(validation[0], str): - file_url, zb_type = validation - else: - return validation # 错误响应 + file_url = g.file_url + zb_type = g.zb_type try: logger.info("开始解析 URL: " + file_url) diff --git a/flask_app/routes/little_zbparse.py b/flask_app/routes/little_zbparse.py index aef5f43..32ebe7e 100644 --- a/flask_app/routes/little_zbparse.py +++ b/flask_app/routes/little_zbparse.py @@ -7,17 +7,17 @@ from flask import Blueprint, request, jsonify, Response, g from flask_app.ConnectionLimiter import require_connection_limit from flask_app.main.download import download_file from flask_app.general.接口_小解析 import little_parse_main -from flask_app.routes.utils import validate_request, validate_and_setup_logger +from flask_app.routes.utils import validate_and_setup_logger little_zbparse_bp = Blueprint('little_zbparse', __name__) @little_zbparse_bp.route('/little_zbparse', methods=['POST']) -@require_connection_limit(timeout=300) @validate_and_setup_logger +@require_connection_limit(timeout=300) def little_zbparse(): logger = g.logger - file_url, zb_type = validate_request() - + file_url = g.file_url + zb_type = g.zb_type if isinstance(file_url, tuple): # 检查是否为错误响应 return file_url diff --git a/flask_app/routes/upload.py b/flask_app/routes/upload.py index 3d5062d..783b07b 100644 --- a/flask_app/routes/upload.py +++ b/flask_app/routes/upload.py @@ -15,19 +15,16 @@ from flask_app.ConnectionLimiter import require_connection_limit upload_bp = Blueprint('upload', __name__) @upload_bp.route('/upload', methods=['POST']) -@require_connection_limit(timeout=800) @validate_and_setup_logger +@require_connection_limit(timeout=720) def zbparse(): logger = g.logger try: logger.info("大解析开始!!!") received_data = request.get_json() logger.info("Received JSON data: " + str(received_data)) - validation = validate_request() - if isinstance(validation, tuple) and len(validation) == 2 and isinstance(validation[0], str): - file_url, zb_type = validation - else: - return validation # 错误响应 + file_url = g.file_url + zb_type = g.zb_type try: logger.info("starting parsing url:" + file_url) return process_and_stream(file_url, zb_type) diff --git a/flask_app/routes/utils.py b/flask_app/routes/utils.py index 59323e4..a2b3d80 100644 --- a/flask_app/routes/utils.py +++ b/flask_app/routes/utils.py @@ -2,7 +2,7 @@ import json from functools import wraps -from flask import request, jsonify, current_app +from flask import request, jsonify, current_app, g from flask_app.logger_setup import create_logger @@ -105,7 +105,9 @@ def validate_and_setup_logger(f): # 创建 logger 和 output_folder create_logger(current_app, subfolder) - + # 将验证后的数据存储在 g 对象中 + g.file_url = file_url + g.zb_type = zb_type return f(*args, **kwargs) else: # 验证失败,返回错误响应 diff --git a/flask_app/start_up.py b/flask_app/start_up.py index 9194eb3..fc9e781 100644 --- a/flask_app/start_up.py +++ b/flask_app/start_up.py @@ -35,10 +35,6 @@ def create_app(): @app.teardown_request def teardown_request(exception): - limiter = getattr(g, 'limiter', None) - monitor = getattr(g, 'monitor', None) - if limiter and not monitor.is_timeout: - limiter.semaphore.release() output_folder = getattr(g, 'output_folder', None) if output_folder: # 执行与output_folder相关的清理操作(例如删除临时文件) @@ -46,6 +42,7 @@ def create_app(): logger.info(f"正在清理输出文件夹: {output_folder}") file_ids=read_file_ids(output_folder) delete_file_by_ids(file_ids) + logger.info("清理完毕!") return app #TODO:培训要求、总体要求、进度要求、'建设要求'到技术要求中,归类到其他要求中