2024-08-29 16:37:09 +08:00
# 基于知识库提问的通用模板。
# 通过 DashScope 助手(按 assistant_id 运行)或 qianwen-long 对一组问题并发提问。
2024-10-14 17:13:11 +08:00
import json
import os
2024-08-29 16:37:09 +08:00
import re
import queue
import concurrent . futures
import time
2024-11-20 19:35:22 +08:00
from datetime import datetime
2024-10-14 17:13:11 +08:00
import requests
2024-08-29 16:37:09 +08:00
from dashscope import Assistants , Messages , Runs , Threads
from llama_index . indices . managed . dashscope import DashScopeCloudRetriever
2024-11-20 19:35:22 +08:00
from flask_app . general . 通义千问long import qianwen_long , upload_file
2024-08-29 16:37:09 +08:00
# System-prompt template for a generic "document processing expert" assistant.
# The text is kept in Chinese because it is sent to the model verbatim.
# The `${documents}`-style markers are presumably DashScope's placeholder for
# retrieved knowledge-base passages (rag_assistant uses a similar marker as the
# rag tool's query_word value) — NOTE(review): confirm against DashScope docs.
# NOTE(review): `prompt` is not referenced anywhere in the visible part of this file.
prompt = """
# 角色
你是一个文档处理专家 , 专门负责理解和操作基于特定内容的文档任务 , 这包括解析 、 总结 、 搜索或生成与给定文档相关的各类信息 。
## 技能
### 技能 1: 文档解析与摘要
- 深入理解并分析 $ { documents } 的内容 , 提取关键信息 。
- 根据需求生成简洁明了的摘要 , 保持原文核心意义不变 。
### 技能 2: 信息检索与关联
- 在 $ { documents } 中高效检索特定信息或关键词 。
- 能够识别并链接到文档内部或外部的相关内容 , 增强信息的连贯性和深度 。
## 限制
- 所有操作均需基于 $ { documents } 的内容 , 不可超出此范围创造信息 。
- 在处理敏感或机密信息时 , 需遵守严格的隐私和安全规定 。
- 确保所有生成或改编的内容逻辑连贯 , 无误导性信息 。
请注意 , 上述技能执行时将直接利用并参考 $ { documents } 的具体内容 , 以确保所有产出紧密相关且高质量 。
"""
# Short instruction text ("remember the following material and answer concisely
# and accurately"). The same sentence is inlined as `instructions` in
# rag_assistant and create_assistant; `prom` itself is not referenced by name in
# the visible code.
prom = ' 请记住以下材料,他们对回答问题有帮助,请你简洁准确地给出回答,不要给出无关内容。$ {documents} '
def read_questions_from_file(file_path, encoding='utf-8'):
    """Parse a numbered list of questions from a text file.

    A line that begins with ``<digits>.`` (for example ``"1."`` or ``"12."``)
    starts a new question; the number and the dot are removed. Every following
    non-numbered line is appended to the current question, separated by a
    newline. Blank lines and lines starting with ``#`` are skipped. Each line is
    stripped of surrounding whitespace before it is examined.

    Note: text that appears before the first numbered line is accumulated too,
    and is emitted as a question of its own.

    Args:
        file_path: Path of the text file to read.
        encoding: Text encoding of the file (default ``'utf-8'``).

    Returns:
        list[str]: The questions in file order, each stripped of surrounding
        whitespace. A numbered line with no text yields an empty string.
    """
    # Compiled once instead of being re-parsed for every line.
    number_prefix = re.compile(r'^(\d+)\.')
    questions = []
    current_lines = []  # lines of the question being built; [] means "none yet"

    with open(file_path, 'r', encoding=encoding) as file:
        for raw_line in file:
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue  # blank line or comment line
            match = number_prefix.match(line)
            if match:
                # A new number closes the previous question, if any.
                if current_lines:
                    questions.append('\n'.join(current_lines).strip())
                # Keep only the text after "<digits>.".
                current_lines = [line[match.end():].strip()]
            else:
                # Continuation of the current question.
                current_lines.append(line)

    # Flush the last question.
    if current_lines:
        questions.append('\n'.join(current_lines).strip())
    return questions
#正文和文档名之间的内容
def send_message(assistant, message='百炼是什么?'):
    """Ask ``assistant`` a single question in a freshly created thread.

    Creates a thread, posts ``message`` into it, starts a run with the given
    assistant and blocks in ``Runs.wait`` until that call returns. The query
    and the thread object are printed for tracing.

    Args:
        assistant: Object exposing an ``id`` attribute (such as the result of
            ``Assistants.create``).
        message: The question text.

    Returns:
        list[str]: For every message in the thread, the ``text.value`` of its
        first content part, in the reverse of the order ``Messages.list``
        returns them.
    """
    print(f"Query: {message}")
    new_thread = Threads.create()
    print(new_thread)
    thread_id = new_thread.id

    Messages.create(thread_id, content=message)
    started_run = Runs.create(thread_id, assistant_id=assistant.id)
    # NOTE(review): the run's final status is not inspected, so a failed run
    # still falls through to reading whatever messages exist.
    Runs.wait(started_run.id, thread_id=thread_id)

    listing = Messages.list(thread_id)
    return [entry['content'][0]['text']['value'] for entry in listing['data'][::-1]]
def rag_assistant(knowledge_name):
    """Create a qwen-max DashScope assistant wired to a Bailian knowledge base.

    The knowledge base's retrieval pipeline is looked up through
    ``DashScopeCloudRetriever`` and attached as a ``rag`` tool next to the
    built-in ``code_interpreter`` tool.

    Args:
        knowledge_name: Name of the knowledge base to retrieve from.

    Returns:
        The object returned by ``Assistants.create``.
    """
    retriever = DashScopeCloudRetriever(knowledge_name)
    pipeline_id = str(retriever.pipeline_id)
    assistant = Assistants.create(
        model='qwen-max',
        name='smart helper',
        description='智能助手,支持知识库查询和插件调用。',
        # Numeric, like the "temperature": 0.3 payload field in create_assistant;
        # previously passed as the string '0.3'.
        temperature=0.3,
        instructions="请记住以下材料,他们对回答问题有帮助,请你简洁准确地给出回答,不要给出无关内容。${documents}",
        tools=[
            {
                "type": "code_interpreter"
            },
            {
                "type": "rag",
                "prompt_ra": {
                    "pipeline_id": pipeline_id,
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query_word": {
                                "type": "str",
                                # presumably substituted with retrieved passages by DashScope — confirm
                                "value": "${documents}"
                            }
                        }
                    }
                }
            }
        ]
    )
    return assistant
2024-10-14 17:13:11 +08:00
# TODO: HTTP-based variant of rag_assistant; per the original note it still has
# a known bug that has not been fixed yet.
# NOTE(review): this returns the parsed JSON response (presumably a dict), while
# send_message reads `assistant.id` as an attribute, so passing this result to
# send_message would fail. This may be the known bug — confirm.
def create_assistant(knowledge_name, timeout=60):
    """
    Create an assistant through the DashScope HTTP API for a knowledge base.

    Parameters:
        knowledge_name (str): Name of the knowledge base whose retrieval
            pipeline the assistant's ``rag`` tool should use.
        timeout (float): Seconds to wait for the HTTP request (default 60).
            Without a timeout, ``requests`` may block indefinitely.

    Returns:
        The parsed JSON body of the API response describing the new
        assistant, or None if the pipeline ID could not be resolved or the
        request failed. Errors in those steps are printed, not raised.

    Raises:
        ValueError: If the DASHSCOPE_API_KEY environment variable is not set.
    """
    # Step 1: Resolve the knowledge base's pipeline ID.
    try:
        retriever = DashScopeCloudRetriever(knowledge_name)
        pipeline_id = str(retriever.pipeline_id)
    except Exception as e:
        print(f"Error retrieving pipeline ID for knowledge '{knowledge_name}': {e}")
        return None

    # Step 2: Read the API key from the environment.
    api_key = os.getenv("DASHSCOPE_API_KEY")
    if not api_key:
        raise ValueError("DASHSCOPE_API_KEY environment variable is not set.")

    # Step 3: Endpoint and headers.
    url = 'https://dashscope.aliyuncs.com/api/v1/assistants'
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}"
    }

    # Step 4: Instructions (same sentence as rag_assistant's instructions).
    instructions = (
        "请记住以下材料,他们对回答问题有帮助,请你简洁准确地给出回答,不要给出无关内容。${documents}"
    )

    # Step 5: Tools — code interpreter plus RAG over the resolved pipeline.
    tools = [
        {
            "type": "code_interpreter"
        },
        {
            "type": "rag",
            "prompt_ra": {
                "pipeline_id": pipeline_id,
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query_word": {
                            "type": "str",
                            "value": "${documents}"
                        }
                    }
                }
            }
        }
    ]

    # Step 6: Request payload. file_ids / metadata can be filled in if needed,
    # e.g. payload["file_ids"] = ["file_id_1"].
    payload = {
        "model": "qwen-max",
        "name": "智能小助手",  # "Smart Helper"
        "description": "智能助手,支持知识库查询和插件调用。",
        "temperature": 0.3,
        "instructions": instructions,
        "tools": tools,
        "file_ids": [],
        "metadata": {}
    }

    # Step 7: POST the request. `json=` serializes the payload for us.
    try:
        response = requests.post(url, headers=headers, json=payload, timeout=timeout)
        response.raise_for_status()  # HTTPError for 4xx / 5xx
        assistant = response.json()
    except requests.exceptions.HTTPError as http_err:
        print(f"HTTP error occurred: {http_err} - Response: {http_err.response.text}")
        return None
    except Exception as err:
        # Best-effort: connection errors, timeouts, invalid JSON, ...
        print(f"An error occurred: {err}")
        return None

    print("Assistant created successfully:")
    print(json.dumps(assistant, indent=4, ensure_ascii=False))
    return assistant
2024-08-29 16:37:09 +08:00
def pure_assistant():
    """Create a plain qwen-max assistant with no knowledge base attached.

    The only tool enabled is the built-in ``code_interpreter``; the same
    Chinese sentence is used as both description and instructions.

    Returns:
        The object returned by ``Assistants.create``.
    """
    persona = '智能助手,能基于用户的要求精准简洁地回答用户的提问'
    return Assistants.create(
        model='qwen-max',
        name='smart helper',
        description=persona,
        instructions=persona,
        tools=[{"type": "code_interpreter"}],
    )
def llm_call(question, knowledge_name, file_id, result_queue, ans_index, llm_type):
    """Answer one question with the selected backend and enqueue the result.

    Backends by ``llm_type``:
        1 -- knowledge-base assistant from ``rag_assistant(knowledge_name)``;
        2 -- ``qianwen_long(file_id, question)`` over an uploaded file;
        any other value -- plain assistant from ``pure_assistant()``.

    On success ``(ans_index, (question, answer))`` is put on ``result_queue``.
    Exceptions raised by a backend propagate to the caller and nothing is
    enqueued.
    """
    if llm_type == 2:
        print(f"qianwen_long! question: {question}")
        # Wall-clock timestamp (HH:MM:SS.mmm) for rough latency tracing.
        print(datetime.now().strftime("%H:%M:%S.%f")[:-3])
        answer = qianwen_long(file_id, question)
        result_queue.put((ans_index, (question, answer)))
        return

    if llm_type == 1:
        print(f"rag_assistant! question: {question}")
        assistant = rag_assistant(knowledge_name)
    else:
        assistant = pure_assistant()

    reply = send_message(assistant, message=question)
    # Keyed by ans_index so the caller can restore the original query order.
    result_queue.put((ans_index, (question, reply)))
2024-09-27 15:31:28 +08:00
def multi_threading(queries, knowledge_name="", file_id="", llm_type=1,
                    max_workers=30, max_retries=2):
    """Run ``llm_call`` for every query concurrently, retrying failed ones.

    Args:
        queries: Question strings. Each is passed to ``llm_call`` together
            with its position in this list.
        knowledge_name: Forwarded to ``llm_call`` (used there when llm_type == 1).
        file_id: Forwarded to ``llm_call`` (used there when llm_type == 2).
        llm_type: Backend selector forwarded to ``llm_call``.
        max_workers: Thread-pool size (default 30).
        max_retries: Extra attempts per query after its first failure
            (default 2, i.e. at most 3 attempts in total).

    Returns:
        list: The ``(question, answer)`` tuples enqueued by ``llm_call``, in
        the order of ``queries``. Queries that failed on every attempt are
        dropped, so positions do not line up with ``queries`` when something
        fails. Returns ``[]`` when ``queries`` is empty or every query failed.
    """
    if not queries:
        return []

    print("多线程提问: starting multi_threading...")
    result_queue = queue.Queue()
    failures = {}  # query index -> number of failed attempts so far

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        pending = {}  # future -> query index
        for index, query in enumerate(queries):
            # No delay between submissions; per the original note, qianwen-long
            # calls are rate-limited elsewhere.
            future = executor.submit(llm_call, query, knowledge_name, file_id, result_queue, index, llm_type)
            pending[future] = index
            failures[index] = 0

        while pending:
            done, _ = concurrent.futures.wait(
                set(pending),
                return_when=concurrent.futures.FIRST_COMPLETED
            )
            for future in done:
                index = pending.pop(future)
                try:
                    future.result()  # re-raises any exception from llm_call
                except Exception as exc:
                    # Failures seen in practice include DashScope HTTP 429
                    # (e.g. 'insufficient_quota'). Retries are resubmitted
                    # immediately, without backoff.
                    print(f"Query {index} generated an exception: {exc}")
                    failures[index] += 1
                    if failures[index] <= max_retries:
                        print(f"Retrying query {index} (attempt {failures[index]})...")
                        print("重试的问题:" + queries[index])
                        retry = executor.submit(llm_call, queries[index], knowledge_name, file_id, result_queue, index, llm_type)
                        pending[retry] = index
                    else:
                        # failures[index] is the true number of attempts made
                        # (max_retries + 1), not max_retries.
                        print(f"Query {index} failed after {failures[index]} attempts.")
                        result_queue.put((index, None))  # placeholder for a failed query

    # Drain the queue into query order, then drop failed (None) entries.
    ordered = [None] * len(queries)
    while not result_queue.empty():
        index, result = result_queue.get()
        ordered[index] = result
    return [r for r in ordered if r is not None]
if __name__ == "__main__":
    import sys

    start_time = time.time()

    # Other usage mode: llm_type=1 (Bailian RAG over a knowledge base), e.g.
    #   questions = read_questions_from_file('/flask_app/static/提示词/基本信息工程标.txt')
    #   results = multi_threading(questions, "招标解析5word")
    # The run below uses llm_type=2 (qianwen-long over an uploaded file).
    default_path = r"C:\Users\Administrator\Desktop\fsdownload\39b0c3b4-1807-456c-8330-c5c7d1b7a2ca\ztbfile_procurement\ztbfile_procurement_1.pdf"
    # Allow overriding the hard-coded developer path from the command line.
    file_path = sys.argv[1] if len(sys.argv) > 1 else default_path
    file_id = upload_file(file_path)

    # Smoke test: 49 trivial prompts sent concurrently against the uploaded file.
    query = [f"请返回这个数字:{i}" for i in range(1, 50)]
    res = multi_threading(query, "", file_id, 2)
    if not res:
        print("error!")
    else:
        for _, response in res:
            print(response)
    print("elapsed time:" + str(time.time() - start_time))