From 94c124ee0443b0f7af5c6b4a06816a3ad7c84106 Mon Sep 17 00:00:00 2001 From: zy123 <646228430@qq.com> Date: Mon, 25 Nov 2024 16:24:38 +0800 Subject: [PATCH] =?UTF-8?q?11.25=20=E8=B6=85=E6=97=B6=E8=AE=BE=E7=BD=AE?= =?UTF-8?q?=E3=80=81=E8=BF=9E=E6=8E=A5=E8=AE=BE=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- flask_app/ConnectionLimiter.py | 72 ++++++++++++++++++------------ flask_app/routes/little_zbparse.py | 4 +- flask_app/start_up.py | 9 +++- 3 files changed, 53 insertions(+), 32 deletions(-) diff --git a/flask_app/ConnectionLimiter.py b/flask_app/ConnectionLimiter.py index 2578dc3..0833ebc 100644 --- a/flask_app/ConnectionLimiter.py +++ b/flask_app/ConnectionLimiter.py @@ -1,8 +1,9 @@ +# flask_app/ConnectionLimiter.py + import threading import time from functools import wraps -from flask import current_app, jsonify, stream_with_context, Response, request - +from flask import current_app, jsonify, g, request, Response, stream_with_context class ExecutionTimeoutMonitor: """监控请求执行时间,超时后释放信号量""" @@ -14,19 +15,21 @@ class ExecutionTimeoutMonitor: self.thread.daemon = True def _monitor(self): - """等待指定时间后标记为超时并释放信号量 - 目前的超时机制是通过 ExecutionTimeoutMonitor 的标志位来通知生成器超时。 - 但是,生成器只有在下一次 yield 时才会检查标志位,因此不会在当前执行的代码段中间强行中断。 - """ + """等待指定时间后标记为超时并释放信号量""" time.sleep(self.timeout) self.is_timeout = True self.semaphore.release() # 超时后释放信号量 - current_app.logger.error(f"Request execution exceeded {self.timeout} seconds and was terminated.") + current_app.logger.error(f"请求执行时间超过 {self.timeout} 秒并被终止。") def start(self): self.thread.start() +class ConnectionLimiter: + def __init__(self, max_connections=1): + self.semaphore = threading.Semaphore(max_connections) + def require_connection_limit(timeout=900): + """装饰器:确保路由使用连接限制,并监控请求执行时间""" def decorator(f): @wraps(f) def wrapped(*args, **kwargs): @@ -36,39 +39,50 @@ def require_connection_limit(timeout=900): current_app.logger.error(f"未为蓝图 '{blueprint}' 配置 ConnectionLimiter,并且未设置默认限流器。") return jsonify({'error': '服务器配置错误'}), 500 - # 获取信号量(阻塞) acquired = limiter.semaphore.acquire(blocking=True) if not acquired: current_app.logger.warning("并发连接过多") return jsonify({'error': '并发连接过多'}), 429 + # 启动执行超时监控器 + monitor = ExecutionTimeoutMonitor(timeout, limiter.semaphore) + monitor.start() + + # 将限流器和监控器存储在 g 中,以便在 teardown_request 中释放信号量 + g.limiter = limiter + g.monitor = monitor + try: - # 启动执行超时监控器 - monitor = ExecutionTimeoutMonitor(timeout, limiter.semaphore) - monitor.start() + result = f(*args, **kwargs) - generator = f(*args, **kwargs) + if isinstance(result, Response): + # 对于普通的 Response 对象,不需要额外处理 + return result - @stream_with_context - def generator_wrapper(): - try: - for item in generator: - if monitor.is_timeout: - current_app.logger.error("请求执行时间超过限制并被终止。") - break - yield item - finally: - if not monitor.is_timeout: - limiter.semaphore.release() # 正常结束时释放信号量 + elif hasattr(result, '__iter__'): + # 如果返回的是生成器,按原有逻辑处理 + @stream_with_context + def generator_wrapper(): + try: + for item in result: + if monitor.is_timeout: + current_app.logger.error("请求执行时间超过限制并被终止。") + break + yield item + finally: + if not monitor.is_timeout: + limiter.semaphore.release() + + return Response(generator_wrapper(), mimetype='text/event-stream') + + else: + # 对于其他类型的响应,直接返回 + return result - return Response(generator_wrapper(), mimetype='text/event-stream') except Exception as e: - limiter.semaphore.release() # 异常时释放信号量 + limiter.semaphore.release() current_app.logger.error(f"路由处理异常: {e}") return jsonify({'error': '内部服务器错误'}), 500 + return wrapped return decorator - -class ConnectionLimiter: - def __init__(self, max_connections=1): - self.semaphore = threading.Semaphore(max_connections) diff --git a/flask_app/routes/little_zbparse.py b/flask_app/routes/little_zbparse.py index 8bd8f7e..a0f4a4a 100644 --- a/flask_app/routes/little_zbparse.py +++ b/flask_app/routes/little_zbparse.py @@ -1,9 +1,8 @@ # flask_app/routes/little_zbparse.py -import time -from flask import Blueprint, request, jsonify, Response, stream_with_context, g import json import os +from flask import Blueprint, request, jsonify, Response, g from flask_app.ConnectionLimiter import require_connection_limit from flask_app.main.download import download_file @@ -15,6 +14,7 @@ from flask_app.logger_setup import CSTFormatter from flask_app.routes.utils import validate_request little_zbparse_bp = Blueprint('little_zbparse', __name__) + @little_zbparse_bp.route('/little_zbparse', methods=['POST']) @require_connection_limit(timeout=300) def little_zbparse(): diff --git a/flask_app/start_up.py b/flask_app/start_up.py index 4aa19c4..36cc334 100644 --- a/flask_app/start_up.py +++ b/flask_app/start_up.py @@ -1,7 +1,7 @@ # flask_app/start_up.py import logging -from flask import Flask, request +from flask import Flask, request, g from flask_app.ConnectionLimiter import ConnectionLimiter from flask_app.logger_setup import CSTFormatter, create_logger @@ -51,6 +51,13 @@ def create_app(): app.connection_limiters['upload'] = ConnectionLimiter(max_connections=7) app.connection_limiters['get_deviation'] = ConnectionLimiter(max_connections=7) app.connection_limiters['default'] = ConnectionLimiter(max_connections=10) + + @app.teardown_request + def teardown_request(exception): + limiter = getattr(g, 'limiter', None) + monitor = getattr(g, 'monitor', None) + if limiter and not monitor.is_timeout: + limiter.semaphore.release() return app #TODO:培训要求、总体要求、进度要求、'建设要求'到技术要求中,归类到其他要求中