# flask_app/ConnectionLimiter.py
import threading
from functools import wraps


class ConnectionLimiter:
    """Cap how many decorated calls may execute at the same time.

    The cap is enforced with a ``threading.Semaphore``, so it applies to
    threads that share this ``ConnectionLimiter`` instance.
    """

    def __init__(self, max_connections: int = 10) -> None:
        """Create a limiter that admits at most *max_connections* calls at once.

        Args:
            max_connections: Maximum number of decorated calls allowed to
                run concurrently. Must be at least 1.

        Raises:
            ValueError: If *max_connections* is less than 1.
        """
        # A semaphore created with 0 permits can never be acquired, so every
        # decorated call would block forever; fail loudly at construction time.
        if max_connections < 1:
            raise ValueError("max_connections must be at least 1")
        self.semaphore = threading.Semaphore(max_connections)

    def limit_connections(self, f):
        """Decorator: limit the number of concurrent calls to *f*.

        The returned wrapper blocks until a permit is available, calls *f*
        with the arguments it was given, and gives the permit back when *f*
        returns or raises.

        Args:
            f: The callable to guard.

        Returns:
            A wrapper carrying the metadata of *f* (via ``functools.wraps``)
            that returns whatever *f* returns.
        """

        @wraps(f)
        def wrapped(*args, **kwargs):
            # The context manager acquires on entry and always releases on
            # exit, including when f raises.
            with self.semaphore:
                return f(*args, **kwargs)

        return wrapped