From d4a344623628cca6b7c40cbcc1499bba3d7278a0 Mon Sep 17 00:00:00 2001 From: zy123 <646228430@qq.com> Date: Thu, 6 Feb 2025 16:37:52 +0800 Subject: [PATCH] =?UTF-8?q?2.6=20=E7=BB=93=E6=9E=84=E6=95=B4=E7=90=86?= =?UTF-8?q?=E3=80=81=E5=A2=9E=E5=8A=A0=E4=BA=86cache=E5=91=BD=E4=B8=AD?= =?UTF-8?q?=E7=BC=93=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- flask_app/general/llm/多线程提问.py | 103 ++++++++++++--------- flask_app/general/llm/通义千问long_plus.py | 7 +- flask_app/static/提示词/基本信息工程标.txt | 2 +- flask_app/static/提示词/基本信息货物标.txt | 2 +- flask_app/货物标/技术参数要求提取.py | 3 +- flask_app/货物标/提取采购需求main.py | 4 +- 6 files changed, 70 insertions(+), 51 deletions(-) diff --git a/flask_app/general/llm/多线程提问.py b/flask_app/general/llm/多线程提问.py index 5e68bd4..36facf1 100644 --- a/flask_app/general/llm/多线程提问.py +++ b/flask_app/general/llm/多线程提问.py @@ -8,6 +8,7 @@ import time from dashscope import Assistants, Messages, Runs, Threads from llama_index.indices.managed.dashscope import DashScopeCloudRetriever +from flask_app.general.llm.doubao import read_txt_to_string from flask_app.general.llm.通义千问long_plus import qianwen_long, upload_file, qianwen_plus prompt = """ @@ -157,7 +158,7 @@ def llm_call(question, knowledge_name,file_id, result_queue, ans_index, llm_type else: result_queue.put((ans_index, (question, qianwen_res))) elif llm_type==3: - # print(f"doubao! question:{question}") + # print(f"doubao! question:{question}") #暂时废弃doubao! # doubao_res=doubao_model(question,need_extra) qianwen_plus_res=qianwen_plus(question,need_extra) if not qianwen_plus_res: @@ -178,56 +179,62 @@ def llm_call(question, knowledge_name,file_id, result_queue, ans_index, llm_type print(f"LLM 调用失败,查询索引 {ans_index},错误:{e}") result_queue.put((ans_index, None)) # 使用 None 作为失败的占位符 -def multi_threading(queries, knowledge_name="", file_id="", llm_type=1,need_extra=False): + +def multi_threading(queries, knowledge_name="", file_id="", llm_type=1, need_extra=False): if not queries: return [] + print("多线程提问:starting multi_threading...") result_queue = queue.Queue() + + def submit_task(executor, query, idx): + # 提交任务,并返回对应的 future + return executor.submit(llm_call, query, knowledge_name, file_id, result_queue, idx, llm_type, need_extra) + + future_to_index = {} with concurrent.futures.ThreadPoolExecutor(max_workers=40) as executor: - future_to_index = { - executor.submit(llm_call, query, knowledge_name, file_id, result_queue, index, llm_type,need_extra): index - for index, query in enumerate(queries) - } + if llm_type == 4: + # 每组10个任务,组与组之间间隔5秒,提高缓存命中率 + group_size = 5 + for i in range(0, len(queries), group_size): + group = queries[i:i + group_size] + for idx, query in enumerate(group, start=i): + future = submit_task(executor, query, idx) + future_to_index[future] = idx + # 若还有后续组,则等待5秒 + if i + group_size < len(queries): + time.sleep(5) + else: + # 直接一次性提交所有任务 + for index, query in enumerate(queries): + future = submit_task(executor, query, index) + future_to_index[future] = index + # 收集所有任务的异常 for future in concurrent.futures.as_completed(future_to_index): - index = future_to_index[future] + idx = future_to_index[future] try: - future.result() # 确保任务完成,如果有未处理的异常会在这里抛出 + future.result() except Exception as exc: - print(f"查询索引 {index} 生成了一个异常:{exc}") - result_queue.put((index, None)) # 使用 None 作为失败的占位符 + print(f"查询索引 {idx} 生成了一个异常:{exc}") + result_queue.put((idx, None)) - # 初始化结果列表,确保按查询的索引顺序排列 + # 根据任务索引初始化结果列表 results = [None] * len(queries) while not result_queue.empty(): index, result = result_queue.get() results[index] = result - # 可选:过滤掉所有结果为 None 的项 - # 如果希望保留 None 以表示失败的查询,可以注释掉以下代码 + + # 如果所有结果都为 None,则返回空列表;否则过滤掉 None 项 if all(result is None for result in results): return [] - results = [r for r in results if r is not None] - return results + return [r for r in results if r is not None] if __name__ == "__main__": start_time=time.time() - # # # 读取问题列表 - # baseinfo_file_path = '/flask_app/static/提示词/基本信息工程标.txt' - # questions =read_questions_from_file(baseinfo_file_path) - # knowledge_name = "招标解析5word" - # llm_type=1 - # results = multi_threading(questions, knowledge_name) - # end_time = time.time() - # if not results: - # print("errror!") - # else: - # print("elapsed time:"+str(end_time-start_time)) - # # 打印结果 - # for question, response in results: - # print(f"Response: {response}") - file_path = r"C:\Users\Administrator\Desktop\fsdownload\39b0c3b4-1807-456c-8330-c5c7d1b7a2ca\ztbfile_procurement\ztbfile_procurement_1.pdf" - file_id = upload_file(file_path) + # file_path = r"C:\Users\Administrator\Desktop\fsdownload\39b0c3b4-1807-456c-8330-c5c7d1b7a2ca\ztbfile_procurement\ztbfile_procurement_1.pdf" + # file_id = upload_file(file_path) # questions=["该招标文件的项目名称是?项目编号(或招标编号)是?采购人(或招标人)是?采购代理机构(或招标代理机构)是?请按json格式给我提供信息,键名分别是'项目名称','项目编号','采购人','采购代理机构',若存在未知信息,在对应的键值中填'未知'。","该招标文件的项目概况是?项目基本情况是?请按json格式给我提供信息,键名分别为'项目概况','项目基本情况',若存在嵌套信息,嵌套内容键名以文件中对应字段命名,而嵌套键值必须与原文保持一致,若存在未知信息,在对应的键值中填'未知'。"] # results=multi_threading(questions,"",file_id,2) #1代表使用百炼rag 2代表使用qianwen-long # if not results: @@ -237,20 +244,26 @@ if __name__ == "__main__": # for question, response in results: # print(f"Question: {question}") # print(f"Response: {response}") - - # ques=["关于'资格要求',本采购文件第一章第二款要求的内容是怎样的?请按json格式给我提供信息,键名为'资格要求',而键值需要完全与原文保持一致,不要擅自总结、删减,如果存在未知信息,请在对应键值处填'未知'。"] - # # ques=["该招标文件的工程名称(项目名称)是?招标编号是?招标人是?招标代理机构是?请按json格式给我提供信息,键名分别是'工程名称','招标编号','招标人','招标代理机构',若存在未知信息,在对应的键值中填'未知'。","该招标文件的工程概况(或项目概况)是?招标范围是?请按json格式给我提供信息,键名分别为'工程概况','招标范围',若存在嵌套信息,嵌套内容键名以文件中对应字段命名,若存在未知信息,在对应的键值中填'未知'。"] - # results = multi_threading(ques, "6.2视频会议docx") - # if not results: - # print("errror!") - # else: - # # 打印结果 - # for question, response in results: - # print(f"Question: {question}") - # print(f"Response: {response}") + # query=[] - for i in range(1,50): - query.append("请返回这个数字:"+str(i)) - res=multi_threading(query,"",file_id,2) + processed_filepath=r"C:\Users\Administrator\Desktop\货物标\extract_files\107国道.txt" + full_text=read_txt_to_string(processed_filepath) + temp="请告诉我LED 全彩显示屏的功能是怎样的,请以JSON格式返回,键名为'LED 全彩显示屏',键值为字符串。" + user_query = f"文本内容:{full_text}\n" + temp +# user_query=''' +# +# 3 +# 大屏播控系统 +# 1、具有多用户多权限管理功能,支持多用户同时登录客户端,每个用户根据自身不同权限管理显示屏;2、系统对输入信号源进行预监视,实现在播控前预先查看的功能。 +# 中国 +# 1 +# 套 +# +# 请告诉我大屏播控系统的功能是怎样的,请以JSON格式返回,键名为'大屏播控系统',键值为字符串。 +# ''' + for i in range(1,15): + query.append(user_query) + res=multi_threading(query,"","",4) for _,response in res: print(response) + # end_time = time.time() diff --git a/flask_app/general/llm/通义千问long_plus.py b/flask_app/general/llm/通义千问long_plus.py index a62d0ee..540f79c 100644 --- a/flask_app/general/llm/通义千问long_plus.py +++ b/flask_app/general/llm/通义千问long_plus.py @@ -279,6 +279,9 @@ def qianwen_plus(user_query, need_extra=False): """ 使用之前上传的文件,根据用户查询生成响应,并实时显示流式输出。 + 目前命中缓存局限:1.不足 256 Token 的内容不会被缓存。 + 2.上下文缓存的命中概率并不是100%,即使是上下文完全一致的请求,也存在无法命中的概率,命中概率依据系统判断而定。 + 3.若多线程同时请求,缓存无法及时更新! 参数: - user_query: 用户查询 - need_extra: 是否需要返回额外数据(默认 False) @@ -326,7 +329,9 @@ def qianwen_plus(user_query, need_extra=False): usage = chunk_data.get('usage') if usage is not None: completion_tokens = usage.get('completion_tokens', 0) - + # prompt_tokens_details = usage.get('prompt_tokens_details', {}) #命中tokens ,取消注释可以print + # cache_hit = prompt_tokens_details.get('cached_tokens', 0) + # print("命中:"+str(cache_hit)) # 处理 choices 信息 choices = chunk_data.get('choices', []) if choices: diff --git a/flask_app/static/提示词/基本信息工程标.txt b/flask_app/static/提示词/基本信息工程标.txt index 5e864a6..e45ecad 100644 --- a/flask_app/static/提示词/基本信息工程标.txt +++ b/flask_app/static/提示词/基本信息工程标.txt @@ -30,7 +30,7 @@ } } -6.请从提供的招标文件中提取与“信息公示媒介”相关的信息(如补充说明、文件澄清、评标结果等)公示媒介(如网址、官网)。按以下 JSON 格式输出信息,其中键名为“信息公示媒介”,键值为一个字符串列表。如果存在多个信息公示媒介,请分别将原文中相关表述逐字添加至字符串中。注意,若只有一个信息公示媒介,字符串列表中只包含一个字符串。示例输出格式如下,仅供参考: +6.请从提供的招标文件中提取与“信息公示媒介”相关的信息,即补充说明、文件澄清、评标结果等信息的公示媒介(如网址、官网)。按以下 JSON 格式输出信息,其中键名为“信息公示媒介”,键值为一个字符串列表。如果存在多个信息公示媒介,请分别将原文中相关表述逐字添加至字符串中。注意,若只有一个信息公示媒介,字符串列表中只包含一个字符串。示例输出格式如下,仅供参考: { "信息公示媒介":["招标公告在政府采购网(www.test.gov.cn)发布。","中标结果将在采购网(www.test.bid.cn)予以公告。"] } diff --git a/flask_app/static/提示词/基本信息货物标.txt b/flask_app/static/提示词/基本信息货物标.txt index 61cb981..fe4ff92 100644 --- a/flask_app/static/提示词/基本信息货物标.txt +++ b/flask_app/static/提示词/基本信息货物标.txt @@ -30,7 +30,7 @@ } } -6.请从提供的招标文件中提取与“信息公示媒介”相关的信息(如补充说明、文件澄清、评标结果等)公示媒介(如网址、官网)。按以下 JSON 格式输出信息,其中键名为“信息公示媒介”,键值为一个字符串列表。如果存在多个信息公示媒介,请分别将原文中相关表述逐字添加至字符串中。注意,若只有一个信息公示媒介,字符串列表中只包含一个字符串。示例输出格式如下,仅供参考: +6.请从提供的招标文件中提取与“信息公示媒介”相关的信息,即补充说明、文件澄清、评标结果等信息的公示媒介(如网址、官网)。按以下 JSON 格式输出信息,其中键名为“信息公示媒介”,键值为一个字符串列表。如果存在多个信息公示媒介,请分别将原文中相关表述逐字添加至字符串中。注意,若只有一个信息公示媒介,字符串列表中只包含一个字符串。示例输出格式如下,仅供参考: { "信息公示媒介":["招标公告在政府采购网(www.test.gov.cn)发布。","中标结果将在采购网(www.test.bid.cn)予以公告。"] } diff --git a/flask_app/货物标/技术参数要求提取.py b/flask_app/货物标/技术参数要求提取.py index 6eac7bf..75da1e5 100644 --- a/flask_app/货物标/技术参数要求提取.py +++ b/flask_app/货物标/技术参数要求提取.py @@ -646,7 +646,8 @@ def get_technical_requirements(invalid_path, processed_filepath, model_type=1): new_query=f"文件内容:{full_text}\n" + new_query queries.append(new_query) if model_type == 1: - results = multi_threading(queries, "", "", 3, True) # 豆包 + # results = multi_threading(queries, "", "", 3, True) # 豆包 + results = multi_threading(queries, "", "", 4, True) # plus else: results = multi_threading(queries, "", file_id, 2, True) # qianwen-long temp_final = {} diff --git a/flask_app/货物标/提取采购需求main.py b/flask_app/货物标/提取采购需求main.py index 0dd47b4..f8feba0 100644 --- a/flask_app/货物标/提取采购需求main.py +++ b/flask_app/货物标/提取采购需求main.py @@ -35,11 +35,11 @@ def fetch_procurement_reqs(procurement_path, invalid_path): busi_model_type=3 #long-stream processed_filepath = "" else: - tech_model_type= 1 #doubao + tech_model_type= 1 #doubao或者qianwen-plus busi_model_type =4 #qianwen-plus processed_filepath = convert_file_to_markdown(procurement_path,"extract3.txt") # invalid_path->markdown格式 else: - tech_model_type = 1 #doubao + tech_model_type = 1 #doubao或者qianwen-plus busi_model_type = 4 processed_filepath = convert_file_to_markdown(procurement_path) # 正常情况:procurement_path->markdown格式 # processed_filepath = pdf2txt(procurement_path) # 纯文本提取