zbparse/flask_app/test_case/test_函数并发限制.py

51 lines
1.9 KiB
Python
Raw Permalink Normal View History

2024-12-05 16:53:11 +08:00
import time
import concurrent.futures
2024-12-05 16:53:11 +08:00
from ratelimit import limits, sleep_and_retry
from functools import wraps
# 共享限流器:每秒最多允许 4 次调用
2024-12-05 16:53:11 +08:00
@sleep_and_retry
@limits(calls=4, period=1)
def rate_limiter():
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(f"[限流器] {current_time} - Rate limiter invoked")
pass # 此函数仅用于限流控制
2024-12-05 16:53:11 +08:00
# 创建一个共享装饰器,在被装饰函数执行之前先调用 rate_limiter()
2024-12-05 16:53:11 +08:00
def shared_rate_limit(func):
@wraps(func)
def wrapper(*args, **kwargs):
rate_limiter() # 先调用限流器
2024-12-05 16:53:11 +08:00
return func(*args, **kwargs)
return wrapper
# 被装饰的测试函数,每次调用会输出调用编号和当前时间
2024-12-05 16:53:11 +08:00
@shared_rate_limit
def test_function(index):
current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(f"[测试函数] {current_time} - Test function {index} executed")
return current_time
2024-12-05 16:53:11 +08:00
def main():
2024-12-05 16:53:11 +08:00
start_time = time.time()
print("开始测试:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))
num_calls = 100
2024-12-05 16:53:11 +08:00
# 使用线程池模拟并发执行 100 次调用
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
futures = [executor.submit(test_function, i) for i in range(num_calls)]
# 等待所有任务完成
results = [future.result() for future in concurrent.futures.as_completed(futures)]
2024-12-05 16:53:11 +08:00
end_time = time.time()
duration = end_time - start_time
print("\n测试结束")
print("开始时间:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))
print("结束时间:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time)))
print(f"总执行时间: {duration:.2f}")
print("预期:由于限流规则每秒执行 4 次100 个调用预计需要约 25 秒。")
if __name__ == '__main__':
main()