zbparse/flask_app/test_case/test_函数并发限制.py

73 lines
2.3 KiB
Python
Raw Normal View History

2024-12-05 16:53:11 +08:00
import time
from ratelimit import limits, sleep_and_retry
from threading import Thread
from functools import wraps
# 定义共享的限流器每秒最多调用4次
@sleep_and_retry
@limits(calls=4, period=1)
def rate_limiter():
pass # 此函数仅用于限流控制,不执行任何操作
2024-12-09 17:38:01 +08:00
# 创建一个共享的装饰器,在被装饰函数 (func) 执行之前,先调用 rate_limiter()。
2024-12-05 16:53:11 +08:00
def shared_rate_limit(func):
@wraps(func)
def wrapper(*args, **kwargs):
rate_limiter() # 应用共享的限流器
return func(*args, **kwargs)
return wrapper
@shared_rate_limit
def limited_test_func(counter, thread_name):
print(f"Thread {thread_name} - limited_test_func called {counter}")
return f"Thread {thread_name} - limited_test_func result {counter}"
@shared_rate_limit
def another_limited_func(counter, thread_name):
print(f"Thread {thread_name} - another_limited_func called {counter}")
return f"Thread {thread_name} - another_limited_func result {counter}"
def thread_function(thread_name):
for i in range(1, 21): # 每个线程序号从1到20
# 先调用 limited_test_func
try:
result = limited_test_func(counter=i, thread_name=thread_name)
print(result)
except Exception as e:
print(f"Thread {thread_name} - limited_test_func Error: {e}")
print("超时")
# 然后调用 another_limited_func
try:
result = another_limited_func(counter=i, thread_name=thread_name)
print(result)
except Exception as e:
print(f"Thread {thread_name} - another_limited_func Error: {e}")
print("超时")
#1s执行4次每个线程执行20*2次5个线程。预计耗时50s
if __name__ == "__main__":
# 定义多个线程每个线程负责调用两个函数1到20的数字
threads = []
num_threads = 5 # 启动5个线程
start_time = time.time()
for thread_id in range(num_threads):
thread_name = f"Thread-{thread_id + 1}"
thread = Thread(target=thread_function, args=(thread_name,))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
end_time = time.time()
print("所有线程完成。")
print("总共耗时: {:.2f}".format(end_time - start_time))