zbparse/flask_app/general/判断截取位置.py

267 lines
10 KiB
Python
Raw Permalink Normal View History

2024-10-25 10:18:38 +08:00
# -*- encoding:utf-8 -*-
2024-10-24 14:34:37 +08:00
import json
2024-10-28 17:40:02 +08:00
import re
import os
import sys
2024-10-29 09:04:23 +08:00
from pathlib import Path
from openai import OpenAI
import concurrent.futures
import queue
import time
def qianwen_long(file_id, user_query):
print("call qianwen-long...")
"""
Uses a previously uploaded file to generate a response based on a user query.
"""
client = OpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
# Generate a response based on the file ID
completion = client.chat.completions.create(
model="qwen-long",
top_p=0.5,
temperature=0.4,
messages=[
{
'role': 'system',
'content': f'fileid://{file_id}'
},
{
'role': 'user',
'content': user_query
}
],
stream=False
)
# Return the response content
return completion.choices[0].message.content
def llm_call(question, knowledge_name,file_id, result_queue, ans_index, llm_type):
if llm_type==2:
print(f"qianwen_long! question:{question}")
qianwen_res = qianwen_long(file_id,question)
result_queue.put((ans_index,(question,qianwen_res)))
return
def multi_threading(queries, knowledge_name="", file_id="", llm_type=1):
if not queries:
return []
print("多线程提问starting multi_threading...")
result_queue = queue.Queue()
# 使用 ThreadPoolExecutor 管理线程
with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor:
# 逐个提交任务每提交一个任务后休眠1秒
future_to_query = {}
for index, query in enumerate(queries):
future = executor.submit(llm_call, query, knowledge_name, file_id, result_queue, index, llm_type)
future_to_query[future] = index
time.sleep(1) # 每提交一个任务后等待1秒
# 收集每个线程的结果
for future in concurrent.futures.as_completed(future_to_query):
index = future_to_query[future]
try:
future.result() # 捕获异常或确认任务完成
except Exception as exc:
print(f"Query {index} generated an exception: {exc}")
# 确保在异常情况下也向 result_queue 添加占位符
result_queue.put((index, None))
# 从队列中获取所有结果并按索引排序
results = [None] * len(queries)
while not result_queue.empty():
index, result = result_queue.get()
results[index] = result
# 检查是否所有结果都是 None
if all(result is None for result in results):
return []
# 过滤掉None值
results = [r for r in results if r is not None]
# 返回一个保证是列表的结构
return results
def upload_file(file_path):
"""
Uploads a file to DashScope and returns the file ID.
"""
client = OpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
file = client.files.create(file=Path(file_path), purpose="file-extract")
return file.id
2024-10-28 17:40:02 +08:00
def load_data(json_path):
"""
从指定的JSON文件中加载数据
Args:
json_path (str): JSON文件的路径
Returns:
dict: 加载的JSON数据字典
"""
try:
with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return data
except FileNotFoundError:
print(f"错误:文件未找到 - {json_path}")
sys.exit(1)
except json.JSONDecodeError as e:
print(f"错误解析JSON文件时出错 - {e}")
sys.exit(1)
def define_target_names():
"""
定义目标名称列表
Returns:
list: 目标名称列表
"""
return [
"营业执照",
# "开户信息",
"法定代表人身份证",
# "法定代表人授权人身份证",
"人员证书",
"人员社保资料",
# "劳动合同",
"企业证书",
"企业业绩",
2024-10-29 09:04:23 +08:00
"财务审计报告",
"缴纳税收证明",
"公司缴纳社保证明"
2024-10-28 17:40:02 +08:00
]
2024-10-24 14:34:37 +08:00
def generate_user_query(target, chapters, keywords):
2024-10-28 17:40:02 +08:00
"""
根据目标章节和关键词生成用户查询模板
Args:
target (str): 目标名称
chapters (list): 相关章节列表
keywords (list): 相关关键词列表
Returns:
str: 生成的用户查询字符串
"""
2024-10-29 09:04:23 +08:00
template3 = f"""这是投标文件模板,作为投标人,我需要把不同的投标材料填充到对应位置,请你根据该文件回答:{target}应该插入在该文件哪个地方?你可能需要查找以下关键词出现的地方:{', '.join([f"'{kw}'" for kw in keywords])},并确认插入的位置。我已在原文中打上若干待插入位置的标记,形如'[$$第5个待插入位置$$]',它的标记与它上面的小节内容关联。你需要返回给我{target}应该插入位置的标记序号,即'[$$第5个待插入位置$$]'中的'5',而不是页码,若有多个位置需要插入,可以返回多个序号,你的回答以数组返回,如[17, 19],若插入位置不明确,那么返回[-1]。
2024-10-24 14:34:37 +08:00
"""
2024-10-29 09:04:23 +08:00
template2 = f"""你是一个专业撰写投标文件的专家,这是投标文件模板,作为投标人,你需要把不同的投标材料填充到对应位置,请你根据该文件回答:{target}应该插入在该文件哪个地方?你可能需要查找以下关键词出现的地方:{', '.join([f"'{kw}'" for kw in keywords])},或者根据文中'备注'''内的信息确认插入的位置。目前原文中各章节末尾已打上待插入位置的标记,标记格式为'[$$当前章节名第x个待插入位置$$]'形如'[$$四、投标保证金第4个待插入位置$$]',每个标记与它紧跟着的上面的这一章内容关联。你需要返回给我{target}应该插入位置的标记序号,即'[$$四、投标保证金第4个待插入位置$$]'中的'4',而不是当前页码,若有多个位置需要插入,可以返回多个序号,你的回答以数组返回,如[4, 5],若插入位置不明确,那么返回[-1]。
"""
template1 = f"""这是投标文件模板,作为投标人,我需要把不同的投标材料填充到对应位置,请你根据该文件回答:{target}应该插入在该文件哪个地方?你可能需要查找以下关键词出现的地方:{', '.join([f"'{kw}'" for kw in keywords])},并确认插入的位置。我已在原文中各章节末尾打上待插入位置的标记,标记格式为'[$$当前章节名第x个待插入位置$$]'形如'[$$四、投标保证金第4个待插入位置$$]',每个标记与它紧跟着的上面的这一章内容关联。你需要返回给我{target}应该插入位置的标记数字序号,即'[$$四、投标保证金第4个待插入位置$$]'中的'4',而不是当前页码,若有多个位置需要插入,可以返回多个序号,你的回答以数组返回,如[4, 5],若插入位置不明确,那么返回[-1]。
"""
return template1
2024-10-28 17:40:02 +08:00
def generate_user_queries(target_names, data_dict):
"""
为每个目标生成对应的用户查询
Args:
target_names (list): 目标名称列表
data_dict (dict): 数据字典
Returns:
list: 包含目标和查询的字典列表
2024-10-25 10:18:38 +08:00
"""
2024-10-28 17:40:02 +08:00
user_queries = []
for target in target_names:
if target in data_dict:
chapters = data_dict[target].get("章节", [])
keywords = data_dict[target].get("关键字", [])
query = generate_user_query(target, chapters, keywords)
user_queries.append({
"target": target,
"query": query
})
else:
print(f"警告:'{target}'未在数据字典中找到相关信息。")
return user_queries
def process_string_list(string_list):
"""
处理字符串列表提取方括号内的内容并转换为实际列表
Args:
string_list (str): 包含方括号的字符串
Returns:
list: 解析后的列表内容
"""
match = re.search(r'\[(.*?)\]', string_list)
if match:
content_inside = match.group(1).strip()
if content_inside:
items = [item.strip() for item in content_inside.split(',')]
if all(item.isdigit() for item in items):
formatted_list = [int(item) for item in items]
else:
formatted_list = items
return formatted_list
return []
def main():
# 定义JSON文件路径
# json_path = "flask_app/general/static/插入位置.json"
json_path = "D:\\flask_project\\flask_app\\general\\static\\插入位置.json"
# 加载数据
data_dict = load_data(json_path)
# 定义目标名称
target_names = define_target_names()
# 生成用户查询列表
user_query_list = generate_user_queries(target_names, data_dict)
if not user_query_list:
print("没有生成任何用户查询。")
sys.exit(0)
# 提取查询
queries = [item['query'] for item in user_query_list]
# 定义文件路径
2024-10-29 09:04:23 +08:00
format_part = "C:\\Users\\Administrator\\Desktop\\bid_format.pdf"
# format_part="C:\\Users\\Administrator\\Desktop\\货物标\\zbfiles\\新建文件夹\\bid_format.docx"
2024-10-28 17:40:02 +08:00
# 检查文件是否存在
if not os.path.isfile(format_part):
print(f"错误:文件未找到 - {format_part}")
sys.exit(1)
# 上传文件并获取file_id
file_id = upload_file(format_part)
if not file_id:
print("错误:文件上传失败。")
sys.exit(1)
# 使用多线程并行处理查询
results = multi_threading(queries, "", file_id,2)
if not results:
print("错误:未收到任何处理结果。")
sys.exit(1)
# 清理返回结果
baseinfo_list = [process_string_list(res) for _, res in results]
# 输出结果
2024-10-29 09:04:23 +08:00
for i, info in enumerate(baseinfo_list):
print(f'{target_names[i]}:{info}')
2024-10-28 17:40:02 +08:00
if __name__ == "__main__":
main()