From 80df0e377ad732e74c0f227abe30541cc8562fdb Mon Sep 17 00:00:00 2001 From: zy123 <646228430@qq.com> Date: Mon, 25 Nov 2024 16:04:53 +0800 Subject: [PATCH] =?UTF-8?q?11.25=20=E8=B6=85=E6=97=B6=E8=AE=BE=E7=BD=AE?= =?UTF-8?q?=E3=80=81=E8=BF=9E=E6=8E=A5=E8=AE=BE=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- flask_app/ConnectionLimiter.py | 26 +++++++++++++++----------- flask_app/routes/get_deviation.py | 7 +++---- flask_app/routes/little_zbparse.py | 3 +++ flask_app/routes/upload.py | 3 +-- flask_app/start_up.py | 7 +++++-- 5 files changed, 27 insertions(+), 19 deletions(-) diff --git a/flask_app/ConnectionLimiter.py b/flask_app/ConnectionLimiter.py index 35b141c..2578dc3 100644 --- a/flask_app/ConnectionLimiter.py +++ b/flask_app/ConnectionLimiter.py @@ -1,7 +1,8 @@ import threading import time from functools import wraps -from flask import current_app, jsonify, stream_with_context, Response +from flask import current_app, jsonify, stream_with_context, Response, request + class ExecutionTimeoutMonitor: """监控请求执行时间,超时后释放信号量""" @@ -26,17 +27,21 @@ class ExecutionTimeoutMonitor: self.thread.start() def require_connection_limit(timeout=900): - """装饰器:确保路由使用连接限制,并监控请求执行时间""" def decorator(f): @wraps(f) def wrapped(*args, **kwargs): - limiter = getattr(current_app, 'connection_limiter', None) + blueprint = request.blueprint + limiter = current_app.connection_limiters.get(blueprint, current_app.connection_limiters.get('default')) if limiter is None: - current_app.logger.error("ConnectionLimiter 未初始化") - return jsonify({'error': 'Server configuration error'}), 500 + current_app.logger.error(f"未为蓝图 '{blueprint}' 配置 ConnectionLimiter,并且未设置默认限流器。") + return jsonify({'error': '服务器配置错误'}), 500 + + # 获取信号量(阻塞) + acquired = limiter.semaphore.acquire(blocking=True) + if not acquired: + current_app.logger.warning("并发连接过多") + return jsonify({'error': '并发连接过多'}), 429 - # 阻塞方式获取信号量 - limiter.semaphore.acquire() try: # 启动执行超时监控器 monitor = ExecutionTimeoutMonitor(timeout, limiter.semaphore) @@ -49,8 +54,7 @@ def require_connection_limit(timeout=900): try: for item in generator: if monitor.is_timeout: - # 如果已超时,终止生成器执行 - current_app.logger.error("Request exceeded execution time and was terminated.") + current_app.logger.error("请求执行时间超过限制并被终止。") break yield item finally: @@ -60,8 +64,8 @@ def require_connection_limit(timeout=900): return Response(generator_wrapper(), mimetype='text/event-stream') except Exception as e: limiter.semaphore.release() # 异常时释放信号量 - current_app.logger.error(f"Exception in route: {e}") - return jsonify({'error': 'Internal server error'}), 500 + current_app.logger.error(f"路由处理异常: {e}") + return jsonify({'error': '内部服务器错误'}), 500 return wrapped return decorator diff --git a/flask_app/routes/get_deviation.py b/flask_app/routes/get_deviation.py index 3748af5..7c7455f 100644 --- a/flask_app/routes/get_deviation.py +++ b/flask_app/routes/get_deviation.py @@ -1,4 +1,5 @@ # flask_app/routes/get_deviation.py +import time from flask import Blueprint, request, jsonify, Response, stream_with_context, g import json @@ -6,13 +7,11 @@ import os from flask_app.main.download import download_file from flask_app.general.post_processing import outer_post_processing from flask_app.general.接口_技术偏离表 import get_tech_and_business_deviation - -from flask_app.logger_setup import CSTFormatter - from flask_app.routes.utils import generate_deviation_response, validate_request - +from flask_app.ConnectionLimiter import require_connection_limit get_deviation_bp = Blueprint('get_deviation', __name__) @get_deviation_bp.route('/get_deviation', methods=['POST']) +@require_connection_limit(timeout=700) def get_deviation(): logger = g.logger unique_id = g.unique_id diff --git a/flask_app/routes/little_zbparse.py b/flask_app/routes/little_zbparse.py index fc87f63..8bd8f7e 100644 --- a/flask_app/routes/little_zbparse.py +++ b/flask_app/routes/little_zbparse.py @@ -1,9 +1,11 @@ # flask_app/routes/little_zbparse.py +import time from flask import Blueprint, request, jsonify, Response, stream_with_context, g import json import os +from flask_app.ConnectionLimiter import require_connection_limit from flask_app.main.download import download_file from flask_app.general.post_processing import outer_post_processing from flask_app.general.接口_小解析 import little_parse_main @@ -14,6 +16,7 @@ from flask_app.routes.utils import validate_request little_zbparse_bp = Blueprint('little_zbparse', __name__) @little_zbparse_bp.route('/little_zbparse', methods=['POST']) +@require_connection_limit(timeout=300) def little_zbparse(): logger = g.logger file_url, zb_type = validate_request() diff --git a/flask_app/routes/upload.py b/flask_app/routes/upload.py index 61bcc1c..8cc5d99 100644 --- a/flask_app/routes/upload.py +++ b/flask_app/routes/upload.py @@ -17,7 +17,7 @@ from flask_app.ConnectionLimiter import require_connection_limit upload_bp = Blueprint('upload', __name__) @upload_bp.route('/upload', methods=['POST']) -@require_connection_limit(timeout=900) +@require_connection_limit(timeout=800) def zbparse(): logger = g.logger try: @@ -38,7 +38,6 @@ def zbparse(): except Exception as e: logger.error('Unexpected exception: ' + str(e)) return jsonify({'error': 'Internal server error'}), 500 - def process_and_stream(file_url, zb_type): """ 下载文件并进行处理,支持工程标和货物标的处理。 diff --git a/flask_app/start_up.py b/flask_app/start_up.py index 58d061f..4aa19c4 100644 --- a/flask_app/start_up.py +++ b/flask_app/start_up.py @@ -13,7 +13,8 @@ from flask_app.routes.test_zbparse import test_zbparse_bp class FlaskAppWithLimiter(Flask): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.connection_limiter = ConnectionLimiter(max_connections=10) + # 初始化一个字典来存储每个蓝图的限流器 + self.connection_limiters = {} def create_app(): app = FlaskAppWithLimiter(__name__) @@ -47,7 +48,9 @@ def create_app(): app.register_blueprint(little_zbparse_bp) app.register_blueprint(upload_bp) app.register_blueprint(test_zbparse_bp) - + app.connection_limiters['upload'] = ConnectionLimiter(max_connections=7) + app.connection_limiters['get_deviation'] = ConnectionLimiter(max_connections=7) + app.connection_limiters['default'] = ConnectionLimiter(max_connections=10) return app #TODO:培训要求、总体要求、进度要求、'建设要求'到技术要求中,归类到其他要求中