import os from itertools import cycle import threading from concurrent.futures import ThreadPoolExecutor # 定义三个 API Key 的循环器 api_keys = cycle([1, 2, 3]) # 使用锁保证线程安全 lock = threading.Lock() def test(): # 使用锁保护对 api_keys 的访问 with lock: key = next(api_keys) print(key) return key def main(): results = [] num_calls = 100 # 模拟 100 次并发调用 # 使用 ThreadPoolExecutor 模拟高并发 with ThreadPoolExecutor(max_workers=20) as executor: futures = [executor.submit(test) for _ in range(num_calls)] # 收集所有返回的结果 for future in futures: results.append(future.result()) # 统计各个 API Key 被使用的次数 counts = {1: 0, 2: 0, 3: 0} for key in results: counts[key] += 1 print("调用分布:", counts) if __name__ == '__main__': main()