zbparse/flask_app/ConnectionLimiter.py
2025-02-13 22:22:04 +08:00

220 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# flask_app/ConnectionLimiter.py
import threading
import logging
from functools import wraps
from flask import jsonify, request, Response, stream_with_context, current_app, g
# 配置模块级别的日志记录器
logger = logging.getLogger(__name__)
# class ExecutionTimeoutMonitor:
# """监控请求执行时间,超时后释放信号量"""
# def __init__(self, timeout, semaphore, logger):
# self.timeout = timeout
# self.semaphore = semaphore
# self.is_timeout = False
# self.stopped = threading.Event()
# self.logger = logger
# self.thread = threading.Thread(target=self._monitor)
# self.thread.daemon = True
#
# def _monitor(self):
# """等待指定时间后标记为超时并释放信号量"""
# if not self.stopped.wait(self.timeout):
# self.is_timeout = True
# self.semaphore.release() # 超时后释放信号量
# self.logger.error(f"请求执行时间超过 {self.timeout} 秒并被终止。")
#
# def start(self):
# self.thread.start()
#
# def stop(self):
# self.stopped.set()
# self.thread.join()
class ConnectionLimiter:
def __init__(self, max_connections=10):
self.semaphore = threading.Semaphore(max_connections)
class ExecutionTimeoutMonitor:
"""
监控请求执行时间超时后仅做标记is_timeout = True
并记录日志;不再依赖信号量进行释放操作。
"""
def __init__(self, timeout, logger):
self.timeout = timeout
self.logger = logger
self.is_timeout = False
self.stopped = threading.Event()
self.thread = threading.Thread(target=self._monitor)
self.thread.daemon = True
def _monitor(self):
"""
等待指定时间后标记为超时is_timeout = True
仅做日志输出,不真正强杀后台线程。
"""
if not self.stopped.wait(self.timeout):
self.is_timeout = True
self.logger.error(f"请求执行时间超过 {self.timeout} 秒并被标记为终止。")
def start(self):
self.thread.start()
def stop(self):
self.stopped.set()
self.thread.join()
#当请求超时时,连接资源(即信号量)会被释放,但是正在执行的程序不会被终止。这意味着虽然新的请求可以继续被接受和处理,但超时的请求仍然会在后台继续运行,直到完成或遇到其他中断条件。
def require_connection_limit(timeout=900):
"""装饰器:确保路由使用连接限制,并监控请求执行时间"""
def decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
# 统一获取 logger 对象
logger_to_use = getattr(g, 'logger', logger)
blueprint = request.blueprint
limiter = current_app.connection_limiters.get(
blueprint, current_app.connection_limiters.get('default'))
if limiter is None:
logger_to_use.error(
f"未为蓝图 '{blueprint}' 配置 ConnectionLimiter并且未设置默认限流器。")
return jsonify({'error': '服务器配置错误'}), 500
acquired = limiter.semaphore.acquire(blocking=True)
if not acquired:
logger_to_use.warning("并发连接过多")
return jsonify({'error': '并发连接过多'}), 429
# 启动执行超时监控器
monitor = ExecutionTimeoutMonitor(timeout, limiter.semaphore, logger_to_use)
monitor.start()
try:
result = f(*args, **kwargs)
if isinstance(result, Response):
# 对于 Response 对象,使用 call_on_close 释放信号量
def release_semaphore():
if not monitor.is_timeout:
limiter.semaphore.release()
monitor.stop()
logger_to_use.info("Response Object 正确释放")
result.call_on_close(release_semaphore)
return result
elif hasattr(result, '__iter__') and not isinstance(result, (str, bytes)):
# 处理生成器响应
@stream_with_context
def generator_wrapper():
try:
for item in result:
if monitor.is_timeout:
logger_to_use.error("请求执行时间超过限制并被终止。")
break
yield item
except GeneratorExit:
# 客户端断开连接
logger_to_use.info("客户端断开连接")
finally:
if not monitor.is_timeout:
limiter.semaphore.release()
monitor.stop()
logger_to_use.info("在生成器的 finally 中释放信号量并停止监控器")
return Response(generator_wrapper(), mimetype='text/event-stream')
else:
# 对于其他类型的响应,返回前释放信号量
if not monitor.is_timeout:
limiter.semaphore.release()
monitor.stop()
logger_to_use.info("Semaphore released for non-Response, non-generator result")
return result
except Exception as e:
if not monitor.is_timeout:
limiter.semaphore.release()
monitor.stop()
logger_to_use.error(f"路由处理异常: {e}")
return jsonify({'error': '内部服务器错误'}), 500
return wrapped
return decorator
def require_execution_timeout(timeout=1800):
"""装饰器:仅监控请求执行时间,超过即标记超时,并在可能的地方中断输出。"""
def decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
# 获取或使用默认 logger
logger_to_use = getattr(g, 'logger', None)
if logger_to_use is None:
# 如果你有全局 logger就改成自己的全局 logger
import logging
logger_to_use = logging.getLogger("default")
# 启动执行超时监控器
monitor = ExecutionTimeoutMonitor(timeout, logger_to_use)
monitor.start()
try:
result = f(*args, **kwargs)
# 如果是 Flask 的 Response 对象
if isinstance(result, Response):
# 在关闭响应时检查是否超时,并停止监控
def on_close():
if not monitor.is_timeout:
monitor.stop()
logger_to_use.info("Response 正常结束,停止监控器。")
else:
logger_to_use.warning("Response 结束时监控器已超时标记。")
result.call_on_close(on_close)
return result
# 如果返回的是一个生成器(流式返回)
elif hasattr(result, '__iter__') and not isinstance(result, (str, bytes)):
@stream_with_context
def generator_wrapper():
try:
for item in result:
# 如果已超时,则中断输出
if monitor.is_timeout:
logger_to_use.error("请求执行已超时,中断输出。")
break
yield item
except GeneratorExit:
logger_to_use.info("客户端断开连接。")
finally:
if not monitor.is_timeout:
monitor.stop()
logger_to_use.info("在生成器 finally 中停止监控器。")
else:
logger_to_use.warning("生成器 finally 时已超时标记。")
return Response(generator_wrapper(), mimetype='text/event-stream')
# 如果返回的是普通数据str / dict / jsonify等
else:
# 在函数返回前判断一下是否超时
if not monitor.is_timeout:
monitor.stop()
logger_to_use.info("普通返回,停止监控器。")
else:
logger_to_use.warning("普通返回时已超时标记。")
return result
except Exception as e:
if not monitor.is_timeout:
monitor.stop()
logger_to_use.error(f"路由处理异常: {e}", exc_info=True)
return jsonify({'error': '内部服务器错误'}), 500
return wrapped
return decorator