# flask_app/ConnectionLimiter.py import threading import logging from functools import wraps from flask import jsonify, request, Response, stream_with_context, current_app, g # 配置模块级别的日志记录器 logger = logging.getLogger(__name__) class ExecutionTimeoutMonitor: """监控请求执行时间,超时后释放信号量""" def __init__(self, timeout, semaphore, logger): self.timeout = timeout self.semaphore = semaphore self.is_timeout = False self.stopped = threading.Event() self.logger = logger self.thread = threading.Thread(target=self._monitor) self.thread.daemon = True def _monitor(self): """等待指定时间后标记为超时并释放信号量""" if not self.stopped.wait(self.timeout): self.is_timeout = True self.semaphore.release() # 超时后释放信号量 self.logger.error(f"请求执行时间超过 {self.timeout} 秒并被终止。") def start(self): self.thread.start() def stop(self): self.stopped.set() self.thread.join() class ConnectionLimiter: def __init__(self, max_connections=10): self.semaphore = threading.Semaphore(max_connections) #当请求超时时,连接资源(即信号量)会被释放,但是正在执行的程序不会被终止。这意味着虽然新的请求可以继续被接受和处理,但超时的请求仍然会在后台继续运行,直到完成或遇到其他中断条件。 def require_connection_limit(timeout=900): """装饰器:确保路由使用连接限制,并监控请求执行时间""" def decorator(f): @wraps(f) def wrapped(*args, **kwargs): # 统一获取 logger 对象 logger_to_use = getattr(g, 'logger', logger) blueprint = request.blueprint limiter = current_app.connection_limiters.get( blueprint, current_app.connection_limiters.get('default')) if limiter is None: logger_to_use.error( f"未为蓝图 '{blueprint}' 配置 ConnectionLimiter,并且未设置默认限流器。") return jsonify({'error': '服务器配置错误'}), 500 acquired = limiter.semaphore.acquire(blocking=True) if not acquired: logger_to_use.warning("并发连接过多") return jsonify({'error': '并发连接过多'}), 429 # 启动执行超时监控器 monitor = ExecutionTimeoutMonitor(timeout, limiter.semaphore, logger_to_use) monitor.start() try: result = f(*args, **kwargs) if isinstance(result, Response): # 对于 Response 对象,使用 call_on_close 释放信号量 def release_semaphore(): if not monitor.is_timeout: limiter.semaphore.release() monitor.stop() logger_to_use.info("Response Object 正确释放") result.call_on_close(release_semaphore) return result elif hasattr(result, '__iter__') and not isinstance(result, (str, bytes)): # 处理生成器响应 @stream_with_context def generator_wrapper(): try: for item in result: if monitor.is_timeout: logger_to_use.error("请求执行时间超过限制并被终止。") break yield item except GeneratorExit: # 客户端断开连接 logger_to_use.info("客户端断开连接") finally: if not monitor.is_timeout: limiter.semaphore.release() monitor.stop() logger_to_use.info("在生成器的 finally 中释放信号量并停止监控器") return Response(generator_wrapper(), mimetype='text/event-stream') else: # 对于其他类型的响应,返回前释放信号量 if not monitor.is_timeout: limiter.semaphore.release() monitor.stop() logger_to_use.info("Semaphore released for non-Response, non-generator result") return result except Exception as e: if not monitor.is_timeout: limiter.semaphore.release() monitor.stop() logger_to_use.error(f"路由处理异常: {e}") return jsonify({'error': '内部服务器错误'}), 500 return wrapped return decorator