zbparse/flask_app/ConnectionLimiter.py

89 lines
3.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# flask_app/ConnectionLimiter.py
import threading
import time
from functools import wraps
from flask import current_app, jsonify, g, request, Response, stream_with_context
class ExecutionTimeoutMonitor:
"""监控请求执行时间,超时后释放信号量"""
def __init__(self, timeout, semaphore):
self.timeout = timeout
self.semaphore = semaphore
self.is_timeout = False
self.thread = threading.Thread(target=self._monitor)
self.thread.daemon = True
def _monitor(self):
"""等待指定时间后标记为超时并释放信号量"""
time.sleep(self.timeout)
self.is_timeout = True
self.semaphore.release() # 超时后释放信号量
current_app.logger.error(f"请求执行时间超过 {self.timeout} 秒并被终止。")
def start(self):
self.thread.start()
class ConnectionLimiter:
def __init__(self, max_connections=1):
self.semaphore = threading.Semaphore(max_connections)
def require_connection_limit(timeout=900):
"""装饰器:确保路由使用连接限制,并监控请求执行时间"""
def decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
blueprint = request.blueprint
limiter = current_app.connection_limiters.get(blueprint, current_app.connection_limiters.get('default'))
if limiter is None:
current_app.logger.error(f"未为蓝图 '{blueprint}' 配置 ConnectionLimiter并且未设置默认限流器。")
return jsonify({'error': '服务器配置错误'}), 500
acquired = limiter.semaphore.acquire(blocking=True)
if not acquired:
current_app.logger.warning("并发连接过多")
return jsonify({'error': '并发连接过多'}), 429
# 启动执行超时监控器
monitor = ExecutionTimeoutMonitor(timeout, limiter.semaphore)
monitor.start()
# 将限流器和监控器存储在 g 中,以便在 teardown_request 中释放信号量
g.limiter = limiter
g.monitor = monitor
try:
result = f(*args, **kwargs)
if isinstance(result, Response):
# 对于普通的 Response 对象,不需要额外处理
return result
elif hasattr(result, '__iter__'):
# 如果返回的是生成器,按原有逻辑处理
@stream_with_context
def generator_wrapper():
try:
for item in result:
if monitor.is_timeout:
current_app.logger.error("请求执行时间超过限制并被终止。")
break
yield item
finally:
if not monitor.is_timeout:
limiter.semaphore.release()
return Response(generator_wrapper(), mimetype='text/event-stream')
else:
# 对于其他类型的响应,直接返回
return result
except Exception as e:
limiter.semaphore.release()
current_app.logger.error(f"路由处理异常: {e}")
return jsonify({'error': '内部服务器错误'}), 500
return wrapped
return decorator