2024-10-10 21:03:02 +08:00

369 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import json
import os.path
import time
import re
from flask_app.main.json_utils import combine_json_results, nest_json_under_key
from flask_app.main.通义千问long import upload_file, qianwen_long
from concurrent.futures import ThreadPoolExecutor
from flask_app.main.禁止投标情形 import find_forbidden, process_string_list
#如果当前段落有序号,则向下匹配直接遇到相同的序号样式
#如果当前段落无序号,则向下匹配序号,把若干同类的序号都摘出来。
def extract_text_with_keywords(doc_path, keywords, follow_up_keywords):
from collections import OrderedDict
from docx import Document
import re
if isinstance(keywords, str):
keywords = [keywords]
doc = Document(doc_path)
extracted_paragraphs = OrderedDict()
continue_collecting = False
current_section_pattern = None
active_key = None
def match_keywords(text, patterns):
# 首先检查关键词是否匹配
for pattern in patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
# 如果当前的模式是 '不\s*得',则额外检查是否匹配 '不得分'
if pattern == r'\s*得':
post_match_text = text[match.end():].strip()
if post_match_text.startswith(""):
continue # 如果是"不得分",跳过这个匹配
return True
return False
def extract_from_text(text, current_index):
nonlocal continue_collecting, current_section_pattern, active_key
if text == "":
return current_index
if continue_collecting:
if current_section_pattern and re.match(current_section_pattern, text):
continue_collecting = False
active_key = None
else:
if active_key is not None:
extracted_paragraphs[active_key].append(text)
return current_index
if match_keywords(text, keywords):
active_key = text
extracted_paragraphs[active_key] = [text]
if match_keywords(text, follow_up_keywords):
continue_collecting = True
section_number = re.match(r'(\d+(\s*\.\s*\d+)*)', text)
if section_number:
current_section_number = section_number.group(1)
level_count = current_section_number.count('.')
# Pattern to match current level, e.g., 3.4.5
pattern = r'^' + (r'\d+\s*\.\s*') * level_count + r'\d+'
# Generate patterns for next section at same level and parent level
parts = current_section_number.split('.')
matched_patterns = [pattern] # start with the full pattern
# Next section at same level
parts[-1] = str(int(parts[-1]) + 1)
next_pattern = r'^' + r'\s*\.\s*'.join(parts)
matched_patterns.append(next_pattern)
# Parent section (if applicable)
if len(parts) > 1:
parent_section_parts = parts[:-1]
parent_section_parts[-1] = str(int(parent_section_parts[-1]) + 1)
parent_pattern = r'^' + r'\s*\.\s*'.join(parent_section_parts)
matched_patterns.append(parent_pattern)
# Combine the patterns
combined_pattern = r'(' + r')|('.join(matched_patterns) + r')'
current_section_pattern = re.compile(combined_pattern)
else:
found_next_number = False
current_section_pattern = None
while current_index < len(doc.paragraphs) - 1:
current_index += 1
next_text = doc.paragraphs[current_index].text.strip()
if not found_next_number:
next_section_number = re.match(r'^([A-Za-z0-9]+(?:\.[A-Za-z0-9]+)*)|(\(\d+\))', next_text)
if next_section_number:
found_next_number = True
if next_section_number.group(1):
section_parts = next_section_number.group(1).split('.')
dynamic_pattern = r'^' + r'\.'.join([r'[A-Za-z0-9]+' for _ in section_parts]) + r'\b'
elif next_section_number.group(2):
dynamic_pattern = r'^[\(\]\d+[\)\]'
current_section_pattern = re.compile(dynamic_pattern)
if current_section_pattern and re.match(current_section_pattern, next_text):
extracted_paragraphs[active_key].append(next_text)
else:
continue_collecting = False
active_key=None
break
return current_index
index = 0
while index < len(doc.paragraphs):
index = extract_from_text(doc.paragraphs[index].text.strip(), index)
index += 1
return extracted_paragraphs
def preprocess_text_list(text_list):
new_text_list = []
# 正则表达式匹配中文字符或标点后的空格,该空格后紧跟字母、数字或带括号的数字
split_pattern = re.compile(r'(?<=[\u4e00-\u9fff。?!;])(?=\s+[a-zA-Z\d]|\s+\([1-9]\d*\)|\s+\[1-9]\d*\)')
for text in text_list:
# 使用正则表达式检查并拆分元素
parts = split_pattern.split(text)
new_text_list.extend(part.strip() for part in parts if part.strip()) # 添加非空字符串检查
return new_text_list
def clean_dict_datas(extracted_contents, keywords,excludes): #让正则表达式提取到的东西格式化
all_texts1 = []
all_texts2=[]
# 定义用于分割句子的正则表达式,包括中文和西文的结束标点
split_pattern = r'(?<=[。!?\!\?])'
for key, text_list in extracted_contents.items():
if len(text_list) == 1:
for data in text_list:
# 检查是否包含任何需要排除的字符串
if any(exclude in data for exclude in excludes):
continue # 如果包含任何排除字符串,跳过这个数据
# 去掉开头的序号,包括字母+数字的格式 以及括号+数字
pattern = r'^\s*([(]\d+[)]|[A-Za-z]?\d+\s*(\.\s*\d+)*(\s|\.|、)?)'
data = re.sub(pattern, '', data).strip()
keyword_match = re.search(keywords, data)
if keyword_match:
# 从关键词位置开始查找结束标点符号
start_pos = keyword_match.start()
# 截取从关键词开始到后面的内容
substring = data[start_pos:]
# 按定义的结束标点分割
sentences = re.split(split_pattern, substring, 1)
if len(sentences) > 0 and sentences[0]:
# 只取第一句,保留标点
cleaned_text = data[:start_pos] + sentences[0] #eg:经采购人允许,潜在投标人可进入项目现场进行考察,但潜在投标人不得因此使采购人承担有关责任和蒙受损失。潜在投标人应自行承担现场考察的全部费用、责任和风险。
# 经采购人允许,潜在投标人可进入项目现场进行考察,但潜在投标人不得因此使采购人承担有关责任和蒙受损失。
else:
cleaned_text = data # 如果没有标点,使用整个字符串
else:
# 如果没有找到关键词,保留原文本
cleaned_text = data
all_texts1.append(cleaned_text) # 将处理后的文本添加到结果列表
else:
print(text_list)
new_text_list=preprocess_text_list(text_list)
print(new_text_list)
pattern = r'^\s*([(]\d+[)]|[A-Za-z]?\d+\s*(\.\s*\d+)*(\s|\.|、)?)'
data = re.sub(pattern, '', new_text_list[0]).strip() #去除序号
# 将修改后的第一个元素和剩余的元素连接起来
new_text_list[0] = data # 更新列表中的第一个元素
joined_text = "\n".join(new_text_list) # 如果列表中有多个元素,则连接它们
all_texts2.append(joined_text) # 将每个列表的内容添加到 all_texts 中
return all_texts1,all_texts2 #all_texts1要额外用gpt all_text2直接返回结果
def find_sentences_with_keywords(data, keywords, follow_up_keywords):
"""递归查找并返回包含关键词的句子列表,并根据是否存在后续关键词分别存储到两个列表中。"""
sentences1 = [] # 保存没有后续关键词的情况
sentences2 = [] # 保存有后续关键词的情况
if isinstance(data, dict):
for value in data.values():
result1, result2 = find_sentences_with_keywords(value, keywords, follow_up_keywords)
sentences1.extend(result1)
sentences2.extend(result2)
elif isinstance(data, list):
for item in data:
result1, result2 = find_sentences_with_keywords(item, keywords, follow_up_keywords)
sentences1.extend(result1)
sentences2.extend(result2)
elif isinstance(data, str):
# 分割句子,保证句子完整性(按标点符号和序号分割)
split_sentences = re.split(r'(?<=[。!?\!\?])|(?=\d+[\\.])|(?=[(]\d+[)])', data) # 扩展匹配序号分割
i = 0
while i < len(split_sentences):
sentence = split_sentences[i].strip()
if re.search(keywords, sentence, re.IGNORECASE):
follow_up_present = any(
re.search(follow_up, sentence, re.IGNORECASE) for follow_up in follow_up_keywords)
if follow_up_present:
# 如果存在后续关键词,则从当前位置开始截取
start_index = i
end_index = start_index
found_next_section = False
for j in range(start_index + 1, len(split_sentences)):
if re.match(r'\d+\.\d+(\.\d+)?', split_sentences[j].strip()):
end_index = j
found_next_section = True
break
if found_next_section:
full_text = ' '.join(split_sentences[start_index:end_index]).strip()
else:
full_text = ' '.join(split_sentences[start_index:]).strip()
pattern = r'^\s*([(]\d+[)]|[A-Za-z]?\d+(\.\d+)*(\s|\.|、)?)'
data=re.sub(pattern,'',full_text)
sentences2.append(data) # 存储有后续关键词的情况
i = end_index if found_next_section else len(split_sentences)
else:
pattern = r'^\s*([(]\d+[)]|[A-Za-z]?\d+(\.\d+)*(\s|\.|、)?)'
data = re.sub(pattern, '', sentence).replace('\n','').strip()
sentences1.append(data) # 存储没有后续关键词的情况
i += 1
else:
i += 1
return sentences1, sentences2 # 返回两个列表
def extract_sentences_from_json(json_path, keywords,follow_up_keywords):
with open(json_path, 'r', encoding='utf-8') as file:
data = json.load(file)
"""从JSON数据中提取包含关键词的句子。"""
return find_sentences_with_keywords(data, keywords,follow_up_keywords)
#处理无效投标
def extract_values_if_contains(data, includes):
"""
递归检查字典中的值是否包含列表 'includes' 中的内容。
如果包含,将这些值添加到一个列表中并返回。
参数:
data (dict): 字典或从 JSON 解析得到的数据。
includes (list): 包含要检查的关键词的列表。
返回:
list: 包含满足条件的值的列表。
"""
included_values = [] # 初始化结果列表
# 定义递归函数来处理嵌套字典
def recursive_search(current_data):
if isinstance(current_data, dict):
for key, value in current_data.items():
if isinstance(value, dict):
# 如果值是字典,递归搜索
recursive_search(value)
elif isinstance(value, str):
# 如果值是字符串,检查是否包含任何 includes 中的关键词
if any(include in value for include in includes):
included_values.append(value)
elif isinstance(current_data, list):
for item in current_data:
# 如果是列表,递归每个元素
recursive_search(item)
# 开始递归搜索
recursive_search(data)
return included_values
#你是一个文本助手,文本内的信息以'...............'分割,你负责准确筛选所需的信息并返回,每块信息要求完整,不遗漏,你不得擅自进行总结或删减。
#以上是从招标文件中摘取的内容,文本内之间的信息以'...............'分割,请你根据该内容回答:否决投标或拒绝投标或无效投标或使投标失效的情况有哪些?文本中可能存在无关的信息,请你准确筛选符合的信息并将它的序号返回。请以[x,x,x]格式返回给我结果x为符合的信息的序号。
#以上是原文内容,文本内的信息以'...............'分割请你根据该信息回答否决投标或拒绝投标或无效投标或使投标失效的情况有哪些文本中可能存在无关的信息请你准确筛选所需的信息并返回。最终结果以json列表格式返回给我键名为'否决和无效投标情形',你的回答完全忠于原文内容,且回答内容与原文内容一致,要求完整与准确,不能擅自总结或者概括。",
def handle_query(file_path, user_query, output_file, result_key, keywords, truncate_json_path):
excludes = ["说明表", "重新招标", "否决所有", "否决投标的条件", "备注:", "本人保证:"]
follow_up_keywords = [r'\s*形\s*之\s*一', r'\s*况\s*之\s*一', r'\s*列', r'\s*下']
extracted_contents = extract_text_with_keywords(file_path, [keywords], follow_up_keywords) #字典结果
# print(extracted_contents)
all_texts1, all_texts2 = clean_dict_datas(extracted_contents, keywords, excludes) # 列表
all_tables1, all_tables2 = extract_sentences_from_json(truncate_json_path, keywords, follow_up_keywords)
qianwen_txt = all_texts1 + all_tables1
# Proceed only if there is content to write
if qianwen_txt:
with open(output_file, 'w', encoding='utf-8') as file:
# 初始化一个计数器
counter = 1
for content in qianwen_txt:
file.write("..............."+'\n')
# 写入内容前加上序号,后面接一个点和空格,然后是内容
file.write(f"{counter}. {content}\n")
# 更新计数器,每次循环递增
counter += 1
file_id = upload_file(output_file)
qianwen_ans = qianwen_long(file_id, user_query)
selected_contents = set() # 使用 set 去重
num_list = process_string_list(qianwen_ans)
print(num_list)
for index in num_list:
if index - 1 < len(qianwen_txt):
content = qianwen_txt[index - 1] # 转换序号为索引假设序号从1开始
selected_contents.add(content)
# 将 all_texts2 和 all_tables2 中的内容也添加到 set 中
selected_contents.update(all_texts2)
selected_contents.update(all_tables2)
# 将 set 转换为 list 来返回结果
res = {result_key: list(selected_contents)}
# 将结果转换为JSON字符串
# os.remove(output_file) # Remove the file after use
# print(f"Deleted temporary file: {output_file}")
else:
res = {result_key: ""} # Set the response to empty if no contents were extracted
return res
def combine_find_invalid(file_path, output_dir, truncate_json_path,clause_path):
queries = [
(r'\s*决|无\s*效\s*投\s*标|无\s*效\s*文\s*件|被\s*拒\s*绝|予\s*以\s*拒\s*绝|投\s*标\s*失\s*效|投\s*标\s*无\s*效',
"以上是从招标文件中摘取的内容,文本内之间的信息以'...............'分割,请你根据该内容回答:否决投标或拒绝投标或无效投标或投标失效的情况有哪些?文本中可能存在无关的信息,请你准确筛选符合的信息并将它的序号返回。请以[x,x,x]格式返回给我结果x为符合的信息的序号若情况不存在返回[]。",
os.path.join(output_dir, "temp1.txt"), "否决和无效投标情形"),
(r'\s*标',
"以上是从招标文件中摘取的内容,文本内之间的信息以'...............'分割,请你根据该内容回答:废标项的情况有哪些?文本中可能存在无关的信息,请你准确筛选符合的信息并将它的序号返回。请以[x,x,x]格式返回给我结果x为符合的信息的序号若情况不存在返回[]。",
os.path.join(output_dir, "temp2.txt"), "废标项"),
(r'\s*得|禁\s*止\s*投\s*标',"以上是从招标文件中摘取的内容,文本内之间的信息以'...............'分割,每条信息规定了各方不得存在的情形,请回答:在这些信息中,主语是投标人或中标人或供应商或联合体投标各方或磋商小组的信息有哪些?不要返回主语是招标人或采购人或评标委员会的信息,请你筛选所需的信息并将它的序号返回。请以[x,x,x]格式返回给我结果,示例返回为[1,4,6],若情况不存在,返回[]。",
os.path.join(output_dir,"temp3.txt"),"不得存在的情形")
]
results = []
# 使用线程池来并行处理查询
with ThreadPoolExecutor() as executor:
futures = []
for keywords, user_query, output_file, result_key in queries:
future = executor.submit(handle_query, file_path, user_query, output_file, result_key, keywords,
truncate_json_path)
futures.append(future)
time.sleep(1) # 暂停1秒后再提交下一个任务
for future in futures:
results.append(future.result())
# #禁止投标
# print("starting不得存在的情形...")
# forbidden_res = find_forbidden(truncate_json_path, clause_path)
# results.append(forbidden_res)
combined_dict = {}
for d in results:
combined_dict.update(d)
print("无效标与废标done...")
# return nest_json_under_key(combined_dict, "无效标与废标项")
return {"无效标与废标项":combined_dict}
if __name__ == '__main__':
start_time = time.time()
truncate_json_path = "C:\\Users\\Administrator\\Desktop\\货物标\\output4\\tmp2\\磋商文件_tobidders_notice_part1\\truncate_output.json"
clause_path="C:\\Users\\Administrator\\Desktop\\货物标\\output4\\tmp1\\clause磋商文件_tobidders_notice_part2.json"
output_dir = "C:\\Users\\Administrator\\Desktop\\货物标\\output4\\invalid"
# doc_path = 'C:\\Users\\Administrator\\Desktop\\fsdownload\\temp7\\3abb6e16-19db-42ad-9504-53bf1072dfe7\\ztbfile_invalid.docx'
doc_path = 'C:\\Users\\Administrator\\Desktop\\货物标\\zbfilesdocx\\磋商文件.docx'
results = combine_find_invalid(doc_path, output_dir,truncate_json_path,clause_path)
end_time = time.time()
print("Elapsed time:", str(end_time - start_time))
print("Results:", json.dumps(results,ensure_ascii=False,indent=4))