class ConnectionLimiter:
    """Process-wide cap on the number of requests handled concurrently.

    The class is a singleton: every construction returns the same object,
    and the semaphore is created only on the first ``__init__`` call, so a
    ``max_connections`` value passed to a later construction is ignored.
    """

    # The single shared instance, created lazily by ``__new__``.
    _instance = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, max_connections=10):
        """Create the semaphore unless an earlier construction already did.

        Args:
            max_connections: Number of slots in the semaphore. Only the
                value supplied to the first construction takes effect.
        """
        # __init__ runs on every ConnectionLimiter(...) call, even though
        # __new__ hands back the same object; keep the existing semaphore.
        if not hasattr(self, 'semaphore'):
            self.semaphore = Semaphore(max_connections)

    def limit_connections(self, f):
        """Decorate ``f`` so that it only runs while holding a semaphore slot.

        When no slot is free the wrapper does not call ``f``; it returns a
        JSON error body together with status 429.

        The slot is released when ``f`` returns or raises. The exception is
        a return value that exposes ``call_on_close`` (for example a
        Werkzeug/Flask response object): such a response may still be
        streaming its body after the view has returned, so the release is
        registered as a close callback and the slot stays occupied until
        the response is closed.

        Args:
            f: The callable to guard.

        Returns:
            The wrapped callable, carrying ``f``'s metadata.
        """
        @wraps(f)
        def decorated_function(*args, **kwargs):
            if not self.semaphore.acquire(blocking=False):
                return jsonify({
                    'error': 'Server is busy. Maximum number of concurrent connections reached.',
                    'code': 429
                }), 429

            released = False

            def release_once():
                # A response can be closed more than once; releasing the
                # semaphore twice would silently raise the limit.
                nonlocal released
                if not released:
                    released = True
                    self.semaphore.release()

            deferred = False
            try:
                result = f(*args, **kwargs)
                register = getattr(result, 'call_on_close', None)
                if callable(register):
                    # Keep the slot until the response has been fully sent.
                    register(release_once)
                    deferred = True
                return result
            finally:
                if not deferred:
                    release_once()

        return decorated_function
def require_connection_limit():
    """Build a decorator that runs a view under the app's connection limiter.

    The limiter is looked up on ``current_app.connection_limiter`` each time
    the wrapped view is called, and the call is delegated to that limiter's
    ``limit_connections`` wrapper.

    Returns:
        A decorator that takes the view function and returns the guarded
        version of it.
    """
    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            limiter = current_app.connection_limiter
            return limiter.limit_connections(f)(*args, **kwargs)
        return wrapped
    return decorator


@upload_bp.route('/upload', methods=['POST'])
def zbparse():
    """Handle ``POST /upload`` and stream the parsing result as server-sent events.

    A slot is taken from ``current_app.connection_limiter.semaphore`` without
    blocking; when none is free a JSON error with status 429 is returned.
    Otherwise the request is validated with ``validate_request()`` and, on
    success, the output of ``process_and_stream(file_url, zb_type)`` is
    returned as a ``text/event-stream`` response.

    The slot is held for as long as the response is being streamed: its
    release is registered with ``response.call_on_close`` instead of being
    performed when this function returns, because the streamed body is
    produced after the view has already returned. On every other exit path
    (validation error, exception) the slot is released before returning.

    Returns:
        The streaming response, the error response produced by
        ``validate_request()``, or a JSON error with status 429 or 500.
    """
    # Fetch the current application's connection_limiter.
    limiter = current_app.connection_limiter

    if not limiter.semaphore.acquire(blocking=False):
        return jsonify({
            'error': 'Server is busy. Maximum number of concurrent connections reached.',
            'code': 429
        }), 429

    slot_released = False

    def release_slot():
        # Guard against a double release if the response is closed twice.
        nonlocal slot_released
        if not slot_released:
            slot_released = True
            limiter.semaphore.release()

    handed_off = False
    try:
        logger = g.logger
        logger.info("zbparse start!!!")
        received_data = request.get_json()
        logger.info("Received JSON data: " + str(received_data))
        validation = validate_request()

        if isinstance(validation, tuple) and len(validation) == 2 and isinstance(validation[0], str):
            file_url, zb_type = validation
        else:
            return validation  # error response

        try:
            logger.info("starting parsing url:" + file_url)
            response = Response(stream_with_context(process_and_stream(file_url, zb_type)),
                                content_type='text/event-stream')
            # From here on the response owns the slot; it is freed on close.
            response.call_on_close(release_slot)
            handed_off = True
            return response
        except Exception as e:
            logger.error('Exception occurred: ' + str(e))
            return jsonify({'error': str(e)}), 500
    finally:
        if not handed_off:
            release_slot()
b/flask_app/货物标/商务服务其他要求提取.py index b381820..a557dd7 100644 --- a/flask_app/货物标/商务服务其他要求提取.py +++ b/flask_app/货物标/商务服务其他要求提取.py @@ -3,7 +3,7 @@ import json import re from PyPDF2 import PdfReader -from flask_app.general.doubao import read_txt_to_string +from flask_app.general.doubao import read_txt_to_string, pdf2txt from flask_app.general.json_utils import combine_json_results,clean_json_string from flask_app.general.通义千问long import upload_file,qianwen_long_stream from flask_app.货物标.截取pdf货物标版 import extract_common_header, clean_page_content @@ -246,11 +246,14 @@ def merge_requirements(input_dict): final_dict[key] = final_dict[key].strip() return final_dict +#,"总\s*体\s*要\s*求","进\s*度\s*要\s*求","培\s*训\s*要\s*求" def get_business_requirements(procurement_path,processed_filepath): file_id=upload_file(procurement_path) - required_keys = ["技\s*术\s*要\s*求","商\s*务\s*要\s*求", "服\s*务\s*要\s*求", "其\s*他\s*要\s*求","总\s*体\s*要\s*求","进\s*度\s*要\s*求","培\s*训\s*要\s*求"] + required_keys = ["技\s*术\s*要\s*求","商\s*务\s*要\s*求", "服\s*务\s*要\s*求", "其\s*他\s*要\s*求"] contained_keys=find_exists(procurement_path,required_keys) print(contained_keys) + if not contained_keys: + return {} # queries = generate_queries(truncate_file, contained_keys) user_query=generate_user_query_template(contained_keys,processed_filepath) # print(user_query) @@ -264,8 +267,9 @@ def get_business_requirements(procurement_path,processed_filepath): #TODO:改为先判断,再摘取 if __name__ == "__main__": # truncate_file = "C:\\Users\\Administrator\\Desktop\\fsdownload\\e4be098d-b378-4126-9c32-a742b237b3b1\\ztbfile_procurement.docx" - truncate_file=r"C:\Users\Administrator\Desktop\fsdownload\5901b181-b55f-4107-9f30-c85d607b1fa0\ztbfile_procurement.pdf" - processed_filepath="" + truncate_file=r"C:\Users\Administrator\Desktop\fsdownload\fa0d51a1-0d63-4c0d-9002-cf8ac3f2211a\ztbfile_procurement.pdf" + # file_id = upload_file(truncate_file) - res=get_business_requirements(truncate_file,"") + processed_filepath = pdf2txt(truncate_file) + 
res=get_business_requirements(truncate_file,processed_filepath) print(json.dumps(res, ensure_ascii=False, indent=4)) diff --git a/flask_app/货物标/技术参数要求提取.py b/flask_app/货物标/技术参数要求提取.py index 28951b7..6baf8ff 100644 --- a/flask_app/货物标/技术参数要求提取.py +++ b/flask_app/货物标/技术参数要求提取.py @@ -566,7 +566,7 @@ def test_all_files_in_folder(input_folder, output_folder): if __name__ == "__main__": start_time=time.time() # truncate_file="C:\\Users\\Administrator\\Desktop\\fsdownload\\469d2aee-9024-4993-896e-2ac7322d41b7\\ztbfile_procurement.docx" - truncate_file=r"C:\Users\Administrator\Desktop\货物标\output1\包头市公安支队机动车查验监管系统招标文201907_procurement.pdf" + truncate_file=r"C:\Users\Administrator\Desktop\fsdownload\fa0d51a1-0d63-4c0d-9002-cf8ac3f2211a\ztbfile_procurement.pdf" # invalid_path="D:\\flask_project\\flask_app\\static\\output\\output1\\e7dda5cb-10ba-47a8-b989-d2993d34bb89\\ztbfile.pdf" # truncate_file="D:\\flask_project\\flask_app\\static\\output\\output1\\e7dda5cb-10ba-47a8-b989-d2993d34bb89\\ztbfile_procurement.docx" # output_folder="C:\\Users\\Administrator\\Desktop\\货物标\\output1\\tmp" diff --git a/flask_app/货物标/提取采购需求main.py b/flask_app/货物标/提取采购需求main.py index 9a6801d..2f16eff 100644 --- a/flask_app/货物标/提取采购需求main.py +++ b/flask_app/货物标/提取采购需求main.py @@ -59,12 +59,13 @@ def fetch_procurement_reqs(procurement_path, invalid_path): #TODO:技术要求可以在技术参数之后执行,把完整的技术参数输入,问大模型,除了上述内容还有哪些,这样的话把技术标和其他的区分开。 +#TODO: 094有问题 if __name__ == "__main__": start_time=time.time() output_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\货物标output" # file_path="C:\\Users\\Administrator\\Desktop\\货物标\\output1\\2-招标文件(2020年广水市中小学教师办公电脑系统及多媒体“班班通”设备采购安装项目)_procurement.pdf" - procurement_path = r"C:\Users\Administrator\Desktop\fsdownload\5901b181-b55f-4107-9f30-c85d607b1fa0\ztbfile_procurement.pdf" - procurement_docpath=r"C:\Users\Administrator\Desktop\fsdownload\5901b181-b55f-4107-9f30-c85d607b1fa0" + procurement_path = 
r"C:\Users\Administrator\Desktop\fsdownload\fa0d51a1-0d63-4c0d-9002-cf8ac3f2211a\ztbfile_procurement.pdf" + procurement_docpath=r"C:\Users\Administrator\Desktop\fsdownload\fa0d51a1-0d63-4c0d-9002-cf8ac3f2211a" invalid_path="C:\\Users\\Administrator\\Desktop\\fsdownload\\db79e9e0-830e-442c-8cb6-1d036215f8ff\\ztbfile.pdf" res=fetch_procurement_reqs(procurement_path,invalid_path) print(json.dumps(res, ensure_ascii=False, indent=4))