From 0fc892f81991f5adf9cf06471479f322cab8518f Mon Sep 17 00:00:00 2001 From: zy123 <646228430@qq.com> Date: Mon, 25 Nov 2024 14:38:58 +0800 Subject: [PATCH] =?UTF-8?q?11.25=20=E8=B6=85=E6=97=B6=E8=AE=BE=E7=BD=AE?= =?UTF-8?q?=E3=80=81=E8=BF=9E=E6=8E=A5=E8=AE=BE=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- flask_app/ConnectionLimiter.py | 77 +++++++++++++++++++++++----- flask_app/main/截取pdf.py | 2 +- flask_app/routes/upload.py | 30 +++-------- flask_app/routes/utils.py | 35 ++++++++++++- flask_app/start_up.py | 2 +- flask_app/货物标/截取pdf货物标版.py | 11 ++-- flask_app/货物标/技术参数要求提取.py | 2 +- flask_app/货物标/货物标解析main.py | 3 +- 8 files changed, 111 insertions(+), 51 deletions(-) diff --git a/flask_app/ConnectionLimiter.py b/flask_app/ConnectionLimiter.py index b1621da..35b141c 100644 --- a/flask_app/ConnectionLimiter.py +++ b/flask_app/ConnectionLimiter.py @@ -1,21 +1,70 @@ -# flask_app/ConnectionLimiter.py import threading +import time from functools import wraps +from flask import current_app, jsonify, stream_with_context, Response +class ExecutionTimeoutMonitor: + """监控请求执行时间,超时后释放信号量""" + def __init__(self, timeout, semaphore): + self.timeout = timeout + self.semaphore = semaphore + self.is_timeout = False + self.thread = threading.Thread(target=self._monitor) + self.thread.daemon = True + + def _monitor(self): + """等待指定时间后标记为超时并释放信号量 + 目前的超时机制是通过 ExecutionTimeoutMonitor 的标志位来通知生成器超时。 + 但是,生成器只有在下一次 yield 时才会检查标志位,因此不会在当前执行的代码段中间强行中断。 + """ + time.sleep(self.timeout) + self.is_timeout = True + self.semaphore.release() # 超时后释放信号量 + current_app.logger.error(f"Request execution exceeded {self.timeout} seconds and was terminated.") + + def start(self): + self.thread.start() + +def require_connection_limit(timeout=900): + """装饰器:确保路由使用连接限制,并监控请求执行时间""" + def decorator(f): + @wraps(f) + def wrapped(*args, **kwargs): + limiter = getattr(current_app, 'connection_limiter', None) + if limiter is None: + current_app.logger.error("ConnectionLimiter 未初始化") + return jsonify({'error': 'Server configuration error'}), 500 + + # 阻塞方式获取信号量 + limiter.semaphore.acquire() + try: + # 启动执行超时监控器 + monitor = ExecutionTimeoutMonitor(timeout, limiter.semaphore) + monitor.start() + + generator = f(*args, **kwargs) + + @stream_with_context + def generator_wrapper(): + try: + for item in generator: + if monitor.is_timeout: + # 如果已超时,终止生成器执行 + current_app.logger.error("Request exceeded execution time and was terminated.") + break + yield item + finally: + if not monitor.is_timeout: + limiter.semaphore.release() # 正常结束时释放信号量 + + return Response(generator_wrapper(), mimetype='text/event-stream') + except Exception as e: + limiter.semaphore.release() # 异常时释放信号量 + current_app.logger.error(f"Exception in route: {e}") + return jsonify({'error': 'Internal server error'}), 500 + return wrapped + return decorator class ConnectionLimiter: def __init__(self, max_connections=1): self.semaphore = threading.Semaphore(max_connections) - - def limit_connections(self, f): - """装饰器:限制并发连接数""" - - @wraps(f) - def wrapped(*args, **kwargs): - self.semaphore.acquire() - try: - return f(*args, **kwargs) - finally: - self.semaphore.release() - - return wrapped diff --git a/flask_app/main/截取pdf.py b/flask_app/main/截取pdf.py index 0e083b8..57e0de0 100644 --- a/flask_app/main/截取pdf.py +++ b/flask_app/main/截取pdf.py @@ -517,7 +517,7 @@ def truncate_pdf_multiple(input_path, output_folder, unique_id="123"): base_file_name) if merged_result: truncate_files.append(merged_result) - logger.info(f"merged_baseinfo: 已生成合并文件: {merged_output_path}") + # logger.info(f"merged_baseinfo: 已生成合并文件: {merged_output_path}") else: truncate_files.append("") # 如果 merged_result 未生成,添加空字符串 logger.warning("merged_baseinfo: 未生成合并文件,因为没有找到需要合并的 PDF 文件。") diff --git a/flask_app/routes/upload.py b/flask_app/routes/upload.py index d5ec922..61bcc1c 100644 --- a/flask_app/routes/upload.py +++ b/flask_app/routes/upload.py @@ -1,7 +1,5 @@ # flask_app/routes/upload.py -from functools import wraps - -from flask import Blueprint, request, jsonify, Response, stream_with_context, g, current_app +from flask import Blueprint, request, jsonify, Response, stream_with_context, g import json import os import time @@ -13,41 +11,27 @@ from flask_app.general.post_processing import outer_post_processing from flask_app.general.接口_技术偏离表 import get_tech_and_business_deviation from flask_app.routes.utils import generate_deviation_response, validate_request from flask_app.logger_setup import CSTFormatter +from flask_app.ConnectionLimiter import require_connection_limit + upload_bp = Blueprint('upload', __name__) -def require_connection_limit(): - """装饰器:用于确保路由使用连接限制""" - def decorator(f): - @wraps(f) - def wrapped(*args, **kwargs): - limiter = current_app.connection_limiter - limiter.semaphore.acquire() # 阻塞式获取信号量 - try: - return f(*args, **kwargs) - finally: - limiter.semaphore.release() - return wrapped - return decorator @upload_bp.route('/upload', methods=['POST']) -@require_connection_limit() +@require_connection_limit(timeout=900) def zbparse(): + logger = g.logger try: - logger = g.logger - logger.info("zbparse start!!!") + logger.info("大解析开始!!!") received_data = request.get_json() logger.info("Received JSON data: " + str(received_data)) validation = validate_request() - if isinstance(validation, tuple) and len(validation) == 2 and isinstance(validation[0], str): file_url, zb_type = validation else: return validation # 错误响应 - try: logger.info("starting parsing url:" + file_url) - return Response(stream_with_context(process_and_stream(file_url, zb_type)), - content_type='text/event-stream') + return process_and_stream(file_url, zb_type) except Exception as e: logger.error('Exception occurred: ' + str(e)) return jsonify({'error': str(e)}), 500 diff --git a/flask_app/routes/utils.py b/flask_app/routes/utils.py index aeb4fcc..10084ff 100644 --- a/flask_app/routes/utils.py +++ b/flask_app/routes/utils.py @@ -1,5 +1,9 @@ import json -from flask import request,jsonify +from functools import wraps + +from flask import request, jsonify, current_app + + def validate_request(): """ 验证请求中的JSON数据。 @@ -48,4 +52,31 @@ def generate_deviation_response(tech_deviation, tech_star_deviation, business_de 'filename': 'shangwu_star_deviation', 'data': json.dumps(business_star_deviation, ensure_ascii=False) } - return tech_deviation_response, tech_deviation_star_response, zigefuhe_deviation_response, shangwu_deviation_response, shangwu_star_deviation_response \ No newline at end of file + return tech_deviation_response, tech_deviation_star_response, zigefuhe_deviation_response, shangwu_deviation_response, shangwu_star_deviation_response + + +def require_connection_limit(): + """装饰器:确保路由使用连接限制,并正确处理生成器函数""" + def decorator(f): + @wraps(f) + def wrapped(*args, **kwargs): + limiter = getattr(current_app, 'connection_limiter', None) + if limiter is None: + current_app.logger.error("ConnectionLimiter 未初始化") + return jsonify({'error': 'Server configuration error'}), 500 + + acquired = limiter.semaphore.acquire(blocking=True) + if not acquired: + return jsonify({ + 'error': 'Server is busy. Please try again later.', + 'code': 503 + }), 503 + + generator = f(*args, **kwargs) + try: + for item in generator: + yield item + finally: + limiter.semaphore.release() + return wrapped + return decorator \ No newline at end of file diff --git a/flask_app/start_up.py b/flask_app/start_up.py index 148bbf2..58d061f 100644 --- a/flask_app/start_up.py +++ b/flask_app/start_up.py @@ -13,7 +13,7 @@ from flask_app.routes.test_zbparse import test_zbparse_bp class FlaskAppWithLimiter(Flask): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.connection_limiter = ConnectionLimiter(max_connections=1) + self.connection_limiter = ConnectionLimiter(max_connections=10) def create_app(): app = FlaskAppWithLimiter(__name__) diff --git a/flask_app/货物标/截取pdf货物标版.py b/flask_app/货物标/截取pdf货物标版.py index 4f3ca1d..2cc60ea 100644 --- a/flask_app/货物标/截取pdf货物标版.py +++ b/flask_app/货物标/截取pdf货物标版.py @@ -13,7 +13,6 @@ def get_global_logger(unique_id): return logging.getLogger() # 获取默认的日志器 logger = logging.getLogger(unique_id) return logger -logger = None # fitz库版本 # def extract_common_header(pdf_path): @@ -736,9 +735,7 @@ def process_input(input_path, output_folder, selection, output_suffix): return [''] -def truncate_pdf_multiple(pdf_path, output_folder,unique_id="123"): - global logger - logger = get_global_logger(unique_id) +def truncate_pdf_multiple(pdf_path, output_folder,logger): base_file_name = os.path.splitext(os.path.basename(pdf_path))[0] truncate_files = [] @@ -769,11 +766,12 @@ def truncate_pdf_multiple(pdf_path, output_folder,unique_id="123"): if merged_path: # 合并成功,添加合并后的文件路径 truncate_files.append(merged_path) - logger.info(f"已生成合并文件: {merged_output_path}") + # logger.info(f"已生成合并文件: {merged_output_path}") else: # 合并失败,添加空字符串 truncate_files.append("") logger.warning(f"合并失败,没有生成合并文件 for {pdf_path}") + logger.info("已截取文件路径"+str(truncate_files)) return truncate_files #小解析,只需要前三章内容 @@ -789,7 +787,6 @@ def truncate_pdf_specific_goods(pdf_path, output_folder, selections,unique_id="1 Returns: list: 截取的文件路径列表,包括合并后的文件路径(如果有)。 """ - global logger logger = get_global_logger(unique_id) base_file_name = os.path.splitext(os.path.basename(pdf_path))[0] truncate_files = [] @@ -838,7 +835,7 @@ if __name__ == "__main__": # input_path="C:\\Users\\Administrator\\Desktop\\fsdownload\\a091d107-805d-4e28-b8b2-0c7327737238\\ztbfile.pdf" # output_folder = "C:\\Users\\Administrator\\Desktop\\fsdownload\\a091d107-805d-4e28-b8b2-0c7327737238\\tmp" output_folder=r"C:\Users\Administrator\Desktop\new招标文件\output5" - # files = truncate_pdf_multiple(input_path, output_folder) + files = truncate_pdf_multiple(input_path, output_folder) # selections = [3,5] # files=truncate_pdf_specific_goods(input_path,output_folder,selections) # print(files) diff --git a/flask_app/货物标/技术参数要求提取.py b/flask_app/货物标/技术参数要求提取.py index 6baf8ff..5770137 100644 --- a/flask_app/货物标/技术参数要求提取.py +++ b/flask_app/货物标/技术参数要求提取.py @@ -566,7 +566,7 @@ def test_all_files_in_folder(input_folder, output_folder): if __name__ == "__main__": start_time=time.time() # truncate_file="C:\\Users\\Administrator\\Desktop\\fsdownload\\469d2aee-9024-4993-896e-2ac7322d41b7\\ztbfile_procurement.docx" - truncate_file=r"C:\Users\Administrator\Desktop\fsdownload\fa0d51a1-0d63-4c0d-9002-cf8ac3f2211a\ztbfile_procurement.pdf" + truncate_file=r"D:\flask_project\flask_app\static\output\output1\a91d59a5-d04a-4588-98e4-ddc6e9caf999\ztbfile_procurement.pdf" # invalid_path="D:\\flask_project\\flask_app\\static\\output\\output1\\e7dda5cb-10ba-47a8-b989-d2993d34bb89\\ztbfile.pdf" # truncate_file="D:\\flask_project\\flask_app\\static\\output\\output1\\e7dda5cb-10ba-47a8-b989-d2993d34bb89\\ztbfile_procurement.docx" # output_folder="C:\\Users\\Administrator\\Desktop\\货物标\\output1\\tmp" diff --git a/flask_app/货物标/货物标解析main.py b/flask_app/货物标/货物标解析main.py index 50d930a..db69b4e 100644 --- a/flask_app/货物标/货物标解析main.py +++ b/flask_app/货物标/货物标解析main.py @@ -47,8 +47,7 @@ def preprocess_files(output_folder, file_path, file_type,logger): # # 异步上传知识库 # future_knowledge = executor.submit(addfileToKnowledge, docx_path, "招标解析" + unique_id) # 调用截取PDF多次 - truncate_files = truncate_pdf_multiple(pdf_path, - output_folder) # index: 0->商务技术服务要求 1->评标办法 2->资格审查 3->投标人须知前附表 4->投标人须知正文 + truncate_files = truncate_pdf_multiple(pdf_path, output_folder,logger) # index: 0->商务技术服务要求 1->评标办法 2->资格审查 3->投标人须知前附表 4->投标人须知正文 # 处理各个部分 invalid_path=pdf_path