75 lines
3.1 KiB
Python
75 lines
3.1 KiB
Python
import threading
|
||
import time
|
||
from functools import wraps
|
||
from flask import current_app, jsonify, stream_with_context, Response, request
|
||
|
||
|
||
class ExecutionTimeoutMonitor:
|
||
"""监控请求执行时间,超时后释放信号量"""
|
||
def __init__(self, timeout, semaphore):
|
||
self.timeout = timeout
|
||
self.semaphore = semaphore
|
||
self.is_timeout = False
|
||
self.thread = threading.Thread(target=self._monitor)
|
||
self.thread.daemon = True
|
||
|
||
def _monitor(self):
|
||
"""等待指定时间后标记为超时并释放信号量
|
||
目前的超时机制是通过 ExecutionTimeoutMonitor 的标志位来通知生成器超时。
|
||
但是,生成器只有在下一次 yield 时才会检查标志位,因此不会在当前执行的代码段中间强行中断。
|
||
"""
|
||
time.sleep(self.timeout)
|
||
self.is_timeout = True
|
||
self.semaphore.release() # 超时后释放信号量
|
||
current_app.logger.error(f"Request execution exceeded {self.timeout} seconds and was terminated.")
|
||
|
||
def start(self):
|
||
self.thread.start()
|
||
|
||
def require_connection_limit(timeout=900):
|
||
def decorator(f):
|
||
@wraps(f)
|
||
def wrapped(*args, **kwargs):
|
||
blueprint = request.blueprint
|
||
limiter = current_app.connection_limiters.get(blueprint, current_app.connection_limiters.get('default'))
|
||
if limiter is None:
|
||
current_app.logger.error(f"未为蓝图 '{blueprint}' 配置 ConnectionLimiter,并且未设置默认限流器。")
|
||
return jsonify({'error': '服务器配置错误'}), 500
|
||
|
||
# 获取信号量(阻塞)
|
||
acquired = limiter.semaphore.acquire(blocking=True)
|
||
if not acquired:
|
||
current_app.logger.warning("并发连接过多")
|
||
return jsonify({'error': '并发连接过多'}), 429
|
||
|
||
try:
|
||
# 启动执行超时监控器
|
||
monitor = ExecutionTimeoutMonitor(timeout, limiter.semaphore)
|
||
monitor.start()
|
||
|
||
generator = f(*args, **kwargs)
|
||
|
||
@stream_with_context
|
||
def generator_wrapper():
|
||
try:
|
||
for item in generator:
|
||
if monitor.is_timeout:
|
||
current_app.logger.error("请求执行时间超过限制并被终止。")
|
||
break
|
||
yield item
|
||
finally:
|
||
if not monitor.is_timeout:
|
||
limiter.semaphore.release() # 正常结束时释放信号量
|
||
|
||
return Response(generator_wrapper(), mimetype='text/event-stream')
|
||
except Exception as e:
|
||
limiter.semaphore.release() # 异常时释放信号量
|
||
current_app.logger.error(f"路由处理异常: {e}")
|
||
return jsonify({'error': '内部服务器错误'}), 500
|
||
return wrapped
|
||
return decorator
|
||
|
||
class ConnectionLimiter:
|
||
def __init__(self, max_connections=1):
|
||
self.semaphore = threading.Semaphore(max_connections)
|