zbparse/flask_app/general/通义千问long.py

275 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import ast
import json
import re
from functools import wraps
from ratelimit import limits, sleep_and_retry
import random
import time
from pathlib import Path
from openai import OpenAI
import os
def upload_file(file_path):
"""
Uploads a file to DashScope and returns the file ID.
"""
client = OpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
file = client.files.create(file=Path(file_path), purpose="file-extract")
return file.id
@sleep_and_retry
@limits(calls=4, period=1) # 每秒最多调用4次
def rate_limiter():
pass # 这个函数本身不执行任何操作,只用于限流
# 创建一个共享的装饰器
def shared_rate_limit(func):
@wraps(func)
def wrapper(*args, **kwargs):
rate_limiter() # 通过共享的限流器
return func(*args, **kwargs)
return wrapper
@shared_rate_limit
def qianwen_long(file_id, user_query, max_retries=2, backoff_factor=1.0):
"""
基于上传的文件 ID 和用户查询生成响应,并在失败时自动重试。
参数:
- file_id: 上传文件的 ID
- user_query: 用户查询
- max_retries: 最大重试次数(默认 2 次)
- backoff_factor: 指数退避的基础等待时间(默认 1.0 秒)
"""
print("call qianwen_long...")
client = OpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
for attempt in range(1, max_retries + 2): # +1 是为了包括初始调用
try:
# 调用 API
completion = client.chat.completions.create(
model="qwen-long",
temperature=0.5,
messages=[
{
'role': 'system',
'content': f'fileid://{file_id}'
},
{
'role': 'user',
'content': user_query
}
],
stream=False
)
# 如果调用成功,返回响应内容
return completion.choices[0].message.content
except Exception as exc:
# 提取错误代码
error_code, error_code_string = extract_error_details(str(exc))
print(f"{attempt} 次尝试失败,查询:'{user_query}',错误:{exc}")
if error_code == 429:
if attempt <= max_retries:
sleep_time = backoff_factor * (2 ** (attempt - 1)) # 指数退避
print(f"错误代码为 429将在 {sleep_time} 秒后重试...")
time.sleep(sleep_time)
else:
print(f"查询 '{user_query}' 的所有 {max_retries + 1} 次尝试均失败429 错误)。")
elif error_code == 400 and error_code_string in ['data_inspection_failed', 'ResponseTimeout','DataInspectionFailed','response_timeout']:
if attempt == 1: # 只重试一次
print(f"错误代码为 400 - {error_code_string},将立即重试...")
continue # 直接跳到下一次循环(即重试一次)
else:
print(f"查询 '{user_query}' 的所有 {max_retries + 1} 次尝试均失败400 - {error_code_string})。")
else:
# 对于非 429 和非特定 400 错误,不进行重试,直接抛出异常
print(f"遇到非 429 或非 'data_inspection_failed' 的 400 错误(错误代码:{error_code}),不进行重试。")
return ""
def extract_error_details(error_message):
"""
从错误消息中提取错误代码和内部错误代码。
假设错误消息的格式包含 'Error code: XXX - {...}'
"""
# 提取数值型错误代码
error_code_match = re.search(r'Error code:\s*(\d+)', error_message)
error_code = int(error_code_match.group(1)) if error_code_match else None
# 提取内部错误代码字符串(如 'data_inspection_failed'
error_code_string = None
error_dict_match = re.search(r'Error code:\s*\d+\s*-\s*(\{.*\})', error_message)
if error_dict_match:
error_dict_str = error_dict_match.group(1)
try:
# 使用 ast.literal_eval 解析字典字符串
error_dict = ast.literal_eval(error_dict_str)
error_code_string = error_dict.get('error', {}).get('code')
print(error_code_string)
except Exception as e:
print(f"解析错误消息失败: {e}")
return error_code, error_code_string
@shared_rate_limit
def qianwen_long_stream(file_id, user_query, max_retries = 2, backoff_factor = 1.0):
"""
使用之前上传的文件,根据用户查询生成响应,并实时显示流式输出。
参数:
- file_id: 上传文件的 ID
- user_query: 用户查询
- max_retries: 最大重试次数(默认 2 次)
- backoff_factor: 指数退避的基础等待时间(默认 1.0 秒)
返回:
- Optional[str]: 成功时返回响应内容,失败时返回空字符串
"""
print("调用 qianwen-long stream...")
client = OpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
for attempt in range(1, max_retries + 2): # +1 是为了包括初始调用
try:
# 生成基于文件ID的响应
completion = client.chat.completions.create(
model="qwen-long",
temperature=0.4,
max_tokens=5000,
messages=[
{
'role': 'system',
'content': f'fileid://{file_id}'
},
{
'role': 'user',
'content': user_query
}
],
stream=True # 启用流式响应
)
full_response = "" # 用于存储完整的响应内容
for chunk in completion:
if hasattr(chunk, 'to_dict'):
chunk_data = chunk.to_dict()
else:
chunk_data = json.loads(chunk.model_dump_json())
choices = chunk_data.get('choices', [])
if not choices:
continue
choice = choices[0]
delta = choice.get('delta', {})
content = delta.get('content', '')
if content:
full_response += content
# print(content, end='', flush=True) # 实时打印内容
if choice.get('finish_reason'):
break
return full_response # 返回完整的响应内容
except Exception as exc:
# 提取错误代码
error_code, error_code_string = extract_error_details(str(exc))
print(f"{attempt} 次尝试失败,查询:'{user_query}',错误:{exc}")
if error_code == 429:
if attempt <= max_retries:
sleep_time = backoff_factor * (2 ** (attempt - 1)) # 指数退避
print(f"错误代码为 429将在 {sleep_time} 秒后重试...")
time.sleep(sleep_time)
else:
print(f"查询 '{user_query}' 的所有 {max_retries + 1} 次尝试均失败429 错误)。")
elif error_code == 400 and error_code_string in ['data_inspection_failed', 'ResponseTimeout',
'DataInspectionFailed', 'response_timeout']:
if attempt == 1: # 只重试一次
print(f"错误代码为 400 - {error_code_string},将立即重试...")
continue # 直接跳到下一次循环(即重试一次)
else:
print(f"查询 '{user_query}' 的所有 {max_retries + 1} 次尝试均失败400 - {error_code_string})。")
else:
# 对于非 429 和非特定 400 错误,不进行重试,直接抛出异常
print(f"遇到非 429 或非 'data_inspection_failed' 的 400 错误(错误代码:{error_code}),不进行重试。")
# 如果所有尝试都失败了,返回空字符串
return ""
@shared_rate_limit
def qianwen_long_text(file_id, user_query):
print("call qianwen-long text...")
"""
Uses a previously uploaded file to generate a response based on a user query.
"""
client = OpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
# Generate a response based on the file ID
completion = client.chat.completions.create(
model="qwen-long",
# top_p=0.5,
temperature=0.5,
messages=[
{
'role': 'system',
'content': f'fileid://{file_id}'
},
{
'role': 'user',
'content': user_query
}
],
stream=False
)
# Return the response content
return completion.choices[0].message.content
if __name__ == "__main__":
# Example file path - replace with your actual file path
file_path = r"C:\Users\Administrator\Desktop\货物标\output1\陕西省公安厅交通警察总队高速公路交通安全智能感知巡查系统项目 (1)_procurement.pdf"
file_id = upload_file(file_path)
#
# user_query1 = "该招标文件前附表中的项目名称是什么请以json格式返回给我"
# user_query2 = ("请提供文件中关于资格审查的具体内容和标准。")
# start_time=time.time()
# # First query
# print("starting qianwen-long...")
# result1 ,result2= qianwen_long(file_id, user_query1)
# print("First Query Result:", result1)
# print(type(result1))
# print(result2)
# # Second query
# print("starting qianwen-long...")
# result2 = qianwen_long(file_id, user_query2)
# print("Second Query Result:", result2)
# end_time=time.time()
# print("elapsed time:"+str(end_time-start_time))
user_query = """
请你根据该货物标中采购要求部分的内容,请你给出"高速公路交通安全智能感知巡查系统软件"的技术参数或采购要求请以json格式返回结果键名为"高速公路交通安全智能感知巡查系统软件", 键值为一个列表,列表中包含若干描述\"{}\"的技术参数(或采购要求)的字符串,需与原文完全一致,即若技术参数前存在序号也要保留,但你不可擅自增添或删减。以下为需要考虑的特殊情况:如果该货物没有相关采购要求或技术参数要求,键值为空列表。示例输出格式如下:
{{
"摄像机控制键盘": [
"1、支持串行 RS232/RS422 和 IP 混合控制,允许在一个控制器上使用 RS232/RS422/IP 控制单个系统中的摄像机;",
"2、支持 2 组 RS422 串口 VISCA 协议菊花链控制 2x7 台摄像机。"
]
}}
"""
response = qianwen_long_stream(file_id, user_query)
print("完整响应内容:")
print(response)