2024-11-25 16:24:38 +08:00
|
|
|
|
# flask_app/ConnectionLimiter.py
|
|
|
|
|
|
2024-11-25 10:13:39 +08:00
|
|
|
|
import threading
|
2024-12-02 11:10:49 +08:00
|
|
|
|
import logging
|
2024-11-25 09:15:56 +08:00
|
|
|
|
from functools import wraps
|
2024-12-02 11:10:49 +08:00
|
|
|
|
from flask import jsonify, request, Response, stream_with_context, current_app, g
|
|
|
|
|
|
|
|
|
|
# 配置模块级别的日志记录器
|
|
|
|
|
logger = logging.getLogger(__name__)
|
2024-11-25 09:15:56 +08:00
|
|
|
|
|
2025-02-13 22:22:04 +08:00
|
|
|
|
# class ExecutionTimeoutMonitor:
|
|
|
|
|
# """监控请求执行时间,超时后释放信号量"""
|
|
|
|
|
# def __init__(self, timeout, semaphore, logger):
|
|
|
|
|
# self.timeout = timeout
|
|
|
|
|
# self.semaphore = semaphore
|
|
|
|
|
# self.is_timeout = False
|
|
|
|
|
# self.stopped = threading.Event()
|
|
|
|
|
# self.logger = logger
|
|
|
|
|
# self.thread = threading.Thread(target=self._monitor)
|
|
|
|
|
# self.thread.daemon = True
|
|
|
|
|
#
|
|
|
|
|
# def _monitor(self):
|
|
|
|
|
# """等待指定时间后标记为超时并释放信号量"""
|
|
|
|
|
# if not self.stopped.wait(self.timeout):
|
|
|
|
|
# self.is_timeout = True
|
|
|
|
|
# self.semaphore.release() # 超时后释放信号量
|
|
|
|
|
# self.logger.error(f"请求执行时间超过 {self.timeout} 秒并被终止。")
|
|
|
|
|
#
|
|
|
|
|
# def start(self):
|
|
|
|
|
# self.thread.start()
|
|
|
|
|
#
|
|
|
|
|
# def stop(self):
|
|
|
|
|
# self.stopped.set()
|
|
|
|
|
# self.thread.join()
|
|
|
|
|
|
|
|
|
|
class ConnectionLimiter:
|
|
|
|
|
def __init__(self, max_connections=10):
|
|
|
|
|
self.semaphore = threading.Semaphore(max_connections)
|
|
|
|
|
|
2024-11-25 14:38:58 +08:00
|
|
|
|
class ExecutionTimeoutMonitor:
|
2025-02-13 22:22:04 +08:00
|
|
|
|
"""
|
|
|
|
|
监控请求执行时间,超时后仅做标记(is_timeout = True),
|
|
|
|
|
并记录日志;不再依赖信号量进行释放操作。
|
|
|
|
|
"""
|
|
|
|
|
def __init__(self, timeout, logger):
|
2024-11-25 14:38:58 +08:00
|
|
|
|
self.timeout = timeout
|
2025-02-13 22:22:04 +08:00
|
|
|
|
self.logger = logger
|
2024-11-25 14:38:58 +08:00
|
|
|
|
self.is_timeout = False
|
2024-12-02 11:10:49 +08:00
|
|
|
|
self.stopped = threading.Event()
|
2024-11-25 14:38:58 +08:00
|
|
|
|
self.thread = threading.Thread(target=self._monitor)
|
|
|
|
|
self.thread.daemon = True
|
2024-11-25 09:15:56 +08:00
|
|
|
|
|
2024-11-25 14:38:58 +08:00
|
|
|
|
def _monitor(self):
|
2025-02-13 22:22:04 +08:00
|
|
|
|
"""
|
|
|
|
|
等待指定时间后标记为超时(is_timeout = True)。
|
|
|
|
|
仅做日志输出,不真正强杀后台线程。
|
|
|
|
|
"""
|
2024-12-02 11:10:49 +08:00
|
|
|
|
if not self.stopped.wait(self.timeout):
|
|
|
|
|
self.is_timeout = True
|
2025-02-13 22:22:04 +08:00
|
|
|
|
self.logger.error(f"请求执行时间超过 {self.timeout} 秒并被标记为终止。")
|
2024-11-25 09:15:56 +08:00
|
|
|
|
|
2024-11-25 14:38:58 +08:00
|
|
|
|
def start(self):
|
|
|
|
|
self.thread.start()
|
2024-11-25 10:13:39 +08:00
|
|
|
|
|
2024-12-02 11:10:49 +08:00
|
|
|
|
def stop(self):
|
|
|
|
|
self.stopped.set()
|
|
|
|
|
self.thread.join()
|
|
|
|
|
|
2024-12-12 16:27:37 +08:00
|
|
|
|
#当请求超时时,连接资源(即信号量)会被释放,但是正在执行的程序不会被终止。这意味着虽然新的请求可以继续被接受和处理,但超时的请求仍然会在后台继续运行,直到完成或遇到其他中断条件。
|
2024-11-25 14:38:58 +08:00
|
|
|
|
def require_connection_limit(timeout=900):
|
2024-11-25 16:24:38 +08:00
|
|
|
|
"""装饰器:确保路由使用连接限制,并监控请求执行时间"""
|
2024-11-25 14:38:58 +08:00
|
|
|
|
def decorator(f):
|
2024-11-25 09:15:56 +08:00
|
|
|
|
@wraps(f)
|
2024-11-25 10:13:39 +08:00
|
|
|
|
def wrapped(*args, **kwargs):
|
2024-12-02 11:10:49 +08:00
|
|
|
|
# 统一获取 logger 对象
|
|
|
|
|
logger_to_use = getattr(g, 'logger', logger)
|
|
|
|
|
|
2024-11-25 16:04:53 +08:00
|
|
|
|
blueprint = request.blueprint
|
2024-12-02 11:10:49 +08:00
|
|
|
|
limiter = current_app.connection_limiters.get(
|
|
|
|
|
blueprint, current_app.connection_limiters.get('default'))
|
2024-11-25 14:38:58 +08:00
|
|
|
|
if limiter is None:
|
2024-12-02 11:10:49 +08:00
|
|
|
|
logger_to_use.error(
|
|
|
|
|
f"未为蓝图 '{blueprint}' 配置 ConnectionLimiter,并且未设置默认限流器。")
|
2024-11-25 16:04:53 +08:00
|
|
|
|
return jsonify({'error': '服务器配置错误'}), 500
|
|
|
|
|
|
|
|
|
|
acquired = limiter.semaphore.acquire(blocking=True)
|
|
|
|
|
if not acquired:
|
2024-12-02 11:10:49 +08:00
|
|
|
|
logger_to_use.warning("并发连接过多")
|
2024-11-25 16:04:53 +08:00
|
|
|
|
return jsonify({'error': '并发连接过多'}), 429
|
2024-11-25 14:38:58 +08:00
|
|
|
|
|
2024-11-25 16:24:38 +08:00
|
|
|
|
# 启动执行超时监控器
|
2024-12-02 11:10:49 +08:00
|
|
|
|
monitor = ExecutionTimeoutMonitor(timeout, limiter.semaphore, logger_to_use)
|
2024-11-25 16:24:38 +08:00
|
|
|
|
monitor.start()
|
|
|
|
|
|
2024-11-25 09:15:56 +08:00
|
|
|
|
try:
|
2024-11-25 16:24:38 +08:00
|
|
|
|
result = f(*args, **kwargs)
|
|
|
|
|
|
|
|
|
|
if isinstance(result, Response):
|
2024-12-02 11:10:49 +08:00
|
|
|
|
# 对于 Response 对象,使用 call_on_close 释放信号量
|
|
|
|
|
def release_semaphore():
|
|
|
|
|
if not monitor.is_timeout:
|
|
|
|
|
limiter.semaphore.release()
|
|
|
|
|
monitor.stop()
|
|
|
|
|
logger_to_use.info("Response Object 正确释放")
|
|
|
|
|
|
|
|
|
|
result.call_on_close(release_semaphore)
|
2024-11-25 16:24:38 +08:00
|
|
|
|
return result
|
|
|
|
|
|
2024-12-02 11:10:49 +08:00
|
|
|
|
elif hasattr(result, '__iter__') and not isinstance(result, (str, bytes)):
|
|
|
|
|
# 处理生成器响应
|
2024-11-25 16:24:38 +08:00
|
|
|
|
@stream_with_context
|
|
|
|
|
def generator_wrapper():
|
|
|
|
|
try:
|
|
|
|
|
for item in result:
|
|
|
|
|
if monitor.is_timeout:
|
2024-12-02 11:10:49 +08:00
|
|
|
|
logger_to_use.error("请求执行时间超过限制并被终止。")
|
2024-11-25 16:24:38 +08:00
|
|
|
|
break
|
|
|
|
|
yield item
|
2024-12-02 11:10:49 +08:00
|
|
|
|
except GeneratorExit:
|
|
|
|
|
# 客户端断开连接
|
|
|
|
|
logger_to_use.info("客户端断开连接")
|
2024-11-25 16:24:38 +08:00
|
|
|
|
finally:
|
|
|
|
|
if not monitor.is_timeout:
|
|
|
|
|
limiter.semaphore.release()
|
2024-12-02 11:10:49 +08:00
|
|
|
|
monitor.stop()
|
|
|
|
|
logger_to_use.info("在生成器的 finally 中释放信号量并停止监控器")
|
2024-11-25 16:24:38 +08:00
|
|
|
|
return Response(generator_wrapper(), mimetype='text/event-stream')
|
|
|
|
|
else:
|
2024-12-02 11:10:49 +08:00
|
|
|
|
# 对于其他类型的响应,返回前释放信号量
|
|
|
|
|
if not monitor.is_timeout:
|
|
|
|
|
limiter.semaphore.release()
|
|
|
|
|
monitor.stop()
|
|
|
|
|
logger_to_use.info("Semaphore released for non-Response, non-generator result")
|
2024-11-25 16:24:38 +08:00
|
|
|
|
return result
|
|
|
|
|
|
2024-11-25 14:38:58 +08:00
|
|
|
|
except Exception as e:
|
2024-12-02 11:10:49 +08:00
|
|
|
|
if not monitor.is_timeout:
|
|
|
|
|
limiter.semaphore.release()
|
|
|
|
|
monitor.stop()
|
|
|
|
|
logger_to_use.error(f"路由处理异常: {e}")
|
2024-11-25 16:04:53 +08:00
|
|
|
|
return jsonify({'error': '内部服务器错误'}), 500
|
2024-11-25 16:24:38 +08:00
|
|
|
|
|
2024-11-25 10:13:39 +08:00
|
|
|
|
return wrapped
|
2024-11-25 14:38:58 +08:00
|
|
|
|
return decorator
|
2025-02-13 22:22:04 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def require_execution_timeout(timeout=1800):
|
|
|
|
|
"""装饰器:仅监控请求执行时间,超过即标记超时,并在可能的地方中断输出。"""
|
|
|
|
|
def decorator(f):
|
|
|
|
|
@wraps(f)
|
|
|
|
|
def wrapped(*args, **kwargs):
|
|
|
|
|
# 获取或使用默认 logger
|
|
|
|
|
logger_to_use = getattr(g, 'logger', None)
|
|
|
|
|
if logger_to_use is None:
|
|
|
|
|
# 如果你有全局 logger,就改成自己的全局 logger
|
|
|
|
|
import logging
|
|
|
|
|
logger_to_use = logging.getLogger("default")
|
|
|
|
|
|
|
|
|
|
# 启动执行超时监控器
|
|
|
|
|
monitor = ExecutionTimeoutMonitor(timeout, logger_to_use)
|
|
|
|
|
monitor.start()
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
result = f(*args, **kwargs)
|
|
|
|
|
|
|
|
|
|
# 如果是 Flask 的 Response 对象
|
|
|
|
|
if isinstance(result, Response):
|
|
|
|
|
# 在关闭响应时检查是否超时,并停止监控
|
|
|
|
|
def on_close():
|
|
|
|
|
if not monitor.is_timeout:
|
|
|
|
|
monitor.stop()
|
|
|
|
|
logger_to_use.info("Response 正常结束,停止监控器。")
|
|
|
|
|
else:
|
|
|
|
|
logger_to_use.warning("Response 结束时监控器已超时标记。")
|
|
|
|
|
|
|
|
|
|
result.call_on_close(on_close)
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
# 如果返回的是一个生成器(流式返回)
|
|
|
|
|
elif hasattr(result, '__iter__') and not isinstance(result, (str, bytes)):
|
|
|
|
|
@stream_with_context
|
|
|
|
|
def generator_wrapper():
|
|
|
|
|
try:
|
|
|
|
|
for item in result:
|
|
|
|
|
# 如果已超时,则中断输出
|
|
|
|
|
if monitor.is_timeout:
|
|
|
|
|
logger_to_use.error("请求执行已超时,中断输出。")
|
|
|
|
|
break
|
|
|
|
|
yield item
|
|
|
|
|
except GeneratorExit:
|
|
|
|
|
logger_to_use.info("客户端断开连接。")
|
|
|
|
|
finally:
|
|
|
|
|
if not monitor.is_timeout:
|
|
|
|
|
monitor.stop()
|
|
|
|
|
logger_to_use.info("在生成器 finally 中停止监控器。")
|
|
|
|
|
else:
|
|
|
|
|
logger_to_use.warning("生成器 finally 时已超时标记。")
|
|
|
|
|
|
|
|
|
|
return Response(generator_wrapper(), mimetype='text/event-stream')
|
|
|
|
|
|
|
|
|
|
# 如果返回的是普通数据(str / dict / jsonify等)
|
|
|
|
|
else:
|
|
|
|
|
# 在函数返回前判断一下是否超时
|
|
|
|
|
if not monitor.is_timeout:
|
|
|
|
|
monitor.stop()
|
|
|
|
|
logger_to_use.info("普通返回,停止监控器。")
|
|
|
|
|
else:
|
|
|
|
|
logger_to_use.warning("普通返回时已超时标记。")
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
if not monitor.is_timeout:
|
|
|
|
|
monitor.stop()
|
|
|
|
|
logger_to_use.error(f"路由处理异常: {e}", exc_info=True)
|
|
|
|
|
return jsonify({'error': '内部服务器错误'}), 500
|
|
|
|
|
|
|
|
|
|
return wrapped
|
|
|
|
|
return decorator
|
|
|
|
|
|