zbparse/flask_app/ConnectionLimiter.py

115 lines
4.7 KiB
Python
Raw Permalink Normal View History

2024-11-25 16:24:38 +08:00
# flask_app/ConnectionLimiter.py
2024-11-25 10:13:39 +08:00
import threading
2024-12-02 11:10:49 +08:00
import logging
2024-11-25 09:15:56 +08:00
from functools import wraps
2024-12-02 11:10:49 +08:00
from flask import jsonify, request, Response, stream_with_context, current_app, g
# 配置模块级别的日志记录器
logger = logging.getLogger(__name__)
2024-11-25 09:15:56 +08:00
2024-11-25 14:38:58 +08:00
class ExecutionTimeoutMonitor:
"""监控请求执行时间,超时后释放信号量"""
2024-12-02 11:10:49 +08:00
def __init__(self, timeout, semaphore, logger):
2024-11-25 14:38:58 +08:00
self.timeout = timeout
self.semaphore = semaphore
self.is_timeout = False
2024-12-02 11:10:49 +08:00
self.stopped = threading.Event()
self.logger = logger
2024-11-25 14:38:58 +08:00
self.thread = threading.Thread(target=self._monitor)
self.thread.daemon = True
2024-11-25 09:15:56 +08:00
2024-11-25 14:38:58 +08:00
def _monitor(self):
2024-11-25 16:24:38 +08:00
"""等待指定时间后标记为超时并释放信号量"""
2024-12-02 11:10:49 +08:00
if not self.stopped.wait(self.timeout):
self.is_timeout = True
self.semaphore.release() # 超时后释放信号量
self.logger.error(f"请求执行时间超过 {self.timeout} 秒并被终止。")
2024-11-25 09:15:56 +08:00
2024-11-25 14:38:58 +08:00
def start(self):
self.thread.start()
2024-11-25 10:13:39 +08:00
2024-12-02 11:10:49 +08:00
def stop(self):
self.stopped.set()
self.thread.join()
2024-11-25 16:24:38 +08:00
class ConnectionLimiter:
2024-12-02 11:10:49 +08:00
def __init__(self, max_connections=10):
2024-11-25 16:24:38 +08:00
self.semaphore = threading.Semaphore(max_connections)
2024-11-25 14:38:58 +08:00
def require_connection_limit(timeout=900):
2024-11-25 16:24:38 +08:00
"""装饰器:确保路由使用连接限制,并监控请求执行时间"""
2024-11-25 14:38:58 +08:00
def decorator(f):
2024-11-25 09:15:56 +08:00
@wraps(f)
2024-11-25 10:13:39 +08:00
def wrapped(*args, **kwargs):
2024-12-02 11:10:49 +08:00
# 统一获取 logger 对象
logger_to_use = getattr(g, 'logger', logger)
2024-11-25 16:04:53 +08:00
blueprint = request.blueprint
2024-12-02 11:10:49 +08:00
limiter = current_app.connection_limiters.get(
blueprint, current_app.connection_limiters.get('default'))
2024-11-25 14:38:58 +08:00
if limiter is None:
2024-12-02 11:10:49 +08:00
logger_to_use.error(
f"未为蓝图 '{blueprint}' 配置 ConnectionLimiter并且未设置默认限流器。")
2024-11-25 16:04:53 +08:00
return jsonify({'error': '服务器配置错误'}), 500
acquired = limiter.semaphore.acquire(blocking=True)
if not acquired:
2024-12-02 11:10:49 +08:00
logger_to_use.warning("并发连接过多")
2024-11-25 16:04:53 +08:00
return jsonify({'error': '并发连接过多'}), 429
2024-11-25 14:38:58 +08:00
2024-11-25 16:24:38 +08:00
# 启动执行超时监控器
2024-12-02 11:10:49 +08:00
monitor = ExecutionTimeoutMonitor(timeout, limiter.semaphore, logger_to_use)
2024-11-25 16:24:38 +08:00
monitor.start()
2024-11-25 09:15:56 +08:00
try:
2024-11-25 16:24:38 +08:00
result = f(*args, **kwargs)
if isinstance(result, Response):
2024-12-02 11:10:49 +08:00
# 对于 Response 对象,使用 call_on_close 释放信号量
def release_semaphore():
if not monitor.is_timeout:
limiter.semaphore.release()
monitor.stop()
logger_to_use.info("Response Object 正确释放")
result.call_on_close(release_semaphore)
2024-11-25 16:24:38 +08:00
return result
2024-12-02 11:10:49 +08:00
elif hasattr(result, '__iter__') and not isinstance(result, (str, bytes)):
# 处理生成器响应
2024-11-25 16:24:38 +08:00
@stream_with_context
def generator_wrapper():
try:
for item in result:
if monitor.is_timeout:
2024-12-02 11:10:49 +08:00
logger_to_use.error("请求执行时间超过限制并被终止。")
2024-11-25 16:24:38 +08:00
break
yield item
2024-12-02 11:10:49 +08:00
except GeneratorExit:
# 客户端断开连接
logger_to_use.info("客户端断开连接")
2024-11-25 16:24:38 +08:00
finally:
if not monitor.is_timeout:
limiter.semaphore.release()
2024-12-02 11:10:49 +08:00
monitor.stop()
logger_to_use.info("在生成器的 finally 中释放信号量并停止监控器")
2024-11-25 16:24:38 +08:00
return Response(generator_wrapper(), mimetype='text/event-stream')
else:
2024-12-02 11:10:49 +08:00
# 对于其他类型的响应,返回前释放信号量
if not monitor.is_timeout:
limiter.semaphore.release()
monitor.stop()
logger_to_use.info("Semaphore released for non-Response, non-generator result")
2024-11-25 16:24:38 +08:00
return result
2024-11-25 14:38:58 +08:00
except Exception as e:
2024-12-02 11:10:49 +08:00
if not monitor.is_timeout:
limiter.semaphore.release()
monitor.stop()
logger_to_use.error(f"路由处理异常: {e}")
2024-11-25 16:04:53 +08:00
return jsonify({'error': '内部服务器错误'}), 500
2024-11-25 16:24:38 +08:00
2024-11-25 10:13:39 +08:00
return wrapped
2024-11-25 14:38:58 +08:00
return decorator