import ast
import json
import os
import random
import re
import threading
import time
from functools import wraps
from pathlib import Path

from flask import g
from openai import OpenAI
from ratelimit import limits, sleep_and_retry

# Serializes appends to 'file_ids.txt' when uploads run on several threads.
file_write_lock = threading.Lock()

# OpenAI-compatible endpoint used for every client created in this module.
DASHSCOPE_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"

# Inner error codes (HTTP 400) that each entry point retries once, immediately.
_RETRYABLE_400_CODES = (
    'data_inspection_failed',
    'ResponseTimeout',
    'DataInspectionFailed',
    'response_timeout',
)
_RETRYABLE_400_CODES_STREAM = _RETRYABLE_400_CODES + (
    'RequestTimeOut',
    'request_timeout',
)


def _create_client():
    """Build an OpenAI-compatible client from the DASHSCOPE_API_KEY env var."""
    return OpenAI(
        api_key=os.getenv("DASHSCOPE_API_KEY"),
        base_url=DASHSCOPE_BASE_URL,
    )


def upload_file(file_path, output_folder=""):
    """
    Upload a file to DashScope and return its file ID.

    The file ID is also appended (one per line) to 'file_ids.txt' inside
    ``output_folder``.

    Args:
        file_path: Path of the file to upload.
        output_folder: Folder that receives 'file_ids.txt'. When empty, the
            folder containing ``file_path`` is used (the current directory
            if ``file_path`` has no directory component).

    Returns:
        The ID assigned to the uploaded file.
    """
    if not output_folder:
        # dirname() is "" for a bare filename; makedirs("") would raise,
        # so fall back to the current directory.
        output_folder = os.path.dirname(file_path) or "."

    client = _create_client()

    # Upload the file and read back its ID.
    file = client.files.create(file=Path(file_path), purpose="file-extract")
    file_id = file.id

    # exist_ok avoids the check-then-create race between concurrent uploads.
    os.makedirs(output_folder, exist_ok=True)

    # Only one thread at a time may append to the shared ID file.
    with file_write_lock:
        file_ids_path = os.path.join(output_folder, 'file_ids.txt')
        # Append mode creates the file on first use.
        with open(file_ids_path, 'a', encoding='utf-8') as f:
            f.write(f'{file_id}\n')

    return file_id


@sleep_and_retry
@limits(calls=4, period=1)  # at most 4 calls per second
def rate_limiter():
    """No-op whose only purpose is to carry the shared rate limit."""
    pass


def shared_rate_limit(func):
    """Decorator: pass through the shared ``rate_limiter`` before each call."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        rate_limiter()  # every decorated function goes through the same limiter
        return func(*args, **kwargs)
    return wrapper


def _should_retry(exc, attempt, max_retries, backoff_factor, user_query,
                  retryable_400_codes, non_retry_label):
    """
    Log a failed attempt and decide whether the caller should try again.

    For a 429 with attempts remaining, this sleeps for the exponential
    backoff delay before returning True.

    Args:
        exc: The exception raised by the API call.
        attempt: 1-based number of the attempt that just failed.
        max_retries: Maximum number of retries after the initial call.
        backoff_factor: Base delay in seconds for exponential backoff.
        user_query: The query text, used only in log messages.
        retryable_400_codes: Inner error codes of a 400 that get one retry.
        non_retry_label: Code name quoted in the "not retrying" message.

    Returns:
        True if the caller should run another attempt, False to give up.
    """
    error_code, error_code_string = extract_error_details(str(exc))
    print(f"第 {attempt} 次尝试失败,查询:'{user_query}',错误:{exc}")

    if error_code == 429:
        if attempt <= max_retries:
            sleep_time = backoff_factor * (2 ** (attempt - 1))  # exponential backoff
            print(f"错误代码为 429,将在 {sleep_time} 秒后重试...")
            time.sleep(sleep_time)
            return True
        print(f"查询 '{user_query}' 的所有 {max_retries + 1} 次尝试均失败(429 错误)。")
        return False

    if error_code == 400 and error_code_string in retryable_400_codes:
        if attempt == 1:  # these errors are retried exactly once
            print(f"错误代码为 400 - {error_code_string},将立即重试...")
            return True
        print(f"查询 '{user_query}' 的所有 {max_retries + 1} 次尝试均失败(400 - {error_code_string})。")
        return False

    # Anything else is treated as non-retryable.
    print(f"遇到非 429 或非 '{non_retry_label}' 的 400 错误(错误代码:{error_code}),不进行重试。")
    return False


@shared_rate_limit
def qianwen_long(file_id, user_query, max_retries=2, backoff_factor=1.0):
    """
    Answer ``user_query`` against an uploaded file, retrying on failure.

    Args:
        file_id: ID of the uploaded file.
        user_query: The user's question.
        max_retries: Maximum number of retries (default 2).
        backoff_factor: Base wait in seconds for exponential backoff
            (default 1.0).

    Returns:
        The response content on success, or "" when every attempt failed or
        a non-retryable error occurred.
    """
    client = _create_client()

    # max_retries + 1 attempts in total: the initial call plus the retries.
    for attempt in range(1, max_retries + 2):
        try:
            completion = client.chat.completions.create(
                model="qwen-long",
                temperature=0.5,
                messages=[
                    {
                        'role': 'system',
                        'content': f'fileid://{file_id}'
                    },
                    {
                        'role': 'user',
                        'content': user_query
                    }
                ],
                stream=False
            )
            return completion.choices[0].message.content
        except Exception as exc:
            # Stop as soon as the error is non-retryable or retries are used up;
            # without this break the loop would keep calling the API.
            if not _should_retry(exc, attempt, max_retries, backoff_factor,
                                 user_query, _RETRYABLE_400_CODES,
                                 'data_inspection_failed'):
                break

    return ""


def extract_error_details(error_message):
    """
    Extract the numeric error code and the inner error-code string.

    The message is expected to contain 'Error code: XXX - {...}', where the
    braces hold a Python-literal dict with an 'error' -> 'code' entry.

    Args:
        error_message: String form of the exception.

    Returns:
        A tuple ``(error_code, error_code_string)``; either element is None
        when it cannot be found or parsed.
    """
    # Numeric code, e.g. 429.
    error_code_match = re.search(r'Error code:\s*(\d+)', error_message)
    error_code = int(error_code_match.group(1)) if error_code_match else None

    # Inner code string, e.g. 'data_inspection_failed'.
    error_code_string = None
    error_dict_match = re.search(r'Error code:\s*\d+\s*-\s*(\{.*\})', error_message)
    if error_dict_match:
        error_dict_str = error_dict_match.group(1)
        try:
            # literal_eval parses the dict repr without executing code.
            error_dict = ast.literal_eval(error_dict_str)
            error_info = error_dict.get('error') if isinstance(error_dict, dict) else None
            if isinstance(error_info, dict):
                error_code_string = error_info.get('code')
            print(error_code_string)
        except Exception as e:
            # Deliberately broad: this runs while handling another error and
            # must never replace it with a parsing failure.
            print(f"解析错误消息失败: {e}")

    return error_code, error_code_string


@shared_rate_limit
def qianwen_long_stream(file_id, user_query, max_retries=2, backoff_factor=1.0):
    """
    Answer ``user_query`` against an uploaded file using a streamed response.

    Args:
        file_id: ID of the uploaded file.
        user_query: The user's question.
        max_retries: Maximum number of retries (default 2).
        backoff_factor: Base wait in seconds for exponential backoff
            (default 1.0).

    Returns:
        The concatenated response content on success, or "" when every
        attempt failed or a non-retryable error occurred.
    """
    print("调用 qianwen-long stream...")
    client = _create_client()

    # max_retries + 1 attempts in total: the initial call plus the retries.
    for attempt in range(1, max_retries + 2):
        try:
            completion = client.chat.completions.create(
                model="qwen-long",
                temperature=0.4,
                max_tokens=5000,
                messages=[
                    {
                        'role': 'system',
                        'content': f'fileid://{file_id}'
                    },
                    {
                        'role': 'user',
                        'content': user_query
                    }
                ],
                stream=True  # enable streaming
            )

            pieces = []  # content fragments, joined once at the end
            for chunk in completion:
                if hasattr(chunk, 'to_dict'):
                    chunk_data = chunk.to_dict()
                else:
                    chunk_data = json.loads(chunk.model_dump_json())

                choices = chunk_data.get('choices', [])
                if not choices:
                    continue

                choice = choices[0]
                # A chunk may carry no delta; treat that as empty content.
                content = (choice.get('delta') or {}).get('content', '')
                if content:
                    pieces.append(content)

                if choice.get('finish_reason'):
                    break

            return "".join(pieces)
        except Exception as exc:
            # Stop as soon as the error is non-retryable or retries are used up;
            # without this break the loop would keep calling the API.
            if not _should_retry(exc, attempt, max_retries, backoff_factor,
                                 user_query, _RETRYABLE_400_CODES_STREAM,
                                 'data_inspection_failed...'):
                break

    # Every attempt failed.
    return ""


@shared_rate_limit
def qianwen_long_text(file_id, user_query):
    """
    Answer ``user_query`` against an uploaded file with a single call.

    No retry is performed; any exception from the API call propagates.

    Args:
        file_id: ID of the uploaded file.
        user_query: The user's question.

    Returns:
        The response content.
    """
    print("call qianwen-long text...")
    client = _create_client()

    # Generate a response based on the file ID.
    completion = client.chat.completions.create(
        model="qwen-long",
        # top_p=0.5,
        temperature=0.5,
        messages=[
            {
                'role': 'system',
                'content': f'fileid://{file_id}'
            },
            {
                'role': 'user',
                'content': user_query
            }
        ],
        stream=False
    )

    return completion.choices[0].message.content


# TODO: if the procurement-requirements and scoring queries time out often,
# consider switching them all to streaming.
if __name__ == "__main__":
    # Example file path - replace with your actual file path.
    file_path = r"C:\Users\Administrator\Desktop\货物标\截取test\2-招标文件_before.pdf"
    file_id = upload_file(file_path)
    print(file_id)
    #
    # user_query1 = "该招标文件前附表中的项目名称是什么,请以json格式返回给我"
    # user_query2 = ("请提供文件中关于资格审查的具体内容和标准。")
    # start_time=time.time()
    # # First query
    # print("starting qianwen-long...")
    # result1 ,result2= qianwen_long(file_id, user_query1)
    # print("First Query Result:", result1)
    # print(type(result1))
    # print(result2)
    # # Second query
    # print("starting qianwen-long...")
    # result2 = qianwen_long(file_id, user_query2)
    # print("Second Query Result:", result2)
    # end_time=time.time()
    # print("elapsed time:"+str(end_time-start_time))