import time import concurrent.futures from ratelimit import limits, sleep_and_retry from functools import wraps # 共享限流器:每秒最多允许 4 次调用 @sleep_and_retry @limits(calls=4, period=1) def rate_limiter(): current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) print(f"[限流器] {current_time} - Rate limiter invoked") pass # 此函数仅用于限流控制 # 创建一个共享装饰器,在被装饰函数执行之前先调用 rate_limiter() def shared_rate_limit(func): @wraps(func) def wrapper(*args, **kwargs): rate_limiter() # 先调用限流器 return func(*args, **kwargs) return wrapper # 被装饰的测试函数,每次调用会输出调用编号和当前时间 @shared_rate_limit def test_function(index): current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) print(f"[测试函数] {current_time} - Test function {index} executed") return current_time def main(): start_time = time.time() print("开始测试:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time))) num_calls = 100 # 使用线程池模拟并发执行 100 次调用 with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor: futures = [executor.submit(test_function, i) for i in range(num_calls)] # 等待所有任务完成 results = [future.result() for future in concurrent.futures.as_completed(futures)] end_time = time.time() duration = end_time - start_time print("\n测试结束") print("开始时间:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time))) print("结束时间:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time))) print(f"总执行时间: {duration:.2f} 秒") print("预期:由于限流规则每秒执行 4 次,100 个调用预计需要约 25 秒。") if __name__ == '__main__': main()