11.25 超时设置、连接设置
This commit is contained in:
parent
80df0e377a
commit
94c124ee04
@ -1,8 +1,9 @@
|
|||||||
|
# flask_app/ConnectionLimiter.py
|
||||||
|
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
from flask import current_app, jsonify, stream_with_context, Response, request
|
from flask import current_app, jsonify, g, request, Response, stream_with_context
|
||||||
|
|
||||||
|
|
||||||
class ExecutionTimeoutMonitor:
|
class ExecutionTimeoutMonitor:
|
||||||
"""监控请求执行时间,超时后释放信号量"""
|
"""监控请求执行时间,超时后释放信号量"""
|
||||||
@ -14,19 +15,21 @@ class ExecutionTimeoutMonitor:
|
|||||||
self.thread.daemon = True
|
self.thread.daemon = True
|
||||||
|
|
||||||
def _monitor(self):
|
def _monitor(self):
|
||||||
"""等待指定时间后标记为超时并释放信号量
|
"""等待指定时间后标记为超时并释放信号量"""
|
||||||
目前的超时机制是通过 ExecutionTimeoutMonitor 的标志位来通知生成器超时。
|
|
||||||
但是,生成器只有在下一次 yield 时才会检查标志位,因此不会在当前执行的代码段中间强行中断。
|
|
||||||
"""
|
|
||||||
time.sleep(self.timeout)
|
time.sleep(self.timeout)
|
||||||
self.is_timeout = True
|
self.is_timeout = True
|
||||||
self.semaphore.release() # 超时后释放信号量
|
self.semaphore.release() # 超时后释放信号量
|
||||||
current_app.logger.error(f"Request execution exceeded {self.timeout} seconds and was terminated.")
|
current_app.logger.error(f"请求执行时间超过 {self.timeout} 秒并被终止。")
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
self.thread.start()
|
self.thread.start()
|
||||||
|
|
||||||
|
class ConnectionLimiter:
|
||||||
|
def __init__(self, max_connections=1):
|
||||||
|
self.semaphore = threading.Semaphore(max_connections)
|
||||||
|
|
||||||
def require_connection_limit(timeout=900):
|
def require_connection_limit(timeout=900):
|
||||||
|
"""装饰器:确保路由使用连接限制,并监控请求执行时间"""
|
||||||
def decorator(f):
|
def decorator(f):
|
||||||
@wraps(f)
|
@wraps(f)
|
||||||
def wrapped(*args, **kwargs):
|
def wrapped(*args, **kwargs):
|
||||||
@ -36,39 +39,50 @@ def require_connection_limit(timeout=900):
|
|||||||
current_app.logger.error(f"未为蓝图 '{blueprint}' 配置 ConnectionLimiter,并且未设置默认限流器。")
|
current_app.logger.error(f"未为蓝图 '{blueprint}' 配置 ConnectionLimiter,并且未设置默认限流器。")
|
||||||
return jsonify({'error': '服务器配置错误'}), 500
|
return jsonify({'error': '服务器配置错误'}), 500
|
||||||
|
|
||||||
# 获取信号量(阻塞)
|
|
||||||
acquired = limiter.semaphore.acquire(blocking=True)
|
acquired = limiter.semaphore.acquire(blocking=True)
|
||||||
if not acquired:
|
if not acquired:
|
||||||
current_app.logger.warning("并发连接过多")
|
current_app.logger.warning("并发连接过多")
|
||||||
return jsonify({'error': '并发连接过多'}), 429
|
return jsonify({'error': '并发连接过多'}), 429
|
||||||
|
|
||||||
|
# 启动执行超时监控器
|
||||||
|
monitor = ExecutionTimeoutMonitor(timeout, limiter.semaphore)
|
||||||
|
monitor.start()
|
||||||
|
|
||||||
|
# 将限流器和监控器存储在 g 中,以便在 teardown_request 中释放信号量
|
||||||
|
g.limiter = limiter
|
||||||
|
g.monitor = monitor
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# 启动执行超时监控器
|
result = f(*args, **kwargs)
|
||||||
monitor = ExecutionTimeoutMonitor(timeout, limiter.semaphore)
|
|
||||||
monitor.start()
|
|
||||||
|
|
||||||
generator = f(*args, **kwargs)
|
if isinstance(result, Response):
|
||||||
|
# 对于普通的 Response 对象,不需要额外处理
|
||||||
|
return result
|
||||||
|
|
||||||
@stream_with_context
|
elif hasattr(result, '__iter__'):
|
||||||
def generator_wrapper():
|
# 如果返回的是生成器,按原有逻辑处理
|
||||||
try:
|
@stream_with_context
|
||||||
for item in generator:
|
def generator_wrapper():
|
||||||
if monitor.is_timeout:
|
try:
|
||||||
current_app.logger.error("请求执行时间超过限制并被终止。")
|
for item in result:
|
||||||
break
|
if monitor.is_timeout:
|
||||||
yield item
|
current_app.logger.error("请求执行时间超过限制并被终止。")
|
||||||
finally:
|
break
|
||||||
if not monitor.is_timeout:
|
yield item
|
||||||
limiter.semaphore.release() # 正常结束时释放信号量
|
finally:
|
||||||
|
if not monitor.is_timeout:
|
||||||
|
limiter.semaphore.release()
|
||||||
|
|
||||||
|
return Response(generator_wrapper(), mimetype='text/event-stream')
|
||||||
|
|
||||||
|
else:
|
||||||
|
# 对于其他类型的响应,直接返回
|
||||||
|
return result
|
||||||
|
|
||||||
return Response(generator_wrapper(), mimetype='text/event-stream')
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
limiter.semaphore.release() # 异常时释放信号量
|
limiter.semaphore.release()
|
||||||
current_app.logger.error(f"路由处理异常: {e}")
|
current_app.logger.error(f"路由处理异常: {e}")
|
||||||
return jsonify({'error': '内部服务器错误'}), 500
|
return jsonify({'error': '内部服务器错误'}), 500
|
||||||
|
|
||||||
return wrapped
|
return wrapped
|
||||||
return decorator
|
return decorator
|
||||||
|
|
||||||
class ConnectionLimiter:
|
|
||||||
def __init__(self, max_connections=1):
|
|
||||||
self.semaphore = threading.Semaphore(max_connections)
|
|
||||||
|
@ -1,9 +1,8 @@
|
|||||||
# flask_app/routes/little_zbparse.py
|
# flask_app/routes/little_zbparse.py
|
||||||
import time
|
|
||||||
|
|
||||||
from flask import Blueprint, request, jsonify, Response, stream_with_context, g
|
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
from flask import Blueprint, request, jsonify, Response, g
|
||||||
|
|
||||||
from flask_app.ConnectionLimiter import require_connection_limit
|
from flask_app.ConnectionLimiter import require_connection_limit
|
||||||
from flask_app.main.download import download_file
|
from flask_app.main.download import download_file
|
||||||
@ -15,6 +14,7 @@ from flask_app.logger_setup import CSTFormatter
|
|||||||
from flask_app.routes.utils import validate_request
|
from flask_app.routes.utils import validate_request
|
||||||
|
|
||||||
little_zbparse_bp = Blueprint('little_zbparse', __name__)
|
little_zbparse_bp = Blueprint('little_zbparse', __name__)
|
||||||
|
|
||||||
@little_zbparse_bp.route('/little_zbparse', methods=['POST'])
|
@little_zbparse_bp.route('/little_zbparse', methods=['POST'])
|
||||||
@require_connection_limit(timeout=300)
|
@require_connection_limit(timeout=300)
|
||||||
def little_zbparse():
|
def little_zbparse():
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
# flask_app/start_up.py
|
# flask_app/start_up.py
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from flask import Flask, request
|
from flask import Flask, request, g
|
||||||
|
|
||||||
from flask_app.ConnectionLimiter import ConnectionLimiter
|
from flask_app.ConnectionLimiter import ConnectionLimiter
|
||||||
from flask_app.logger_setup import CSTFormatter, create_logger
|
from flask_app.logger_setup import CSTFormatter, create_logger
|
||||||
@ -51,6 +51,13 @@ def create_app():
|
|||||||
app.connection_limiters['upload'] = ConnectionLimiter(max_connections=7)
|
app.connection_limiters['upload'] = ConnectionLimiter(max_connections=7)
|
||||||
app.connection_limiters['get_deviation'] = ConnectionLimiter(max_connections=7)
|
app.connection_limiters['get_deviation'] = ConnectionLimiter(max_connections=7)
|
||||||
app.connection_limiters['default'] = ConnectionLimiter(max_connections=10)
|
app.connection_limiters['default'] = ConnectionLimiter(max_connections=10)
|
||||||
|
|
||||||
|
@app.teardown_request
|
||||||
|
def teardown_request(exception):
|
||||||
|
limiter = getattr(g, 'limiter', None)
|
||||||
|
monitor = getattr(g, 'monitor', None)
|
||||||
|
if limiter and not monitor.is_timeout:
|
||||||
|
limiter.semaphore.release()
|
||||||
return app
|
return app
|
||||||
|
|
||||||
#TODO:培训要求、总体要求、进度要求、'建设要求'到技术要求中,归类到其他要求中
|
#TODO:培训要求、总体要求、进度要求、'建设要求'到技术要求中,归类到其他要求中
|
||||||
|
Loading…
x
Reference in New Issue
Block a user