# flask_app/ConnectionLimiter.py import threading import time from functools import wraps from flask import current_app, jsonify, g, request, Response, stream_with_context class ExecutionTimeoutMonitor: """监控请求执行时间,超时后释放信号量""" def __init__(self, timeout, semaphore): self.timeout = timeout self.semaphore = semaphore self.is_timeout = False self.thread = threading.Thread(target=self._monitor) self.thread.daemon = True def _monitor(self): """等待指定时间后标记为超时并释放信号量""" time.sleep(self.timeout) self.is_timeout = True self.semaphore.release() # 超时后释放信号量 current_app.logger.error(f"请求执行时间超过 {self.timeout} 秒并被终止。") def start(self): self.thread.start() class ConnectionLimiter: def __init__(self, max_connections=1): self.semaphore = threading.Semaphore(max_connections) def require_connection_limit(timeout=900): """装饰器:确保路由使用连接限制,并监控请求执行时间""" def decorator(f): @wraps(f) def wrapped(*args, **kwargs): blueprint = request.blueprint limiter = current_app.connection_limiters.get(blueprint, current_app.connection_limiters.get('default')) if limiter is None: current_app.logger.error(f"未为蓝图 '{blueprint}' 配置 ConnectionLimiter,并且未设置默认限流器。") return jsonify({'error': '服务器配置错误'}), 500 acquired = limiter.semaphore.acquire(blocking=True) if not acquired: current_app.logger.warning("并发连接过多") return jsonify({'error': '并发连接过多'}), 429 # 启动执行超时监控器 monitor = ExecutionTimeoutMonitor(timeout, limiter.semaphore) monitor.start() # 将限流器和监控器存储在 g 中,以便在 teardown_request 中释放信号量 g.limiter = limiter g.monitor = monitor try: result = f(*args, **kwargs) if isinstance(result, Response): # 对于普通的 Response 对象,不需要额外处理 return result elif hasattr(result, '__iter__'): # 如果返回的是生成器,按原有逻辑处理 @stream_with_context def generator_wrapper(): try: for item in result: if monitor.is_timeout: current_app.logger.error("请求执行时间超过限制并被终止。") break yield item finally: if not monitor.is_timeout: limiter.semaphore.release() return Response(generator_wrapper(), mimetype='text/event-stream') else: # 对于其他类型的响应,直接返回 return result except Exception as e: limiter.semaphore.release() current_app.logger.error(f"路由处理异常: {e}") return jsonify({'error': '内部服务器错误'}), 500 return wrapped return decorator