# flask_app/ConnectionLimiter.py import threading import logging from functools import wraps from flask import jsonify, request, Response, stream_with_context, current_app, g # 配置模块级别的日志记录器 logger = logging.getLogger(__name__) # class ExecutionTimeoutMonitor: # """监控请求执行时间,超时后释放信号量""" # def __init__(self, timeout, semaphore, logger): # self.timeout = timeout # self.semaphore = semaphore # self.is_timeout = False # self.stopped = threading.Event() # self.logger = logger # self.thread = threading.Thread(target=self._monitor) # self.thread.daemon = True # # def _monitor(self): # """等待指定时间后标记为超时并释放信号量""" # if not self.stopped.wait(self.timeout): # self.is_timeout = True # self.semaphore.release() # 超时后释放信号量 # self.logger.error(f"请求执行时间超过 {self.timeout} 秒并被终止。") # # def start(self): # self.thread.start() # # def stop(self): # self.stopped.set() # self.thread.join() class ConnectionLimiter: def __init__(self, max_connections=10): self.semaphore = threading.Semaphore(max_connections) class ExecutionTimeoutMonitor: """ 监控请求执行时间,超时后仅做标记(is_timeout = True), 并记录日志;不再依赖信号量进行释放操作。 """ def __init__(self, timeout, logger): self.timeout = timeout self.logger = logger self.is_timeout = False self.stopped = threading.Event() self.thread = threading.Thread(target=self._monitor) self.thread.daemon = True def _monitor(self): """ 等待指定时间后标记为超时(is_timeout = True)。 仅做日志输出,不真正强杀后台线程。 """ if not self.stopped.wait(self.timeout): self.is_timeout = True self.logger.error(f"请求执行时间超过 {self.timeout} 秒并被标记为终止。") def start(self): self.thread.start() def stop(self): self.stopped.set() self.thread.join() #当请求超时时,连接资源(即信号量)会被释放,但是正在执行的程序不会被终止。这意味着虽然新的请求可以继续被接受和处理,但超时的请求仍然会在后台继续运行,直到完成或遇到其他中断条件。 def require_connection_limit(timeout=900): """装饰器:确保路由使用连接限制,并监控请求执行时间""" def decorator(f): @wraps(f) def wrapped(*args, **kwargs): # 统一获取 logger 对象 logger_to_use = getattr(g, 'logger', logger) blueprint = request.blueprint limiter = current_app.connection_limiters.get( blueprint, current_app.connection_limiters.get('default')) if limiter is None: logger_to_use.error( f"未为蓝图 '{blueprint}' 配置 ConnectionLimiter,并且未设置默认限流器。") return jsonify({'error': '服务器配置错误'}), 500 acquired = limiter.semaphore.acquire(blocking=True) if not acquired: logger_to_use.warning("并发连接过多") return jsonify({'error': '并发连接过多'}), 429 # 启动执行超时监控器 monitor = ExecutionTimeoutMonitor(timeout, limiter.semaphore, logger_to_use) monitor.start() try: result = f(*args, **kwargs) if isinstance(result, Response): # 对于 Response 对象,使用 call_on_close 释放信号量 def release_semaphore(): if not monitor.is_timeout: limiter.semaphore.release() monitor.stop() logger_to_use.info("Response Object 正确释放") result.call_on_close(release_semaphore) return result elif hasattr(result, '__iter__') and not isinstance(result, (str, bytes)): # 处理生成器响应 @stream_with_context def generator_wrapper(): try: for item in result: if monitor.is_timeout: logger_to_use.error("请求执行时间超过限制并被终止。") break yield item except GeneratorExit: # 客户端断开连接 logger_to_use.info("客户端断开连接") finally: if not monitor.is_timeout: limiter.semaphore.release() monitor.stop() logger_to_use.info("在生成器的 finally 中释放信号量并停止监控器") return Response(generator_wrapper(), mimetype='text/event-stream') else: # 对于其他类型的响应,返回前释放信号量 if not monitor.is_timeout: limiter.semaphore.release() monitor.stop() logger_to_use.info("Semaphore released for non-Response, non-generator result") return result except Exception as e: if not monitor.is_timeout: limiter.semaphore.release() monitor.stop() logger_to_use.error(f"路由处理异常: {e}") return jsonify({'error': '内部服务器错误'}), 500 return wrapped return decorator def require_execution_timeout(timeout=1800): """装饰器:仅监控请求执行时间,超过即标记超时,并在可能的地方中断输出。""" def decorator(f): @wraps(f) def wrapped(*args, **kwargs): # 获取或使用默认 logger logger_to_use = getattr(g, 'logger', None) if logger_to_use is None: # 如果你有全局 logger,就改成自己的全局 logger import logging logger_to_use = logging.getLogger("default") # 启动执行超时监控器 monitor = ExecutionTimeoutMonitor(timeout, logger_to_use) monitor.start() try: result = f(*args, **kwargs) # 如果是 Flask 的 Response 对象 if isinstance(result, Response): # 在关闭响应时检查是否超时,并停止监控 def on_close(): if not monitor.is_timeout: monitor.stop() logger_to_use.info("Response 正常结束,停止监控器。") else: logger_to_use.warning("Response 结束时监控器已超时标记。") result.call_on_close(on_close) return result # 如果返回的是一个生成器(流式返回) elif hasattr(result, '__iter__') and not isinstance(result, (str, bytes)): @stream_with_context def generator_wrapper(): try: for item in result: # 如果已超时,则中断输出 if monitor.is_timeout: logger_to_use.error("请求执行已超时,中断输出。") break yield item except GeneratorExit: logger_to_use.info("客户端断开连接。") finally: if not monitor.is_timeout: monitor.stop() logger_to_use.info("在生成器 finally 中停止监控器。") else: logger_to_use.warning("生成器 finally 时已超时标记。") return Response(generator_wrapper(), mimetype='text/event-stream') # 如果返回的是普通数据(str / dict / jsonify等) else: # 在函数返回前判断一下是否超时 if not monitor.is_timeout: monitor.stop() logger_to_use.info("普通返回,停止监控器。") else: logger_to_use.warning("普通返回时已超时标记。") return result except Exception as e: if not monitor.is_timeout: monitor.stop() logger_to_use.error(f"路由处理异常: {e}", exc_info=True) return jsonify({'error': '内部服务器错误'}), 500 return wrapped return decorator