From e89f29eeaab8045eb599a982ede93f08e2fd0d2d Mon Sep 17 00:00:00 2001 From: zy123 <646228430@qq.com> Date: Thu, 13 Feb 2025 22:22:04 +0800 Subject: [PATCH] 2.13 --- README.md | 63 ++++++++- docker-compose.yml | 4 +- flask_app/ConnectionLimiter.py | 126 ++++++++++++++++-- flask_app/general/截取pdf_main.py | 4 +- .../general/投标人须知正文提取指定内容.py | 4 +- flask_app/general/无效标和废标公共代码.py | 1 - flask_app/routes/get_deviation.py | 6 +- flask_app/routes/little_zbparse.py | 5 +- flask_app/routes/upload.py | 5 +- flask_app/start_up.py | 36 ++--- 10 files changed, 206 insertions(+), 48 deletions(-) diff --git a/README.md b/README.md index f8df200..74c7a76 100644 --- a/README.md +++ b/README.md @@ -14,15 +14,15 @@ git clone地址:http://47.98.59.178:3000/zy123/zbparse.git ## 项目结构: -![1](md_files/1.png) +![1](C:\Users\Administrator\Desktop\md_files\1.png) .env存放一些密钥(大模型、textin等),它是gitignore忽略了,因此在服务器上git pull项目的时候,这个文件不会更新(因为密钥比较重要),需要手动维护服务器相应位置的.env。 -**如何更新服务器上的版本:** +#### **如何更新服务器上的版本:** 1. 进入项目文件夹 -![1](md_files/0.png) +![1](C:\Users\Administrator\Desktop\md_files\0.png) **注意:**需要确认.env是否存在在服务器,默认是隐藏的 输入cat .env @@ -46,7 +46,7 @@ requirements.txt一般无需变动,除非代码中使用了新的库,也要 -**如何本地启动本项目:** +#### **如何本地启动本项目:** 1. requirements.txt里的环境要配好 conda create -n zbparse python=3.8 @@ -55,9 +55,9 @@ requirements.txt一般无需变动,除非代码中使用了新的库,也要 2. .env环境配好 (一般不需要在电脑环境变量中额外配置了,但是要在Pycharm中安装插件,使得项目能将env中的环境变量配置到系统环境变量中!!!) 3. 点击下拉框,Edit configurations - ![1](md_files/11.png) + ![1](C:\Users\Administrator\Desktop\md_files\11.png) -​ 设置run_serve.py为启动脚本![1](md_files/10.png) +​ 设置run_serve.py为启动脚本![1](C:\Users\Administrator\Desktop\md_files\10.png) ​ 注意这里的working directory要设置到最外层文件夹,而不是flask_app!!! 4. postman打post请求测试: @@ -80,6 +80,57 @@ bid-assistance/test 里面找个文件的url,推荐'094定稿-湖北工业大 +#### 清理服务器上的文件夹 + +1. 编写shell文件,sudo vim clean_dir.sh + 命名为clean_dir.sh + +``` +#!/bin/bash + +# 需要清理的 output 目录路径 +ROOT_DIR="/home/Z/zbparse_output_dev" + +# 检查目标目录是否存在 +if [ ! -d "$ROOT_DIR" ]; then + echo "目录 $ROOT_DIR 不存在!" + exit 1 +fi + +echo "开始清理 $ROOT_DIR 下超过 7 天的目录..." +echo "以下目录将被删除:" + +# -mindepth 2 表示从第二层目录开始查找,防止删除 output 下的直接子目录(如 output1、output2) +# -depth 采用深度优先遍历,确保先处理子目录再处理父目录 +find "$ROOT_DIR" -mindepth 2 -depth -type d -mtime +7 -print -exec rm -rf {} \; + +echo "清理完成。" + +``` + +2. 添加权限。 + +``` +sudo chmod +x ./clean_dir.sh +``` + +3. 执行 + +``` +sudo ./clean_dir.sh +``` + +4. 每天零点清理 + +``` +sudo crontab -e +在里面添加: + +10 0 * * * /home/Z/clean_dir.sh +``` + + + ## flask_app结构介绍 ### 项目中做限制的地方 diff --git a/docker-compose.yml b/docker-compose.yml index 92cb7ab..ca6ace6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,13 +11,13 @@ services: # - .:/flask_project # 将当前目录挂载到容器的 /flask_project 目录(可选,便于开发时实时更新代码) - /home/Z/zbparse_output_dev:/flask_project/flask_app/static/output # 额外的数据卷挂载 restart: unless-stopped # 容器退出时自动重启,除非明确停止 - mem_limit: "12g" # 容器最大可使用内存为8GB + mem_limit: "16g" # 容器最大可使用内存为16GB mem_reservation: "4g" # 容器保证可使用内存为4GB cpus: 4.0 # 限制容器使用2个CPU核心 security_opt: - seccomp:unconfined # 如果单独服务器的话,注释以下,不做限制: -# mem_limit: "12g" # 容器最大可使用内存为8GB +# mem_limit: "16g" # 容器最大可使用内存为8GB # mem_reservation: "4g" # 容器保证可使用内存为4GB # cpus: 4.0 # 限制容器使用2个CPU核心 \ No newline at end of file diff --git a/flask_app/ConnectionLimiter.py b/flask_app/ConnectionLimiter.py index 64a8cce..3f57a90 100644 --- a/flask_app/ConnectionLimiter.py +++ b/flask_app/ConnectionLimiter.py @@ -8,23 +8,56 @@ from flask import jsonify, request, Response, stream_with_context, current_app, # 配置模块级别的日志记录器 logger = logging.getLogger(__name__) +# class ExecutionTimeoutMonitor: +# """监控请求执行时间,超时后释放信号量""" +# def __init__(self, timeout, semaphore, logger): +# self.timeout = timeout +# self.semaphore = semaphore +# self.is_timeout = False +# self.stopped = threading.Event() +# self.logger = logger +# self.thread = threading.Thread(target=self._monitor) +# self.thread.daemon = True +# +# def _monitor(self): +# """等待指定时间后标记为超时并释放信号量""" +# if not self.stopped.wait(self.timeout): +# self.is_timeout = True +# self.semaphore.release() # 超时后释放信号量 +# self.logger.error(f"请求执行时间超过 {self.timeout} 秒并被终止。") +# +# def start(self): +# self.thread.start() +# +# def stop(self): +# self.stopped.set() +# self.thread.join() + +class ConnectionLimiter: + def __init__(self, max_connections=10): + self.semaphore = threading.Semaphore(max_connections) + class ExecutionTimeoutMonitor: - """监控请求执行时间,超时后释放信号量""" - def __init__(self, timeout, semaphore, logger): + """ + 监控请求执行时间,超时后仅做标记(is_timeout = True), + 并记录日志;不再依赖信号量进行释放操作。 + """ + def __init__(self, timeout, logger): self.timeout = timeout - self.semaphore = semaphore + self.logger = logger self.is_timeout = False self.stopped = threading.Event() - self.logger = logger self.thread = threading.Thread(target=self._monitor) self.thread.daemon = True def _monitor(self): - """等待指定时间后标记为超时并释放信号量""" + """ + 等待指定时间后标记为超时(is_timeout = True)。 + 仅做日志输出,不真正强杀后台线程。 + """ if not self.stopped.wait(self.timeout): self.is_timeout = True - self.semaphore.release() # 超时后释放信号量 - self.logger.error(f"请求执行时间超过 {self.timeout} 秒并被终止。") + self.logger.error(f"请求执行时间超过 {self.timeout} 秒并被标记为终止。") def start(self): self.thread.start() @@ -33,10 +66,6 @@ class ExecutionTimeoutMonitor: self.stopped.set() self.thread.join() -class ConnectionLimiter: - def __init__(self, max_connections=10): - self.semaphore = threading.Semaphore(max_connections) - #当请求超时时,连接资源(即信号量)会被释放,但是正在执行的程序不会被终止。这意味着虽然新的请求可以继续被接受和处理,但超时的请求仍然会在后台继续运行,直到完成或遇到其他中断条件。 def require_connection_limit(timeout=900): """装饰器:确保路由使用连接限制,并监控请求执行时间""" @@ -113,3 +142,78 @@ def require_connection_limit(timeout=900): return wrapped return decorator + + +def require_execution_timeout(timeout=1800): + """装饰器:仅监控请求执行时间,超过即标记超时,并在可能的地方中断输出。""" + def decorator(f): + @wraps(f) + def wrapped(*args, **kwargs): + # 获取或使用默认 logger + logger_to_use = getattr(g, 'logger', None) + if logger_to_use is None: + # 如果你有全局 logger,就改成自己的全局 logger + import logging + logger_to_use = logging.getLogger("default") + + # 启动执行超时监控器 + monitor = ExecutionTimeoutMonitor(timeout, logger_to_use) + monitor.start() + + try: + result = f(*args, **kwargs) + + # 如果是 Flask 的 Response 对象 + if isinstance(result, Response): + # 在关闭响应时检查是否超时,并停止监控 + def on_close(): + if not monitor.is_timeout: + monitor.stop() + logger_to_use.info("Response 正常结束,停止监控器。") + else: + logger_to_use.warning("Response 结束时监控器已超时标记。") + + result.call_on_close(on_close) + return result + + # 如果返回的是一个生成器(流式返回) + elif hasattr(result, '__iter__') and not isinstance(result, (str, bytes)): + @stream_with_context + def generator_wrapper(): + try: + for item in result: + # 如果已超时,则中断输出 + if monitor.is_timeout: + logger_to_use.error("请求执行已超时,中断输出。") + break + yield item + except GeneratorExit: + logger_to_use.info("客户端断开连接。") + finally: + if not monitor.is_timeout: + monitor.stop() + logger_to_use.info("在生成器 finally 中停止监控器。") + else: + logger_to_use.warning("生成器 finally 时已超时标记。") + + return Response(generator_wrapper(), mimetype='text/event-stream') + + # 如果返回的是普通数据(str / dict / jsonify等) + else: + # 在函数返回前判断一下是否超时 + if not monitor.is_timeout: + monitor.stop() + logger_to_use.info("普通返回,停止监控器。") + else: + logger_to_use.warning("普通返回时已超时标记。") + return result + + except Exception as e: + if not monitor.is_timeout: + monitor.stop() + logger_to_use.error(f"路由处理异常: {e}", exc_info=True) + return jsonify({'error': '内部服务器错误'}), 500 + + return wrapped + return decorator + diff --git a/flask_app/general/截取pdf_main.py b/flask_app/general/截取pdf_main.py index 2578b6b..28f1350 100644 --- a/flask_app/general/截取pdf_main.py +++ b/flask_app/general/截取pdf_main.py @@ -118,10 +118,10 @@ if __name__ == "__main__": # input_path = r"C:\Users\Administrator\Desktop\new招标文件\工程标" # pdf_path=r"C:\Users\Administrator\Desktop\货物标\zbfiles\094定稿-湖北工业大学轻武器模拟射击设备采购项目招标文件.pdf" # pdf_path = r"C:\Users\Administrator\Desktop\货物标\zbfiles\zbtest4_evaluation_method.pdf" - pdf_path=r"C:\Users\Administrator\Desktop\fsdownload\0aade992-dc8e-4690-9b09-78630a34c6e7\ztbfile.pdf" + pdf_path=r"C:\Users\Administrator\Desktop\fsdownload\00530231-d5a1-44a3-8639-6d660d9ab509\ztbfile.pdf" # pdf_path=r"C:\Users\Administrator\Desktop\货物标\zbfiles\zbtest4_evaluation_method.pdf" # input_path=r"C:\Users\Administrator\Desktop\招标文件\招标test文件夹\zbtest8.pdf" - output_folder = r"C:\Users\Administrator\Desktop\fsdownload\0aade992-dc8e-4690-9b09-78630a34c6e7" + output_folder = r"C:\Users\Administrator\Desktop\fsdownload\00530231-d5a1-44a3-8639-6d660d9ab509\tmp" # selections = [1, 4] # 仅处理 selection 4、1 # selections = [1, 2, 3, 5] # files = truncate_pdf_multiple(pdf_path, output_folder, logger, 'goods', selections) #engineering diff --git a/flask_app/general/投标人须知正文提取指定内容.py b/flask_app/general/投标人须知正文提取指定内容.py index b295218..1ce7881 100644 --- a/flask_app/general/投标人须知正文提取指定内容.py +++ b/flask_app/general/投标人须知正文提取指定内容.py @@ -431,8 +431,8 @@ def extract_from_notice(merged_baseinfo_path, clause_path, type): if __name__ == "__main__": # file_path = 'C:\\Users\\Administrator\\Desktop\\fsdownload\\3bffaa84-2434-4bd0-a8ee-5c234ccd7fa0\\clause1.json' - merged_baseinfo_path=r"C:\Users\Administrator\Desktop\新建文件夹 (3)\废标\2025-湖北-通羊镇港口村人饮项目谈判文件.docx" - clause_path=r"D:\flask_project\flask_app\static\output\output1\ce679bd7-a17a-4a95-bfc9-3801bd4a86e4\tmp\clause1.json" + merged_baseinfo_path=r"C:\Users\Administrator\Desktop\fsdownload\86f5a290-af72-46af-a0da-3cfac19ed1f4\invalid_del.docx" + clause_path=r"" try: res = extract_from_notice(merged_baseinfo_path,"", 2) # 可以改变此处的 type 参数测试不同的场景 #1: ["投标", "投标文件", "响应文件"], 2:开评定标 res2 = json.dumps(res, ensure_ascii=False, indent=4) diff --git a/flask_app/general/无效标和废标公共代码.py b/flask_app/general/无效标和废标公共代码.py index e5fc5a3..9aa2c64 100644 --- a/flask_app/general/无效标和废标公共代码.py +++ b/flask_app/general/无效标和废标公共代码.py @@ -680,7 +680,6 @@ def combine_find_invalid(invalid_docpath, output_dir): for keywords, user_query, output_file, result_key in queries: future = executor.submit(handle_query, invalid_docpath, user_query, output_file, result_key, keywords) futures.append((future, result_key)) # 保持顺序 - time.sleep(0.5) # 暂停0.5秒后再提交下一个任务 for future, result_key in futures: try: diff --git a/flask_app/routes/get_deviation.py b/flask_app/routes/get_deviation.py index 4ec5ef5..bb46ca2 100644 --- a/flask_app/routes/get_deviation.py +++ b/flask_app/routes/get_deviation.py @@ -7,12 +7,14 @@ from flask_app.general.format_change import download_file from flask_app.routes.偏离表数据解析main import get_tech_and_business_deviation from flask_app.routes.utils import generate_deviation_response, validate_and_setup_logger, create_response, sse_format, \ log_error_unique_id -from flask_app.ConnectionLimiter import require_connection_limit +from flask_app.ConnectionLimiter import require_connection_limit, require_execution_timeout + get_deviation_bp = Blueprint('get_deviation', __name__) @get_deviation_bp.route('/get_deviation', methods=['POST']) @validate_and_setup_logger -@require_connection_limit(timeout=1800) +# @require_connection_limit(timeout=1800) +@require_execution_timeout(timeout=1800) def get_deviation(): #提供商务、技术偏离的数据 logger = g.logger unique_id = g.unique_id diff --git a/flask_app/routes/little_zbparse.py b/flask_app/routes/little_zbparse.py index 14fb991..521a260 100644 --- a/flask_app/routes/little_zbparse.py +++ b/flask_app/routes/little_zbparse.py @@ -4,7 +4,7 @@ import json import os from flask import Blueprint, g -from flask_app.ConnectionLimiter import require_connection_limit +from flask_app.ConnectionLimiter import require_connection_limit, require_execution_timeout from flask_app.general.format_change import download_file from flask_app.routes.小解析main import little_parse_main from flask_app.routes.utils import validate_and_setup_logger, create_response_normal, log_error_unique_id @@ -12,7 +12,8 @@ from flask_app.routes.utils import validate_and_setup_logger, create_response_no little_zbparse_bp = Blueprint('little_zbparse', __name__) @little_zbparse_bp.route('/little_zbparse', methods=['POST']) @validate_and_setup_logger -@require_connection_limit(timeout=300) +# @require_connection_limit(timeout=300) +@require_execution_timeout(timeout=300) def little_zbparse(): #小解析 logger = g.logger file_url = g.file_url diff --git a/flask_app/routes/upload.py b/flask_app/routes/upload.py index 52a9b8d..cf08d46 100644 --- a/flask_app/routes/upload.py +++ b/flask_app/routes/upload.py @@ -9,12 +9,13 @@ from flask_app.routes.货物标解析main import goods_bid_main from flask_app.general.post_processing import outer_post_processing from flask_app.routes.utils import generate_deviation_response, validate_and_setup_logger, create_response, sse_format, \ log_error_unique_id -from flask_app.ConnectionLimiter import require_connection_limit +from flask_app.ConnectionLimiter import require_connection_limit, require_execution_timeout upload_bp = Blueprint('upload', __name__) @upload_bp.route('/upload', methods=['POST']) @validate_and_setup_logger -@require_connection_limit(timeout=1800) +# @require_connection_limit(timeout=1800) +@require_execution_timeout(timeout=1800) def zbparse(): #大解析 logger = g.logger try: diff --git a/flask_app/start_up.py b/flask_app/start_up.py index 86c129e..ae18e77 100644 --- a/flask_app/start_up.py +++ b/flask_app/start_up.py @@ -28,25 +28,25 @@ def create_app(): app.register_blueprint(upload_bp) app.register_blueprint(test_zbparse_bp) app.register_blueprint(judge_zbfile_bp) - app.connection_limiters['upload'] = ConnectionLimiter(max_connections=100) - app.connection_limiters['get_deviation'] = ConnectionLimiter(max_connections=100) - app.connection_limiters['default'] = ConnectionLimiter(max_connections=100) - app.connection_limiters['judge_zbfile'] = ConnectionLimiter(max_connections=100) + # app.connection_limiters['upload'] = ConnectionLimiter(max_connections=100) + # app.connection_limiters['get_deviation'] = ConnectionLimiter(max_connections=100) + # app.connection_limiters['default'] = ConnectionLimiter(max_connections=100) + # app.connection_limiters['judge_zbfile'] = ConnectionLimiter(max_connections=100) - @app.teardown_request - def teardown_request(exception): - # 接口请求之后都会执行该代码,做一些清理工作 - output_folder = getattr(g, 'output_folder', None) - if output_folder: - # 执行与output_folder相关的清理操作(例如删除临时文件) - logger = g.logger # 使用 app 的 logger - logger.info(f"正在清理输出文件夹: {output_folder}") - file_ids = read_file_ids(output_folder) - failed_file_ids = delete_file_by_ids(file_ids) - if failed_file_ids: - logger.error(f"以下文件删除失败: {failed_file_ids}") - else: - logger.info("清理完毕!") + # @app.teardown_request + # def teardown_request(exception): + # # 接口请求之后都会执行该代码,做一些清理工作 + # output_folder = getattr(g, 'output_folder', None) + # if output_folder: + # # 执行与output_folder相关的清理操作(例如删除临时文件) + # logger = g.logger # 使用 app 的 logger + # logger.info(f"正在清理输出文件夹: {output_folder}") + # file_ids = read_file_ids(output_folder) + # failed_file_ids = delete_file_by_ids(file_ids) + # if failed_file_ids: + # logger.error(f"以下文件删除失败: {failed_file_ids}") + # else: + # logger.info("清理完毕!") return app