zbparse/flask_app/货物标/无效标和废标和禁止投标整合货物标版.py
2024-10-12 18:01:59 +08:00

434 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import json
import os.path
import time
import re
from flask_app.main.通义千问long import upload_file, qianwen_long
from concurrent.futures import ThreadPoolExecutor
from flask_app.main.禁止投标情形 import find_forbidden, process_string_list
from docx import Document
#如果当前段落有序号,则向下匹配直接遇到相同的序号样式
#如果当前段落无序号,则向下匹配序号,把若干同类的序号都摘出来。
def extract_text_with_keywords(doc_path, keywords, follow_up_keywords):
from collections import OrderedDict
from docx import Document
import re
if isinstance(keywords, str):
keywords = [keywords]
doc = Document(doc_path)
extracted_paragraphs = OrderedDict()
continue_collecting = False
current_section_pattern = None
active_key = None
def match_keywords(text, patterns):
# 首先检查关键词是否匹配
for pattern in patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
# 如果当前的模式是 '不\s*得',则额外检查是否匹配 '不得分'
if '\s*得' in pattern: # 使用字符串匹配检查模式
post_match_text = text[match.end():].strip()
if post_match_text.startswith(""):
continue # 如果是"不得分",跳过这个匹配
return True
return False
def extract_from_text(text, current_index):
nonlocal continue_collecting, current_section_pattern, active_key
if text == "":
return current_index
if continue_collecting:
if current_section_pattern and re.match(current_section_pattern, text):
continue_collecting = False
active_key = None
else:
if active_key is not None:
extracted_paragraphs[active_key].append(text)
return current_index
if match_keywords(text, keywords):
active_key = text
extracted_paragraphs[active_key] = [text]
if match_keywords(text, follow_up_keywords):
continue_collecting = True
section_number = re.match(r'(\d+(\s*\.\s*\d+)*)', text)
if section_number:
current_section_number = section_number.group(1)
level_count = current_section_number.count('.')
# Pattern to match current level, e.g., 3.4.5
pattern = r'^' + (r'\d+\s*\.\s*') * level_count + r'\d+'
# Generate patterns for next section at same level and parent level
parts = current_section_number.split('.')
matched_patterns = [pattern] # start with the full pattern
# Next section at same level
parts[-1] = str(int(parts[-1]) + 1)
next_pattern = r'^' + r'\s*\.\s*'.join(parts)
matched_patterns.append(next_pattern)
# Parent section (if applicable)
if len(parts) > 1:
parent_section_parts = parts[:-1]
parent_section_parts[-1] = str(int(parent_section_parts[-1]) + 1)
parent_pattern = r'^' + r'\s*\.\s*'.join(parent_section_parts)
matched_patterns.append(parent_pattern)
# Combine the patterns
combined_pattern = r'(' + r')|('.join(matched_patterns) + r')'
current_section_pattern = re.compile(combined_pattern)
else:
found_next_number = False
current_section_pattern = None
while current_index < len(doc.paragraphs) - 1:
current_index += 1
next_text = doc.paragraphs[current_index].text.strip()
if not found_next_number:
next_section_number = re.match(r'^([A-Za-z0-9]+(?:\.[A-Za-z0-9]+)*)|(\(\d+\))', next_text)
if next_section_number:
found_next_number = True
if next_section_number.group(1):
section_parts = next_section_number.group(1).split('.')
dynamic_pattern = r'^' + r'\.'.join([r'[A-Za-z0-9]+' for _ in section_parts]) + r'\b'
elif next_section_number.group(2):
dynamic_pattern = r'^[\(\]\d+[\)\]'
current_section_pattern = re.compile(dynamic_pattern)
if current_section_pattern and re.match(current_section_pattern, next_text):
extracted_paragraphs[active_key].append(next_text)
else:
continue_collecting = False
active_key=None
break
return current_index
index = 0
while index < len(doc.paragraphs):
index = extract_from_text(doc.paragraphs[index].text.strip(), index)
index += 1
return extracted_paragraphs
"""
eg:
text_list = ["这是第一句。 1. 接下来是第二句! (3) 最后一句。"]
new_text_list = ["这是第一句。", "1. 接下来是第二句!", "(3) 最后一句。"]
"""
def preprocess_text_list(text_list):
new_text_list = []
# 正则表达式匹配中文字符或标点后的空格,该空格后紧跟字母、数字或带括号的数字
split_pattern = re.compile(r'(?<=[\u4e00-\u9fff。?!;])(?=\s+[a-zA-Z\d]|\s+\([1-9]\d*\)|\s+\[1-9]\d*\)')
for text in text_list:
# 使用正则表达式检查并拆分元素
parts = split_pattern.split(text)
new_text_list.extend(part.strip() for part in parts if part.strip()) # 添加非空字符串检查
return new_text_list
def clean_dict_datas(extracted_contents, keywords,excludes): #让正则表达式提取到的东西格式化
all_texts1 = []
all_texts2=[]
# 定义用于分割句子的正则表达式,包括中文和西文的结束标点
split_pattern = r'(?<=[。!?\!\?])'
for key, text_list in extracted_contents.items():
if len(text_list) == 1:
for data in text_list:
# 检查是否包含任何需要排除的字符串
if any(exclude in data for exclude in excludes):
continue # 如果包含任何排除字符串,跳过这个数据
# 去掉开头的序号eg:1 | (1) |2 | 1. | 2全角点| 3、 | 1.1 | 2.3.4 | A1 | C1.1 | 一、
pattern = r'^\s*([(]\d+[)]|[A-Za-z]?\d+\s*(\.\s*\d+)*(\s|\.|、|)?|[一二三四五六七八九十]+、)'
data = re.sub(pattern, '', data).strip()
keyword_match = re.search(keywords, data)
if keyword_match:
# 从关键词位置开始查找结束标点符号
start_pos = keyword_match.start()
# 截取从关键词开始到后面的内容
substring = data[start_pos:]
# 按定义的结束标点分割
sentences = re.split(split_pattern, substring, 1)
if len(sentences) > 0 and sentences[0]:
# 只取第一句,保留标点
cleaned_text = data[:start_pos] + sentences[0] #eg:经采购人允许,潜在投标人可进入项目现场进行考察,但潜在投标人不得因此使采购人承担有关责任和蒙受损失。潜在投标人应自行承担现场考察的全部费用、责任和风险。
# 经采购人允许,潜在投标人可进入项目现场进行考察,但潜在投标人不得因此使采购人承担有关责任和蒙受损失。
else:
cleaned_text = data # 如果没有标点,使用整个字符串
else:
# 如果没有找到关键词,保留原文本
cleaned_text = data
all_texts1.append(cleaned_text) # 将处理后的文本添加到结果列表
else:
new_text_list=preprocess_text_list(text_list)
#用于处理结构化文本,清理掉不必要的序号,并将分割后的段落合并,最终形成更简洁和格式化的输出。
pattern = r'^\s*([(]\d+[)]|[A-Za-z]?\d+\s*(\.\s*\d+)*(\s|\.|、|)?|[一二三四五六七八九十]+、)'
data = re.sub(pattern, '', new_text_list[0]).strip() #去除序号
# 将修改后的第一个元素和剩余的元素连接起来
new_text_list[0] = data # 更新列表中的第一个元素
joined_text = "\n".join(new_text_list) # 如果列表中有多个元素,则连接它们
all_texts2.append(joined_text) # 将每个列表的内容添加到 all_texts 中
return all_texts1,all_texts2 #all_texts1要额外用gpt all_text2直接返回结果
#只读取前附表中的最后一列(省钱,但容易漏内容)
def read_docx_last_column(truncate_file):
# 尝试打开文档
try:
doc = Document(truncate_file)
except Exception as e:
print(f"Error opening file: {e}")
return []
last_column_values = []
# 读取文档中的所有表格
if not doc.tables:
print("No tables found in the document.")
return last_column_values
# 遍历文档中的每个表格
for table in doc.tables:
# 获取表格的最后一列
for row in table.rows:
last_cell = row.cells[-1] # 获取最后一个单元格
# 去除内容前后空白并删除文本中的所有空格
cleaned_text = last_cell.text.strip().replace(' ', '')
last_column_values.append(cleaned_text)
return last_column_values
#完整读取文件中所有表格适合pdf转docx价格便宜的情况优先推荐内容完整
def read_tables_from_docx(file_path):
# 尝试打开文档
try:
doc = Document(file_path)
except Exception as e:
print(f"Error opening file: {e}")
return []
# 初始化列表来保存符合条件的单元格内容
cell_contents = []
# 读取文档中的所有表格
if not doc.tables:
print("No tables found in the document.")
return []
# 遍历文档中的每个表格
for table_idx, table in enumerate(doc.tables):
# 遍历表格中的每一行
for row_idx, row in enumerate(table.rows):
# 遍历每一行中的单元格
for cell in row.cells:
cell_text = cell.text.strip() # 去除单元格内容前后空白
if len(cell_text) > 6: # 检查文字数量是否大于5
cell_contents.append(cell_text)
# 返回符合条件的单元格内容
return cell_contents
def extract_table_with_keywords(data, keywords, follow_up_keywords):
"""遍历列表中的每个元素,查找并返回包含关键词的句子列表,并根据是否存在后续关键词分别存储到两个列表中。"""
sentences1 = [] # 保存没有后续关键词的情况
sentences2 = [] # 保存有后续关键词的情况
# 检查是否包含 '无效报价' 的关键词
check_invalid_bidding = '\s*效\s*报\s*价' in keywords
# 遍历列表中的每个字符串元素
for item in data:
# 只有在包含 '无效投标' 关键词时,才检查 "无效报价"
if check_invalid_bidding and re.search(r'\s*效\s*报\s*价', item, re.IGNORECASE):
sentences1.append(item.strip())
continue
# 分割句子保证句子完整性按标点符号和序号分割eg:(?=\d+\.\d+):匹配诸如 1.1、2.2 之类的序号,并在序号前进行分割。
split_sentences = re.split(r'(?<=[。!?\!\?])|(?=\d+\.\d+)|(?=\d+[\\.])|(?=[(]\d+[)])', item)
i = 0
while i < len(split_sentences):
sentence = split_sentences[i].strip()
if re.search(keywords, sentence, re.IGNORECASE):
# 处理 "不\s*得" 的特殊情况,跳过包含 "不得分" 的句子
if re.search(r'\s*得', sentence, re.IGNORECASE):
post_match_text = sentence[re.search(r'\s*得', sentence).end():].strip()
if post_match_text.startswith(""):
i += 1 # 跳过这个句子
continue
follow_up_present = any(
re.search(follow_up, sentence, re.IGNORECASE) for follow_up in follow_up_keywords
)
if follow_up_present:
# 如果存在后续关键词,则从当前位置开始截取
start_index = i
end_index = start_index
found_next_section = False
for j in range(start_index + 1, len(split_sentences)):
if re.match(r'\d+\.\d+(\.\d+)?', split_sentences[j].strip()):
end_index = j
found_next_section = True
break
if found_next_section:
full_text = ' '.join(split_sentences[start_index:end_index]).strip()
else:
full_text = ' '.join(split_sentences[start_index:]).strip()
# pattern = r'^\s*([(]\d+[)]|[A-Za-z]?\d+(\.\d+)*(\s|\.|、)?)'
pattern = r'^\s*([(]\d+[)]|[A-Za-z]?\d+\s*(\.\s*\d+)*(\s|\.|、|)?|[一二三四五六七八九十]+、)'
full_text = re.sub(pattern, '', full_text)
sentences2.append(full_text) # 存储有后续关键词的情况
i = end_index if found_next_section else len(split_sentences)
else:
# 没有后续关键词的情况
# pattern = r'^\s*([(]\d+[)]|[A-Za-z]?\d+(\.\d+)*(\s|\.|、)?)'
pattern = r'^\s*([(]\d+[)]|[A-Za-z]?\d+\s*(\.\s*\d+)*(\s|\.|、|)?|[一二三四五六七八九十]+、)'
cleaned_sentence = re.sub(pattern, '', sentence).replace('\n', '').strip()
sentences1.append(cleaned_sentence) # 存储没有后续关键词的情况
i += 1
else:
i += 1
return sentences1, sentences2 # 返回两个列表
#处理无效投标
def extract_values_if_contains(data, includes):
"""
递归检查字典中的值是否包含列表 'includes' 中的内容。
如果包含,将这些值添加到一个列表中并返回。
参数:
data (dict): 字典或从 JSON 解析得到的数据。
includes (list): 包含要检查的关键词的列表。
返回:
list: 包含满足条件的值的列表。
"""
included_values = [] # 初始化结果列表
# 定义递归函数来处理嵌套字典
def recursive_search(current_data):
if isinstance(current_data, dict):
for key, value in current_data.items():
if isinstance(value, dict):
# 如果值是字典,递归搜索
recursive_search(value)
elif isinstance(value, str):
# 如果值是字符串,检查是否包含任何 includes 中的关键词
if any(include in value for include in includes):
included_values.append(value)
elif isinstance(current_data, list):
for item in current_data:
# 如果是列表,递归每个元素
recursive_search(item)
# 开始递归搜索
recursive_search(data)
return included_values
#你是一个文本助手,文本内的信息以'...............'分割,你负责准确筛选所需的信息并返回,每块信息要求完整,不遗漏,你不得擅自进行总结或删减。
#以上是从招标文件中摘取的内容,文本内之间的信息以'...............'分割,请你根据该内容回答:否决投标或拒绝投标或无效投标或使投标失效的情况有哪些?文本中可能存在无关的信息,请你准确筛选符合的信息并将它的序号返回。请以[x,x,x]格式返回给我结果x为符合的信息的序号。
#以上是原文内容,文本内的信息以'...............'分割请你根据该信息回答否决投标或拒绝投标或无效投标或使投标失效的情况有哪些文本中可能存在无关的信息请你准确筛选所需的信息并返回。最终结果以json列表格式返回给我键名为'否决和无效投标情形',你的回答完全忠于原文内容,且回答内容与原文内容一致,要求完整与准确,不能擅自总结或者概括。",
def handle_query(file_path, user_query, output_file, result_key, keywords):
excludes = ["说明表", "重新招标", "否决所有", "否决投标的条件", "备注:", "本人保证:","我方"]
follow_up_keywords = [r'\s*形\s*之\s*一', r'\s*况\s*之\s*一', r'\s*列', r'\s*下']
extracted_contents = extract_text_with_keywords(file_path, [keywords], follow_up_keywords) #字典结果
all_texts1, all_texts2 = clean_dict_datas(extracted_contents, keywords, excludes) # 列表
# table_data_list=read_docx_last_column(truncate_file) #从投标人须知前附表中提取信息生成列表data每个元素为'一行信息'
table_data_list=read_tables_from_docx(file_path)
all_tables1, all_tables2 = extract_table_with_keywords(table_data_list, keywords,follow_up_keywords)
qianwen_txt = all_texts1 + all_tables1
# Proceed only if there is content to write
selected_contents = set() # 使用 set 去重
if qianwen_txt:
with open(output_file, 'w', encoding='utf-8') as file:
counter = 1
for content in qianwen_txt:
file.write("..............." + '\n')
file.write(f"{counter}. {content}\n")
counter += 1
file_id = upload_file(output_file)
qianwen_ans = qianwen_long(file_id, user_query)
num_list = process_string_list(qianwen_ans)
print(num_list)
for index in num_list:
if index - 1 < len(qianwen_txt):
content = qianwen_txt[index - 1]
selected_contents.add(content)
# 无论 qianwen_txt 是否为空,都添加 all_texts2 和 all_tables2 的内容
selected_contents.update(all_texts2)
selected_contents.update(all_tables2)
# 如果 selected_contents 不为空,则返回结果,否则返回空字符串
if selected_contents:
res = {result_key: list(selected_contents)}
else:
res = {result_key: ""}
return res
def combine_find_invalid(file_path, output_dir):
queries = [
(r'\s*决|无\s*效\s*投\s*标|无\s*效\s*文\s*件|文\s*件\s*无\s*效|无\s*效\s*响\s*应|无\s*效\s*报\s*价|被\s*拒\s*绝|予\s*以\s*拒\s*绝|投\s*标\s*失\s*效|投\s*标\s*无\s*效',
"以上是从招标文件中摘取的内容,文本内之间的信息以'...............'分割,请你根据该内容回答:否决投标或拒绝投标或无效投标或投标失效的情况有哪些?文本中可能存在无关的信息,请你准确筛选符合的信息并将它的序号返回。请以[x,x,x]格式返回给我结果x为符合的信息的序号若情况不存在返回[]。",
os.path.join(output_dir, "temp1.txt"), "否决和无效投标情形"),
(r'\s*标',
"以上是从招标文件中摘取的内容,文本内之间的信息以'...............'分割,请你根据该内容回答:废标项的情况有哪些?文本中可能存在无关的信息,请你准确筛选符合的信息并将它的序号返回。请以[x,x,x]格式返回给我结果x为符合的信息的序号若情况不存在返回[]。",
os.path.join(output_dir, "temp2.txt"), "废标项"),
(r'\s*得|禁\s*止\s*投\s*标',"以上是从招标文件中摘取的内容,文本内之间的信息以'...............'分割,每条信息规定了各方不得存在的情形,请回答:在这些信息中,主语是投标人或中标人或供应商或联合体投标各方或磋商小组的信息有哪些?不要返回主语是招标人或采购人或评标委员会的信息,请你筛选所需的信息并将它的序号返回。请以[x,x,x]格式返回给我结果,示例返回为[1,4,6],若情况不存在,返回[]。",
os.path.join(output_dir,"temp3.txt"),"不得存在的情形")
]
results = []
# 使用线程池来并行处理查询
with ThreadPoolExecutor() as executor:
futures = []
for keywords, user_query, output_file, result_key in queries:
future = executor.submit(handle_query, file_path, user_query, output_file, result_key, keywords)
futures.append(future)
time.sleep(1) # 暂停1秒后再提交下一个任务
for future in futures:
results.append(future.result())
# #禁止投标
# print("starting不得存在的情形...")
# forbidden_res = find_forbidden(truncate_json_path, clause_path)
# results.append(forbidden_res)
combined_dict = {}
for d in results:
combined_dict.update(d)
print("无效标与废标done...")
# return nest_json_under_key(combined_dict, "无效标与废标项")
return {"无效标与废标项":combined_dict}
if __name__ == '__main__':
start_time = time.time()
# truncate_json_path = "C:\\Users\\Administrator\\Desktop\\货物标\\output4\\tmp2\\竞争性谈判文件(3)_tobidders_notice_part1\\truncate_output.json"
# truncate_file="C:\\Users\\Administrator\\Desktop\\货物标\\output4\\招标文件实高电子显示屏_tobidders_notice_part1.docx"
clause_path="C:\\Users\\Administrator\\Desktop\\货物标\\output4\\tmp1\\clause招标文件广水市教育局封闭管理项目二次_tobidders_notice_part2.json"
output_dir = "C:\\Users\\Administrator\\Desktop\\货物标\\output4\\invalid"
# doc_path = 'C:\\Users\\Administrator\\Desktop\\fsdownload\\temp7\\3abb6e16-19db-42ad-9504-53bf1072dfe7\\ztbfile_invalid.docx'
doc_path = 'C:\\Users\\Administrator\\Desktop\\货物标\\zbfilesdocx\\招标文件(广水市教育局封闭管理项目二次).docx'
results = combine_find_invalid(doc_path, output_dir)
end_time = time.time()
print("Elapsed time:", str(end_time - start_time))
print("Results:", json.dumps(results,ensure_ascii=False,indent=4))