zbparse/flask_app/general/多线程提问.py
2025-01-17 09:11:31 +08:00

257 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 基于知识库提问的通用模板,
# assistant_id
import json
import os
import re
import queue
import concurrent.futures
import time
from datetime import datetime
import requests
from dashscope import Assistants, Messages, Runs, Threads
from llama_index.indices.managed.dashscope import DashScopeCloudRetriever
from flask_app.general.doubao import doubao_model
from flask_app.general.通义千问long import qianwen_long, upload_file
prompt = """
# 角色
你是一个文档处理专家,专门负责理解和操作基于特定内容的文档任务,这包括解析、总结、搜索或生成与给定文档相关的各类信息。
## 技能
### 技能 1文档解析与摘要
- 深入理解并分析${documents}的内容,提取关键信息。
- 根据需求生成简洁明了的摘要,保持原文核心意义不变。
### 技能 2信息检索与关联
- 在${documents}中高效检索特定信息或关键词。
- 能够识别并链接到文档内部或外部的相关内容,增强信息的连贯性和深度。
## 限制
- 所有操作均需基于${documents}的内容,不可超出此范围创造信息。
- 在处理敏感或机密信息时,需遵守严格的隐私和安全规定。
- 确保所有生成或改编的内容逻辑连贯,无误导性信息。
请注意,上述技能执行时将直接利用并参考${documents}的具体内容,以确保所有产出紧密相关且高质量。
"""
prom = '请记住以下材料,他们对回答问题有帮助,请你简洁准确地给出回答,不要给出无关内容。${documents}'
def read_questions_from_file(file_path):
questions = []
current_question = ""
current_number = 0
with open(file_path, 'r', encoding='utf-8') as file:
for line in file:
line = line.strip()
if not line: # 跳过空行
continue
if line.startswith('#'): # 跳过以#开头的行
continue
# 检查是否是新的问题编号,例如 "1."
match = re.match(r'^(\d+)\.', line)
if match:
# 如果有之前的问题,保存它
if current_question:
questions.append(current_question.strip())
# 开始新的问题
current_number = int(match.group(1))
# 提取问题内容,去掉编号和点
current_question = line.split('.', 1)[1].strip() + "\n"
else:
# 继续添加到当前问题
current_question += line + "\n"
# 添加最后一个问题(如果存在)
if current_question:
questions.append(current_question.strip())
return questions
#正文和文档名之间的内容
def send_message(assistant, message='百炼是什么?'):
ans = []
print(f"Query: {message}")
# create thread.
thread = Threads.create()
print(thread)
# create a message.
message = Messages.create(thread.id, content=message)
# create run
run = Runs.create(thread.id, assistant_id=assistant.id)
# print(run)
# wait for run completed or requires_action
run_status = Runs.wait(run.id, thread_id=thread.id)
# print(run_status)
# get the thread messages.
msgs = Messages.list(thread.id)
for message in msgs['data'][::-1]:
ans.append(message['content'][0]['text']['value'])
return ans
def rag_assistant(knowledge_name):
retriever = DashScopeCloudRetriever(knowledge_name)
pipeline_id = str(retriever.pipeline_id)
assistant = Assistants.create(
model='qwen-max',
name='smart helper',
description='智能助手,支持知识库查询和插件调用。',
temperature='0.3',
instructions="请记住以下材料,他们对回答问题有帮助,请你简洁准确地给出回答,不要给出无关内容。${documents}",
tools=[
{
"type": "code_interpreter"
},
{
"type": "rag",
"prompt_ra": {
"pipeline_id": pipeline_id,
"parameters": {
"type": "object",
"properties": {
"query_word": {
"type": "str",
"value": "${documents}"
}
}
}
}
}]
)
return assistant
def pure_assistant():
assistant = Assistants.create(
model='qwen-max',
name='smart helper',
description='智能助手,能基于用户的要求精准简洁地回答用户的提问',
instructions='智能助手,能基于用户的要求精准简洁地回答用户的提问',
tools=[
{
"type": "code_interpreter"
},
]
)
return assistant
def llm_call(question, knowledge_name,file_id, result_queue, ans_index, llm_type,need_extra=False):
"""
调用不同的 LLM 模型并将结果放入结果队列。
"""
try:
if llm_type==1:
print(f"rag_assistant! question:{question}")
assistant = rag_assistant(knowledge_name)
# assistant=create_assistant(knowledge_name)
ans = send_message(assistant, message=question)
result_queue.put((ans_index, (question, ans))) # 在队列中添加索引 (question, ans)
elif llm_type==2:
print(f"qianwen_long! question:{question}")
# qianwen_res,usage = qianwen_long(file_id,question) #有bug
qianwen_res = qianwen_long(file_id, question,2,1,need_extra)
if not qianwen_res:
result_queue.put((ans_index, None)) # 如果为空字符串,直接返回 None
else:
result_queue.put((ans_index, (question, qianwen_res)))
elif llm_type==3:
# print(f"doubao! question:{question}")
doubao_res=doubao_model(question,need_extra)
if not doubao_res:
result_queue.put((ans_index, None)) # 如果为空字符串,直接返回 None
else:
result_queue.put((ans_index, (question, doubao_res)))
else :
assistant = pure_assistant()
ans = send_message(assistant, message=question)
result_queue.put((ans_index, (question, ans))) # 在队列中添加索引 (question, ans)
except Exception as e:
print(f"LLM 调用失败,查询索引 {ans_index},错误:{e}")
result_queue.put((ans_index, None)) # 使用 None 作为失败的占位符
def multi_threading(queries, knowledge_name="", file_id="", llm_type=1,need_extra=False):
if not queries:
return []
print("多线程提问starting multi_threading...")
result_queue = queue.Queue()
with concurrent.futures.ThreadPoolExecutor(max_workers=40) as executor:
future_to_index = {
executor.submit(llm_call, query, knowledge_name, file_id, result_queue, index, llm_type,need_extra): index
for index, query in enumerate(queries)
}
for future in concurrent.futures.as_completed(future_to_index):
index = future_to_index[future]
try:
future.result() # 确保任务完成,如果有未处理的异常会在这里抛出
except Exception as exc:
print(f"查询索引 {index} 生成了一个异常:{exc}")
result_queue.put((index, None)) # 使用 None 作为失败的占位符
# 初始化结果列表,确保按查询的索引顺序排列
results = [None] * len(queries)
while not result_queue.empty():
index, result = result_queue.get()
results[index] = result
# 可选:过滤掉所有结果为 None 的项
# 如果希望保留 None 以表示失败的查询,可以注释掉以下代码
if all(result is None for result in results):
return []
results = [r for r in results if r is not None]
return results
if __name__ == "__main__":
start_time=time.time()
# # # 读取问题列表
# baseinfo_file_path = '/flask_app/static/提示词/基本信息工程标.txt'
# questions =read_questions_from_file(baseinfo_file_path)
# knowledge_name = "招标解析5word"
# llm_type=1
# results = multi_threading(questions, knowledge_name)
# end_time = time.time()
# if not results:
# print("errror!")
# else:
# print("elapsed time:"+str(end_time-start_time))
# # 打印结果
# for question, response in results:
# print(f"Response: {response}")
file_path = r"C:\Users\Administrator\Desktop\fsdownload\39b0c3b4-1807-456c-8330-c5c7d1b7a2ca\ztbfile_procurement\ztbfile_procurement_1.pdf"
file_id = upload_file(file_path)
# questions=["该招标文件的项目名称是项目编号或招标编号采购人或招标人采购代理机构或招标代理机构请按json格式给我提供信息键名分别是'项目名称','项目编号','采购人','采购代理机构',若存在未知信息,在对应的键值中填'未知'。","该招标文件的项目概况是项目基本情况是请按json格式给我提供信息键名分别为'项目概况','项目基本情况',若存在嵌套信息,嵌套内容键名以文件中对应字段命名,而嵌套键值必须与原文保持一致,若存在未知信息,在对应的键值中填'未知'。"]
# results=multi_threading(questions,"",file_id,2) #1代表使用百炼rag 2代表使用qianwen-long
# if not results:
# print("errror!")
# else:
# # 打印结果
# for question, response in results:
# print(f"Question: {question}")
# print(f"Response: {response}")
# ques=["关于'资格要求',本采购文件第一章第二款要求的内容是怎样的请按json格式给我提供信息键名为'资格要求',而键值需要完全与原文保持一致,不要擅自总结、删减,如果存在未知信息,请在对应键值处填'未知'。"]
# # ques=["该招标文件的工程名称项目名称招标编号是招标人是招标代理机构是请按json格式给我提供信息键名分别是'工程名称','招标编号','招标人','招标代理机构',若存在未知信息,在对应的键值中填'未知'。","该招标文件的工程概况或项目概况招标范围是请按json格式给我提供信息键名分别为'工程概况','招标范围',若存在嵌套信息,嵌套内容键名以文件中对应字段命名,若存在未知信息,在对应的键值中填'未知'。"]
# results = multi_threading(ques, "6.2视频会议docx")
# if not results:
# print("errror!")
# else:
# # 打印结果
# for question, response in results:
# print(f"Question: {question}")
# print(f"Response: {response}")
query=[]
for i in range(1,50):
query.append("请返回这个数字:"+str(i))
res=multi_threading(query,"",file_id,2)
for _,response in res:
print(response)