zbparse/flask_app/ConnectionLimiter.py

71 lines
2.9 KiB
Python
Raw Normal View History

2024-11-25 10:13:39 +08:00
import threading
2024-11-25 14:38:58 +08:00
import time
2024-11-25 09:15:56 +08:00
from functools import wraps
2024-11-25 14:38:58 +08:00
from flask import current_app, jsonify, stream_with_context, Response
2024-11-25 09:15:56 +08:00
2024-11-25 14:38:58 +08:00
class ExecutionTimeoutMonitor:
"""监控请求执行时间,超时后释放信号量"""
def __init__(self, timeout, semaphore):
self.timeout = timeout
self.semaphore = semaphore
self.is_timeout = False
self.thread = threading.Thread(target=self._monitor)
self.thread.daemon = True
2024-11-25 09:15:56 +08:00
2024-11-25 14:38:58 +08:00
def _monitor(self):
"""等待指定时间后标记为超时并释放信号量
目前的超时机制是通过 ExecutionTimeoutMonitor 的标志位来通知生成器超时
但是生成器只有在下一次 yield 时才会检查标志位因此不会在当前执行的代码段中间强行中断
"""
time.sleep(self.timeout)
self.is_timeout = True
self.semaphore.release() # 超时后释放信号量
current_app.logger.error(f"Request execution exceeded {self.timeout} seconds and was terminated.")
2024-11-25 09:15:56 +08:00
2024-11-25 14:38:58 +08:00
def start(self):
self.thread.start()
2024-11-25 10:13:39 +08:00
2024-11-25 14:38:58 +08:00
def require_connection_limit(timeout=900):
"""装饰器:确保路由使用连接限制,并监控请求执行时间"""
def decorator(f):
2024-11-25 09:15:56 +08:00
@wraps(f)
2024-11-25 10:13:39 +08:00
def wrapped(*args, **kwargs):
2024-11-25 14:38:58 +08:00
limiter = getattr(current_app, 'connection_limiter', None)
if limiter is None:
current_app.logger.error("ConnectionLimiter 未初始化")
return jsonify({'error': 'Server configuration error'}), 500
# 阻塞方式获取信号量
limiter.semaphore.acquire()
2024-11-25 09:15:56 +08:00
try:
2024-11-25 14:38:58 +08:00
# 启动执行超时监控器
monitor = ExecutionTimeoutMonitor(timeout, limiter.semaphore)
monitor.start()
generator = f(*args, **kwargs)
2024-11-25 09:15:56 +08:00
2024-11-25 14:38:58 +08:00
@stream_with_context
def generator_wrapper():
try:
for item in generator:
if monitor.is_timeout:
# 如果已超时,终止生成器执行
current_app.logger.error("Request exceeded execution time and was terminated.")
break
yield item
finally:
if not monitor.is_timeout:
limiter.semaphore.release() # 正常结束时释放信号量
return Response(generator_wrapper(), mimetype='text/event-stream')
except Exception as e:
limiter.semaphore.release() # 异常时释放信号量
current_app.logger.error(f"Exception in route: {e}")
return jsonify({'error': 'Internal server error'}), 500
2024-11-25 10:13:39 +08:00
return wrapped
2024-11-25 14:38:58 +08:00
return decorator
class ConnectionLimiter:
def __init__(self, max_connections=1):
self.semaphore = threading.Semaphore(max_connections)