zbparse/flask_app/old_version/判断截取位置_old.py
2024-11-28 13:38:17 +08:00

267 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- encoding:utf-8 -*-
import json
import re
import os
import sys
from pathlib import Path
from openai import OpenAI
import concurrent.futures
import queue
import time
def qianwen_long(file_id, user_query):
print("call qianwen-long...")
"""
Uses a previously uploaded file to generate a response based on a user query.
"""
client = OpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
# Generate a response based on the file ID
completion = client.chat.completions.create(
model="qwen-long",
top_p=0.5,
temperature=0.4,
messages=[
{
'role': 'system',
'content': f'fileid://{file_id}'
},
{
'role': 'user',
'content': user_query
}
],
stream=False
)
# Return the response content
return completion.choices[0].message.content
def llm_call(question, knowledge_name,file_id, result_queue, ans_index, llm_type):
if llm_type==2:
print(f"qianwen_long! question:{question}")
qianwen_res = qianwen_long(file_id,question)
result_queue.put((ans_index,(question,qianwen_res)))
return
def multi_threading(queries, knowledge_name="", file_id="", llm_type=1):
if not queries:
return []
print("多线程提问starting multi_threading...")
result_queue = queue.Queue()
# 使用 ThreadPoolExecutor 管理线程
with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor:
# 逐个提交任务每提交一个任务后休眠1秒
future_to_query = {}
for index, query in enumerate(queries):
future = executor.submit(llm_call, query, knowledge_name, file_id, result_queue, index, llm_type)
future_to_query[future] = index
time.sleep(1) # 每提交一个任务后等待1秒
# 收集每个线程的结果
for future in concurrent.futures.as_completed(future_to_query):
index = future_to_query[future]
try:
future.result() # 捕获异常或确认任务完成
except Exception as exc:
print(f"Query {index} generated an exception: {exc}")
# 确保在异常情况下也向 result_queue 添加占位符
result_queue.put((index, None))
# 从队列中获取所有结果并按索引排序
results = [None] * len(queries)
while not result_queue.empty():
index, result = result_queue.get()
results[index] = result
# 检查是否所有结果都是 None
if all(result is None for result in results):
return []
# 过滤掉None值
results = [r for r in results if r is not None]
# 返回一个保证是列表的结构
return results
def upload_file(file_path):
"""
Uploads a file to DashScope and returns the file ID.
"""
client = OpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
file = client.files.create(file=Path(file_path), purpose="file-extract")
return file.id
def load_data(json_path):
"""
从指定的JSON文件中加载数据。
Args:
json_path (str): JSON文件的路径。
Returns:
dict: 加载的JSON数据字典。
"""
try:
with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return data
except FileNotFoundError:
print(f"错误:文件未找到 - {json_path}")
sys.exit(1)
except json.JSONDecodeError as e:
print(f"错误解析JSON文件时出错 - {e}")
sys.exit(1)
def define_target_names():
"""
定义目标名称列表。
Returns:
list: 目标名称列表。
"""
return [
"营业执照",
# "开户信息",
"法定代表人身份证",
# "法定代表人授权人身份证",
"人员证书",
"人员社保资料",
# "劳动合同",
"企业证书",
"企业业绩",
"财务审计报告",
"缴纳税收证明",
"公司缴纳社保证明"
]
def generate_user_query(target, chapters, keywords):
"""
根据目标、章节和关键词生成用户查询模板。
Args:
target (str): 目标名称。
chapters (list): 相关章节列表。
keywords (list): 相关关键词列表。
Returns:
str: 生成的用户查询字符串。
"""
template3 = f"""这是投标文件模板,作为投标人,我需要把不同的投标材料填充到对应位置,请你根据该文件回答:{target}应该插入在该文件哪个地方?你可能需要查找以下关键词出现的地方:{', '.join([f"'{kw}'" for kw in keywords])},并确认插入的位置。我已在原文中打上若干待插入位置的标记,形如'[$$第5个待插入位置$$]',它的标记与它上面的小节内容关联。你需要返回给我{target}应该插入位置的标记序号,即'[$$第5个待插入位置$$]'中的'5',而不是页码,若有多个位置需要插入,可以返回多个序号,你的回答以数组返回,如[17, 19],若插入位置不明确,那么返回[-1]。
"""
template2 = f"""你是一个专业撰写投标文件的专家,这是投标文件模板,作为投标人,你需要把不同的投标材料填充到对应位置,请你根据该文件回答:{target}应该插入在该文件哪个地方?你可能需要查找以下关键词出现的地方:{', '.join([f"'{kw}'" for kw in keywords])},或者根据文中'备注'''内的信息确认插入的位置。目前原文中各章节末尾已打上待插入位置的标记,标记格式为'[$$当前章节名第x个待插入位置$$]'形如'[$$四、投标保证金第4个待插入位置$$]',每个标记与它紧跟着的上面的这一章内容关联。你需要返回给我{target}应该插入位置的标记序号,即'[$$四、投标保证金第4个待插入位置$$]'中的'4',而不是当前页码,若有多个位置需要插入,可以返回多个序号,你的回答以数组返回,如[4, 5],若插入位置不明确,那么返回[-1]。
"""
template1 = f"""这是投标文件模板,作为投标人,我需要把不同的投标材料填充到对应位置,请你根据该文件回答:{target}应该插入在该文件哪个地方?你可能需要查找以下关键词出现的地方:{', '.join([f"'{kw}'" for kw in keywords])},并确认插入的位置。我已在原文中各章节末尾打上待插入位置的标记,标记格式为'[$$当前章节名第x个待插入位置$$]'形如'[$$四、投标保证金第4个待插入位置$$]',每个标记与它紧跟着的上面的这一章内容关联。你需要返回给我{target}应该插入位置的标记数字序号,即'[$$四、投标保证金第4个待插入位置$$]'中的'4',而不是当前页码,若有多个位置需要插入,可以返回多个序号,你的回答以数组返回,如[4, 5],若插入位置不明确,那么返回[-1]。
"""
return template1
def generate_user_queries(target_names, data_dict):
"""
为每个目标生成对应的用户查询。
Args:
target_names (list): 目标名称列表。
data_dict (dict): 数据字典。
Returns:
list: 包含目标和查询的字典列表。
"""
user_queries = []
for target in target_names:
if target in data_dict:
chapters = data_dict[target].get("章节", [])
keywords = data_dict[target].get("关键字", [])
query = generate_user_query(target, chapters, keywords)
user_queries.append({
"target": target,
"query": query
})
else:
print(f"警告:'{target}'未在数据字典中找到相关信息。")
return user_queries
def process_string_list(string_list):
"""
处理字符串列表,提取方括号内的内容并转换为实际列表。
Args:
string_list (str): 包含方括号的字符串。
Returns:
list: 解析后的列表内容。
"""
match = re.search(r'\[(.*?)\]', string_list)
if match:
content_inside = match.group(1).strip()
if content_inside:
items = [item.strip() for item in content_inside.split(',')]
if all(item.isdigit() for item in items):
formatted_list = [int(item) for item in items]
else:
formatted_list = items
return formatted_list
return []
def main():
# 定义JSON文件路径
# json_path = "flask_app/general/static/插入位置.json"
json_path = "/flask_app/general/static/插入位置.json"
# 加载数据
data_dict = load_data(json_path)
# 定义目标名称
target_names = define_target_names()
# 生成用户查询列表
user_query_list = generate_user_queries(target_names, data_dict)
if not user_query_list:
print("没有生成任何用户查询。")
sys.exit(0)
# 提取查询
queries = [item['query'] for item in user_query_list]
# 定义文件路径
format_part = "C:\\Users\\Administrator\\Desktop\\bid_format.pdf"
# format_part="C:\\Users\\Administrator\\Desktop\\货物标\\zbfiles\\新建文件夹\\bid_format.docx"
# 检查文件是否存在
if not os.path.isfile(format_part):
print(f"错误:文件未找到 - {format_part}")
sys.exit(1)
# 上传文件并获取file_id
file_id = upload_file(format_part)
if not file_id:
print("错误:文件上传失败。")
sys.exit(1)
# 使用多线程并行处理查询
results = multi_threading(queries, "", file_id,2)
if not results:
print("错误:未收到任何处理结果。")
sys.exit(1)
# 清理返回结果
baseinfo_list = [process_string_list(res) for _, res in results]
# 输出结果
for i, info in enumerate(baseinfo_list):
print(f'{target_names[i]}:{info}')
if __name__ == "__main__":
main()