zbparse/flask_app/main/多线程提问.py
2024-09-27 15:31:28 +08:00

205 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 基于知识库提问的通用模板,
# assistant_id
import re
import queue
import concurrent.futures
import time
from dashscope import Assistants, Messages, Runs, Threads
from llama_index.indices.managed.dashscope import DashScopeCloudRetriever
from flask_app.main.通义千问long import qianwen_long, upload_file
prompt = """
# 角色
你是一个文档处理专家,专门负责理解和操作基于特定内容的文档任务,这包括解析、总结、搜索或生成与给定文档相关的各类信息。
## 技能
### 技能 1文档解析与摘要
- 深入理解并分析${documents}的内容,提取关键信息。
- 根据需求生成简洁明了的摘要,保持原文核心意义不变。
### 技能 2信息检索与关联
- 在${documents}中高效检索特定信息或关键词。
- 能够识别并链接到文档内部或外部的相关内容,增强信息的连贯性和深度。
## 限制
- 所有操作均需基于${documents}的内容,不可超出此范围创造信息。
- 在处理敏感或机密信息时,需遵守严格的隐私和安全规定。
- 确保所有生成或改编的内容逻辑连贯,无误导性信息。
请注意,上述技能执行时将直接利用并参考${documents}的具体内容,以确保所有产出紧密相关且高质量。
"""
prom = '请记住以下材料,他们对回答问题有帮助,请你简洁准确地给出回答,不要给出无关内容。${documents}'
def read_questions_from_file(file_path):
questions = []
with open(file_path, 'r', encoding='utf-8') as file:
for line in file:
line = line.strip()
# 使用正则表达式匹配以数字开头,后接一个点的行
if re.match(r'\d+\.', line):
# 从点后分割并去除前后空格获取问题部分
question = line.split('.', 1)[1].strip()
questions.append(question)
return questions
#正文和文档名之间的内容
def send_message(assistant, message='百炼是什么?'):
ans = []
print(f"Query: {message}")
# create thread.
thread = Threads.create()
print(thread)
# create a message.
message = Messages.create(thread.id, content=message)
# create run
run = Runs.create(thread.id, assistant_id=assistant.id)
# print(run)
# wait for run completed or requires_action
run_status = Runs.wait(run.id, thread_id=thread.id)
# print(run_status)
# get the thread messages.
msgs = Messages.list(thread.id)
for message in msgs['data'][::-1]:
ans.append(message['content'][0]['text']['value'])
return ans
def rag_assistant(knowledge_name):
retriever = DashScopeCloudRetriever(knowledge_name)
pipeline_id = str(retriever.pipeline_id)
assistant = Assistants.create(
model='qwen-max',
name='smart helper',
description='智能助手,支持知识库查询和插件调用。',
temperature='0.3',
instructions="请记住以下材料,他们对回答问题有帮助,请你简洁准确地给出回答,不要给出无关内容。${documents}",
tools=[
{
"type": "code_interpreter"
},
{
"type": "rag",
"prompt_ra": {
"pipeline_id": pipeline_id,
"parameters": {
"type": "object",
"properties": {
"query_word": {
"type": "str",
"value": "${documents}"
}
}
}
}
}]
)
return assistant
def pure_assistant():
assistant = Assistants.create(
model='qwen-max',
name='smart helper',
description='智能助手,能基于用户的要求精准简洁地回答用户的提问',
instructions='智能助手,能基于用户的要求精准简洁地回答用户的提问',
tools=[
{
"type": "code_interpreter"
},
]
)
return assistant
def llm_call(question, knowledge_name,file_id, result_queue, ans_index, llm_type):
if llm_type==1:
print(f"rag_assistant! question:{question}")
assistant = rag_assistant(knowledge_name)
elif llm_type==2:
print(f"qianwen_long! question:{question}")
qianwen_res = qianwen_long(file_id,question)
result_queue.put((ans_index,(question,qianwen_res)))
return
else :
assistant = pure_assistant()
ans = send_message(assistant, message=question)
result_queue.put((ans_index, (question, ans))) # 在队列中添加索引 (question, ans)
def multi_threading(queries, knowledge_name="", file_id="", llm_type=1):
if not queries:
return []
print("多线程提问starting multi_threading...")
result_queue = queue.Queue()
# 使用 ThreadPoolExecutor 管理线程
with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor:
# 逐个提交任务每提交一个任务后休眠1秒
future_to_query = {}
for index, query in enumerate(queries):
future = executor.submit(llm_call, query, knowledge_name, file_id, result_queue, index, llm_type)
future_to_query[future] = index
time.sleep(1) # 每提交一个任务后等待1秒
# 收集每个线程的结果
for future in concurrent.futures.as_completed(future_to_query):
index = future_to_query[future]
try:
future.result() # 捕获异常或确认任务完成
except Exception as exc:
print(f"Query {index} generated an exception: {exc}")
# 确保在异常情况下也向 result_queue 添加占位符
result_queue.put((index, None))
# 从队列中获取所有结果并按索引排序
results = [None] * len(queries)
while not result_queue.empty():
index, result = result_queue.get()
results[index] = result
# 返回一个保证是列表的结构
return results
if __name__ == "__main__":
# start_time=time.time()
# # 读取问题列表
# questions =read_questions_from_file('../static/提示词/前两章提问总结.txt')
# for i in questions:
# print(i)
# knowledge_name = "招标解析5word"
# llm_type=1
# results = multi_threading(questions, knowledge_name)
# end_time = time.time()
# if not results:
# print("errror!")
# else:
# print("elapsed time:"+str(end_time-start_time))
# # 打印结果
# for question, response in results:
# print(f"Question: {question}")
# print(f"Response: {response}")
# file_path = "C:\\Users\\Administrator\\Desktop\\招标文件\\output1\\ztb_evaluation_method.pdf"
# file_id = upload_file(file_path)
# questions=["根据该文档中的评标办法前附表,请你列出该文件的技术标,以json的格式返回结果","根据该文档中的评标办法前附表,请你列出该文件的商务标,以json的格式返回结果","根据该文档中的评标办法前附表,请你列出该文件的投标报价,以json的格式返回结果"]
# results=multi_threading(questions,"",file_id,2) #1代表使用百炼rag 2代表使用qianwen-long
# if not results:
# print("errror!")
# else:
# # 打印结果
# for question, response in results:
# print(f"Question: {question}")
# print(f"Response: {response}")
ques=[]
results = multi_threading(ques, "招标解析5word")
if not results:
print("errror!")
else:
# 打印结果
for question, response in results:
print(f"Question: {question}")
print(f"Response: {response}")