# -*- encoding:utf-8 -*- import json import re import os import sys from pathlib import Path from openai import OpenAI import concurrent.futures import queue import time def qianwen_long(file_id, user_query): print("call qianwen-long...") """ Uses a previously uploaded file to generate a response based on a user query. """ client = OpenAI( api_key=os.getenv("DASHSCOPE_API_KEY"), base_url="https://dashscope.aliyuncs.com/compatible-mode/v1" ) # Generate a response based on the file ID completion = client.chat.completions.create( model="qwen-long", top_p=0.5, temperature=0.4, messages=[ { 'role': 'system', 'content': f'fileid://{file_id}' }, { 'role': 'user', 'content': user_query } ], stream=False ) # Return the response content return completion.choices[0].message.content def llm_call(question, knowledge_name,file_id, result_queue, ans_index, llm_type): if llm_type==2: print(f"qianwen_long! question:{question}") qianwen_res = qianwen_long(file_id,question) result_queue.put((ans_index,(question,qianwen_res))) return def multi_threading(queries, knowledge_name="", file_id="", llm_type=1): if not queries: return [] print("多线程提问:starting multi_threading...") result_queue = queue.Queue() # 使用 ThreadPoolExecutor 管理线程 with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor: # 逐个提交任务,每提交一个任务后休眠1秒 future_to_query = {} for index, query in enumerate(queries): future = executor.submit(llm_call, query, knowledge_name, file_id, result_queue, index, llm_type) future_to_query[future] = index time.sleep(1) # 每提交一个任务后等待1秒 # 收集每个线程的结果 for future in concurrent.futures.as_completed(future_to_query): index = future_to_query[future] try: future.result() # 捕获异常或确认任务完成 except Exception as exc: print(f"Query {index} generated an exception: {exc}") # 确保在异常情况下也向 result_queue 添加占位符 result_queue.put((index, None)) # 从队列中获取所有结果并按索引排序 results = [None] * len(queries) while not result_queue.empty(): index, result = result_queue.get() results[index] = result # 检查是否所有结果都是 None if all(result is None for result in results): return [] # 过滤掉None值 results = [r for r in results if r is not None] # 返回一个保证是列表的结构 return results def upload_file(file_path): """ Uploads a file to DashScope and returns the file ID. """ client = OpenAI( api_key=os.getenv("DASHSCOPE_API_KEY"), base_url="https://dashscope.aliyuncs.com/compatible-mode/v1" ) file = client.files.create(file=Path(file_path), purpose="file-extract") return file.id def load_data(json_path): """ 从指定的JSON文件中加载数据。 Args: json_path (str): JSON文件的路径。 Returns: dict: 加载的JSON数据字典。 """ try: with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f) return data except FileNotFoundError: print(f"错误:文件未找到 - {json_path}") sys.exit(1) except json.JSONDecodeError as e: print(f"错误:解析JSON文件时出错 - {e}") sys.exit(1) def define_target_names(): """ 定义目标名称列表。 Returns: list: 目标名称列表。 """ return [ "营业执照", # "开户信息", "法定代表人身份证", # "法定代表人授权人身份证", "人员证书", "人员社保资料", # "劳动合同", "企业证书", "企业业绩", "财务审计报告", "缴纳税收证明", "公司缴纳社保证明" ] def generate_user_query(target, chapters, keywords): """ 根据目标、章节和关键词生成用户查询模板。 Args: target (str): 目标名称。 chapters (list): 相关章节列表。 keywords (list): 相关关键词列表。 Returns: str: 生成的用户查询字符串。 """ template3 = f"""这是投标文件模板,作为投标人,我需要把不同的投标材料填充到对应位置,请你根据该文件回答:{target}应该插入在该文件哪个地方?你可能需要查找以下关键词出现的地方:{', '.join([f"'{kw}'" for kw in keywords])},并确认插入的位置。我已在原文中打上若干待插入位置的标记,形如'[$$第5个待插入位置$$]',它的标记与它上面的小节内容关联。你需要返回给我{target}应该插入位置的标记序号,即'[$$第5个待插入位置$$]'中的'5',而不是页码,若有多个位置需要插入,可以返回多个序号,你的回答以数组返回,如[17, 19],若插入位置不明确,那么返回[-1]。 """ template2 = f"""你是一个专业撰写投标文件的专家,这是投标文件模板,作为投标人,你需要把不同的投标材料填充到对应位置,请你根据该文件回答:{target}应该插入在该文件哪个地方?你可能需要查找以下关键词出现的地方:{', '.join([f"'{kw}'" for kw in keywords])},或者根据文中'备注'或'附'内的信息确认插入的位置。目前原文中各章节末尾已打上待插入位置的标记,标记格式为'[$$当前章节名:第x个待插入位置$$]'形如'[$$四、投标保证金:第4个待插入位置$$]',每个标记与它紧跟着的上面的这一章内容关联。你需要返回给我{target}应该插入位置的标记序号,即'[$$四、投标保证金:第4个待插入位置$$]'中的'4',而不是当前页码,若有多个位置需要插入,可以返回多个序号,你的回答以数组返回,如[4, 5],若插入位置不明确,那么返回[-1]。 """ template1 = f"""这是投标文件模板,作为投标人,我需要把不同的投标材料填充到对应位置,请你根据该文件回答:{target}应该插入在该文件哪个地方?你可能需要查找以下关键词出现的地方:{', '.join([f"'{kw}'" for kw in keywords])},并确认插入的位置。我已在原文中各章节末尾打上待插入位置的标记,标记格式为'[$$当前章节名:第x个待插入位置$$]'形如'[$$四、投标保证金:第4个待插入位置$$]',每个标记与它紧跟着的上面的这一章内容关联。你需要返回给我{target}应该插入位置的标记数字序号,即'[$$四、投标保证金:第4个待插入位置$$]'中的'4',而不是当前页码,若有多个位置需要插入,可以返回多个序号,你的回答以数组返回,如[4, 5],若插入位置不明确,那么返回[-1]。 """ return template1 def generate_user_queries(target_names, data_dict): """ 为每个目标生成对应的用户查询。 Args: target_names (list): 目标名称列表。 data_dict (dict): 数据字典。 Returns: list: 包含目标和查询的字典列表。 """ user_queries = [] for target in target_names: if target in data_dict: chapters = data_dict[target].get("章节", []) keywords = data_dict[target].get("关键字", []) query = generate_user_query(target, chapters, keywords) user_queries.append({ "target": target, "query": query }) else: print(f"警告:'{target}'未在数据字典中找到相关信息。") return user_queries def process_string_list(string_list): """ 处理字符串列表,提取方括号内的内容并转换为实际列表。 Args: string_list (str): 包含方括号的字符串。 Returns: list: 解析后的列表内容。 """ match = re.search(r'\[(.*?)\]', string_list) if match: content_inside = match.group(1).strip() if content_inside: items = [item.strip() for item in content_inside.split(',')] if all(item.isdigit() for item in items): formatted_list = [int(item) for item in items] else: formatted_list = items return formatted_list return [] def main(): # 定义JSON文件路径 # json_path = "flask_app/general/static/插入位置.json" json_path = "/flask_app/general/static/插入位置.json" # 加载数据 data_dict = load_data(json_path) # 定义目标名称 target_names = define_target_names() # 生成用户查询列表 user_query_list = generate_user_queries(target_names, data_dict) if not user_query_list: print("没有生成任何用户查询。") sys.exit(0) # 提取查询 queries = [item['query'] for item in user_query_list] # 定义文件路径 format_part = "C:\\Users\\Administrator\\Desktop\\bid_format.pdf" # format_part="C:\\Users\\Administrator\\Desktop\\货物标\\zbfiles\\新建文件夹\\bid_format.docx" # 检查文件是否存在 if not os.path.isfile(format_part): print(f"错误:文件未找到 - {format_part}") sys.exit(1) # 上传文件并获取file_id file_id = upload_file(format_part) if not file_id: print("错误:文件上传失败。") sys.exit(1) # 使用多线程并行处理查询 results = multi_threading(queries, "", file_id,2) if not results: print("错误:未收到任何处理结果。") sys.exit(1) # 清理返回结果 baseinfo_list = [process_string_list(res) for _, res in results] # 输出结果 for i, info in enumerate(baseinfo_list): print(f'{target_names[i]}:{info}') if __name__ == "__main__": main()