12.2 清理bug修改

This commit is contained in:
zy123 2024-12-02 11:10:49 +08:00
parent d8860c9466
commit d11e488640
6 changed files with 66 additions and 49 deletions

View File

@ -1,31 +1,40 @@
# flask_app/ConnectionLimiter.py
import threading
import time
import logging
from functools import wraps
from flask import current_app, jsonify, g, request, Response, stream_with_context
from flask import jsonify, request, Response, stream_with_context, current_app, g
# 配置模块级别的日志记录器
logger = logging.getLogger(__name__)
class ExecutionTimeoutMonitor:
"""监控请求执行时间,超时后释放信号量"""
def __init__(self, timeout, semaphore):
def __init__(self, timeout, semaphore, logger):
self.timeout = timeout
self.semaphore = semaphore
self.is_timeout = False
self.stopped = threading.Event()
self.logger = logger
self.thread = threading.Thread(target=self._monitor)
self.thread.daemon = True
def _monitor(self):
"""等待指定时间后标记为超时并释放信号量"""
time.sleep(self.timeout)
self.is_timeout = True
self.semaphore.release() # 超时后释放信号量
current_app.logger.error(f"请求执行时间超过 {self.timeout} 秒并被终止。")
if not self.stopped.wait(self.timeout):
self.is_timeout = True
self.semaphore.release() # 超时后释放信号量
self.logger.error(f"请求执行时间超过 {self.timeout} 秒并被终止。")
def start(self):
self.thread.start()
def stop(self):
self.stopped.set()
self.thread.join()
class ConnectionLimiter:
def __init__(self, max_connections=1):
def __init__(self, max_connections=10):
self.semaphore = threading.Semaphore(max_connections)
def require_connection_limit(timeout=900):
@ -33,55 +42,72 @@ def require_connection_limit(timeout=900):
def decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
# 统一获取 logger 对象
logger_to_use = getattr(g, 'logger', logger)
blueprint = request.blueprint
limiter = current_app.connection_limiters.get(blueprint, current_app.connection_limiters.get('default'))
limiter = current_app.connection_limiters.get(
blueprint, current_app.connection_limiters.get('default'))
if limiter is None:
current_app.logger.error(f"未为蓝图 '{blueprint}' 配置 ConnectionLimiter并且未设置默认限流器。")
logger_to_use.error(
f"未为蓝图 '{blueprint}' 配置 ConnectionLimiter并且未设置默认限流器。")
return jsonify({'error': '服务器配置错误'}), 500
acquired = limiter.semaphore.acquire(blocking=True)
if not acquired:
current_app.logger.warning("并发连接过多")
logger_to_use.warning("并发连接过多")
return jsonify({'error': '并发连接过多'}), 429
# 启动执行超时监控器
monitor = ExecutionTimeoutMonitor(timeout, limiter.semaphore)
monitor = ExecutionTimeoutMonitor(timeout, limiter.semaphore, logger_to_use)
monitor.start()
# 将限流器和监控器存储在 g 中,以便在 teardown_request 中释放信号量
g.limiter = limiter
g.monitor = monitor
try:
result = f(*args, **kwargs)
if isinstance(result, Response):
# 对于普通的 Response 对象,不需要额外处理
# 对于 Response 对象,使用 call_on_close 释放信号量
def release_semaphore():
if not monitor.is_timeout:
limiter.semaphore.release()
monitor.stop()
logger_to_use.info("Response Object 正确释放")
result.call_on_close(release_semaphore)
return result
elif hasattr(result, '__iter__'):
# 如果返回的是生成器,按原有逻辑处理
elif hasattr(result, '__iter__') and not isinstance(result, (str, bytes)):
# 处理生成器响应
@stream_with_context
def generator_wrapper():
try:
for item in result:
if monitor.is_timeout:
current_app.logger.error("请求执行时间超过限制并被终止。")
logger_to_use.error("请求执行时间超过限制并被终止。")
break
yield item
except GeneratorExit:
# 客户端断开连接
logger_to_use.info("客户端断开连接")
finally:
if not monitor.is_timeout:
limiter.semaphore.release()
monitor.stop()
logger_to_use.info("在生成器的 finally 中释放信号量并停止监控器")
return Response(generator_wrapper(), mimetype='text/event-stream')
else:
# 对于其他类型的响应,直接返回
# 对于其他类型的响应,返回前释放信号量
if not monitor.is_timeout:
limiter.semaphore.release()
monitor.stop()
logger_to_use.info("Semaphore released for non-Response, non-generator result")
return result
except Exception as e:
limiter.semaphore.release()
current_app.logger.error(f"路由处理异常: {e}")
if not monitor.is_timeout:
limiter.semaphore.release()
monitor.stop()
logger_to_use.error(f"路由处理异常: {e}")
return jsonify({'error': '内部服务器错误'}), 500
return wrapped

View File

@ -5,23 +5,18 @@ from flask import Blueprint, request, jsonify, Response, stream_with_context, g
import json
import os
from flask_app.main.download import download_file
from flask_app.general.post_processing import outer_post_processing
from flask_app.general.接口_技术偏离表 import get_tech_and_business_deviation
from flask_app.routes.utils import generate_deviation_response, validate_request, validate_and_setup_logger
from flask_app.ConnectionLimiter import require_connection_limit
get_deviation_bp = Blueprint('get_deviation', __name__)
@get_deviation_bp.route('/get_deviation', methods=['POST'])
@require_connection_limit(timeout=700)
@require_connection_limit(timeout=720)
@validate_and_setup_logger
def get_deviation():
logger = g.logger
unique_id = g.unique_id
validation = validate_request()
if isinstance(validation, tuple) and len(validation) == 2 and isinstance(validation[0], str):
file_url, zb_type = validation
else:
return validation # 错误响应
file_url = g.file_url
zb_type = g.zb_type
try:
logger.info("开始解析 URL: " + file_url)

View File

@ -7,17 +7,17 @@ from flask import Blueprint, request, jsonify, Response, g
from flask_app.ConnectionLimiter import require_connection_limit
from flask_app.main.download import download_file
from flask_app.general.接口_小解析 import little_parse_main
from flask_app.routes.utils import validate_request, validate_and_setup_logger
from flask_app.routes.utils import validate_and_setup_logger
little_zbparse_bp = Blueprint('little_zbparse', __name__)
@little_zbparse_bp.route('/little_zbparse', methods=['POST'])
@require_connection_limit(timeout=300)
@validate_and_setup_logger
@require_connection_limit(timeout=300)
def little_zbparse():
logger = g.logger
file_url, zb_type = validate_request()
file_url = g.file_url
zb_type = g.zb_type
if isinstance(file_url, tuple): # 检查是否为错误响应
return file_url

View File

@ -15,19 +15,16 @@ from flask_app.ConnectionLimiter import require_connection_limit
upload_bp = Blueprint('upload', __name__)
@upload_bp.route('/upload', methods=['POST'])
@require_connection_limit(timeout=800)
@validate_and_setup_logger
@require_connection_limit(timeout=720)
def zbparse():
logger = g.logger
try:
logger.info("大解析开始!!!")
received_data = request.get_json()
logger.info("Received JSON data: " + str(received_data))
validation = validate_request()
if isinstance(validation, tuple) and len(validation) == 2 and isinstance(validation[0], str):
file_url, zb_type = validation
else:
return validation # 错误响应
file_url = g.file_url
zb_type = g.zb_type
try:
logger.info("starting parsing url:" + file_url)
return process_and_stream(file_url, zb_type)

View File

@ -2,7 +2,7 @@
import json
from functools import wraps
from flask import request, jsonify, current_app
from flask import request, jsonify, current_app, g
from flask_app.logger_setup import create_logger
@ -105,7 +105,9 @@ def validate_and_setup_logger(f):
# 创建 logger 和 output_folder
create_logger(current_app, subfolder)
# 将验证后的数据存储在 g 对象中
g.file_url = file_url
g.zb_type = zb_type
return f(*args, **kwargs)
else:
# 验证失败,返回错误响应

View File

@ -35,10 +35,6 @@ def create_app():
@app.teardown_request
def teardown_request(exception):
limiter = getattr(g, 'limiter', None)
monitor = getattr(g, 'monitor', None)
if limiter and not monitor.is_timeout:
limiter.semaphore.release()
output_folder = getattr(g, 'output_folder', None)
if output_folder:
# 执行与output_folder相关的清理操作例如删除临时文件
@ -46,6 +42,7 @@ def create_app():
logger.info(f"正在清理输出文件夹: {output_folder}")
file_ids=read_file_ids(output_folder)
delete_file_by_ids(file_ids)
logger.info("清理完毕!")
return app
#TODO:培训要求、总体要求、进度要求、'建设要求'到技术要求中,归类到其他要求中