import json import logging import os import threading import time from itertools import cycle from openai import OpenAI from ratelimit import sleep_and_retry, limits from flask_app.general.llm.qianwen_turbo import qianwen_turbo from flask_app.general.llm.大模型通用函数 import extract_error_details, get_total_tokens #若多账号实现分流,那么就在这里添加不同的API_KEY,这里要求每个模型的TPM都是一样的!!具体API_KEY写在.env文件中 api_keys = cycle([ os.getenv("DASHSCOPE_API_KEY"), # os.getenv("DASHSCOPE_API_KEY_BACKUP1"), # os.getenv("DASHSCOPE_API_KEY_BACKUP2") ]) api_keys_lock = threading.Lock() def get_next_api_key(): with api_keys_lock: return next(api_keys) @sleep_and_retry @limits(calls=7, period=1) # tpm是1000万,稳定下每秒最多调用10次,两个服务器分流就是5次 2.12日:增加了turbo作为承载超限的部分,可以适当扩大到calls=7 def qianwen_plus(user_query, need_extra=False): logger = logging.getLogger('model_log') # 通过日志名字获取记录器 # print("call qianwen-plus...") """ 使用之前上传的文件,根据用户查询生成响应,并实时显示流式输出。 目前命中缓存局限:1.不足 256 Token 的内容不会被缓存。 2.上下文缓存的命中概率并不是100%,即使是上下文完全一致的请求,也存在无法命中的概率,命中概率依据系统判断而定。 3.若多线程同时请求,缓存无法及时更新! 参数: - user_query: 用户查询 - need_extra: 是否需要返回额外数据(默认 False) 返回: - 当 need_extra=False 时: 返回响应内容 (str) - 当 need_extra=True 时: 返回 (响应内容, token_usage) """ # 内部定义重试参数 max_retries = 2 backoff_factor = 2.0 api_key = get_next_api_key() client = OpenAI( api_key=api_key, base_url="https://dashscope.aliyuncs.com/compatible-mode/v1" ) for attempt in range(1, max_retries + 2): # +1 是为了包括初始调用 try: completion_tokens = 0 # 初始化 completion_tokens 为 0 # 生成基于用户查询的响应 completion = client.chat.completions.create( model="qwen-plus", temperature=0.5, messages=[ { 'role': 'user', 'content': user_query } ], stream=True, # 启用流式响应 stream_options={"include_usage": True} ) full_response = "" # 用于存储完整的响应内容 for chunk in completion: # 解析 chunk 数据 if hasattr(chunk, 'to_dict'): chunk_data = chunk.to_dict() else: chunk_data = json.loads(chunk.model_dump_json()) # 处理 usage 信息 usage = chunk_data.get('usage') if usage is not None: completion_tokens = usage.get('completion_tokens', 0) # prompt_tokens_details = usage.get('prompt_tokens_details', {}) #命中tokens ,取消注释可以print # cache_hit = prompt_tokens_details.get('cached_tokens', 0) # print("命中:"+str(cache_hit)) # 处理 choices 信息 choices = chunk_data.get('choices', []) if choices: choice = choices[0] delta = choice.get('delta', {}) content = delta.get('content', '') if content: full_response += content # 实时打印内容(可以取消注释下面一行以实时输出) # print(content, end='', flush=True) if choice.get('finish_reason'): # 处理完成原因(如果需要) pass # 或者记录 finish_reason 以供后续使用 if need_extra: return full_response, completion_tokens else: return full_response except Exception as exc: # 提取错误代码 error_code, error_code_string,request_id = extract_error_details(str(exc)) logger.error( f"第 {attempt} 次尝试失败,查询:'{user_query}',error_code:{error_code}, request_id={request_id}", exc_info=True ) if error_code == 429: if attempt <= max_retries: token_count = get_total_tokens(user_query) if token_count < 90000: #如果超过plus的qpm tpm,且user_query的tokens<90000,那么就调用turbo过渡一下。 return qianwen_turbo(user_query,need_extra) sleep_time = backoff_factor * (2 ** (attempt - 1)) # 指数退避 logger.warning(f"错误代码为 429,将在 {sleep_time} 秒后重试...") time.sleep(sleep_time) else: logger.error(f"查询 '{user_query}' 的所有 {max_retries + 1} 次尝试均失败(429 错误)。") break else: # 针对非 429 错误,只重试一次 if attempt <= 1: sleep_time = backoff_factor # 固定等待时间 logger.warning(f"遇到非 429 错误(错误代码:{error_code} - {error_code_string}),将等待 {sleep_time} 秒后重试...") time.sleep(sleep_time) continue # 直接跳到下一次循环(即重试一次) else: logger.error(f"查询 '{user_query}' 的所有 {max_retries + 1} 次尝试均失败(错误代码:{error_code} - {error_code_string})。") break # 如果所有尝试都失败了,返回空字符串或默认值 if need_extra: return "", 0 else: return "" if __name__ == "__main__": user_query1 = """该招标文件对响应文件(投标文件)偏离项的要求或内容是怎样的?请不要回答具体的技术参数,也不要回答具体的评分要求。请以json格式给我提供信息,外层键名为'偏离',若存在嵌套信息,嵌套内容键名为文件中对应字段或是你的总结,键值为原文对应内容。若文中没有关于偏离项的相关内容,在键值中填'未知'。 禁止内容: 确保键值内容均基于提供的实际招标文件内容,禁止使用任何预设的示例作为回答。 禁止返回markdown格式,请提取具体的偏离相关内容。 示例1,嵌套键值对情况: { "偏离":{ "技术要求":"以★标示的内容不允许负偏离", "商务要求":"以★标示的内容不允许负偏离" } } 示例2,无嵌套键值对情况: { "偏离":"所有参数需在技术响应偏离表内响应,如应答有缺项,且无有效证明材料的,评标委员会有权不予认可,视同负偏离处理" } """ res,a = qianwen_plus(user_query1,True) print(res) print(a)