zbparse/flask_app/ConnectionLimiter.py

116 lines
4.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# flask_app/ConnectionLimiter.py
import threading
import logging
from functools import wraps
from flask import jsonify, request, Response, stream_with_context, current_app, g
# 配置模块级别的日志记录器
logger = logging.getLogger(__name__)
class ExecutionTimeoutMonitor:
"""监控请求执行时间,超时后释放信号量"""
def __init__(self, timeout, semaphore, logger):
self.timeout = timeout
self.semaphore = semaphore
self.is_timeout = False
self.stopped = threading.Event()
self.logger = logger
self.thread = threading.Thread(target=self._monitor)
self.thread.daemon = True
def _monitor(self):
"""等待指定时间后标记为超时并释放信号量"""
if not self.stopped.wait(self.timeout):
self.is_timeout = True
self.semaphore.release() # 超时后释放信号量
self.logger.error(f"请求执行时间超过 {self.timeout} 秒并被终止。")
def start(self):
self.thread.start()
def stop(self):
self.stopped.set()
self.thread.join()
class ConnectionLimiter:
def __init__(self, max_connections=10):
self.semaphore = threading.Semaphore(max_connections)
#当请求超时时,连接资源(即信号量)会被释放,但是正在执行的程序不会被终止。这意味着虽然新的请求可以继续被接受和处理,但超时的请求仍然会在后台继续运行,直到完成或遇到其他中断条件。
def require_connection_limit(timeout=900):
"""装饰器:确保路由使用连接限制,并监控请求执行时间"""
def decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
# 统一获取 logger 对象
logger_to_use = getattr(g, 'logger', logger)
blueprint = request.blueprint
limiter = current_app.connection_limiters.get(
blueprint, current_app.connection_limiters.get('default'))
if limiter is None:
logger_to_use.error(
f"未为蓝图 '{blueprint}' 配置 ConnectionLimiter并且未设置默认限流器。")
return jsonify({'error': '服务器配置错误'}), 500
acquired = limiter.semaphore.acquire(blocking=True)
if not acquired:
logger_to_use.warning("并发连接过多")
return jsonify({'error': '并发连接过多'}), 429
# 启动执行超时监控器
monitor = ExecutionTimeoutMonitor(timeout, limiter.semaphore, logger_to_use)
monitor.start()
try:
result = f(*args, **kwargs)
if isinstance(result, Response):
# 对于 Response 对象,使用 call_on_close 释放信号量
def release_semaphore():
if not monitor.is_timeout:
limiter.semaphore.release()
monitor.stop()
logger_to_use.info("Response Object 正确释放")
result.call_on_close(release_semaphore)
return result
elif hasattr(result, '__iter__') and not isinstance(result, (str, bytes)):
# 处理生成器响应
@stream_with_context
def generator_wrapper():
try:
for item in result:
if monitor.is_timeout:
logger_to_use.error("请求执行时间超过限制并被终止。")
break
yield item
except GeneratorExit:
# 客户端断开连接
logger_to_use.info("客户端断开连接")
finally:
if not monitor.is_timeout:
limiter.semaphore.release()
monitor.stop()
logger_to_use.info("在生成器的 finally 中释放信号量并停止监控器")
return Response(generator_wrapper(), mimetype='text/event-stream')
else:
# 对于其他类型的响应,返回前释放信号量
if not monitor.is_timeout:
limiter.semaphore.release()
monitor.stop()
logger_to_use.info("Semaphore released for non-Response, non-generator result")
return result
except Exception as e:
if not monitor.is_timeout:
limiter.semaphore.release()
monitor.stop()
logger_to_use.error(f"路由处理异常: {e}")
return jsonify({'error': '内部服务器错误'}), 500
return wrapped
return decorator