zbparse/flask_app/货物标/无效标和废标和禁止投标整合main.py
2024-11-04 10:52:23 +08:00

537 lines
28 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import json
import os.path
import time
import re
from flask_app.general.通义千问long import upload_file, qianwen_long,qianwen_long_text
from flask_app.general.通用功能函数 import process_string_list
from docx import Document
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import OrderedDict
#处理跨页的段落
def preprocess_paragraphs(paragraphs):
processed = [] # 初始化处理后的段落列表
index = 0
#排除遇到表格、填空的情况
def has_long_spaces(text, max_space_count=5):
return any(len(space) > max_space_count for space in re.findall(r'\s+', text))
while index < len(paragraphs):
current_text = paragraphs[index].text.strip() # 去除当前段落的前后空白
# 检查当前段落是否为空
if current_text == '':
# 确保有前一个和后一个段落
if 0 < index < len(paragraphs) - 1:
prev_text = paragraphs[index - 1].text.strip() # 获取前一个段落的文本
next_text = paragraphs[index + 1].text.strip() # 获取后一个段落的文本
# **新增部分:检查前一个段落和后一个段落都非空** 内部没有超过5个连续空格
if prev_text and next_text and not has_long_spaces(prev_text) and not has_long_spaces(next_text):
# 检查前一个段落的文本是否不以标点符号结尾
if not prev_text.endswith(('', ',', '', '!', '?')):
# 定义列表项的模式
list_item_pattern = r'^\s*([\(]\d+[\)]|[A-Za-z]\.\s*|[一二三四五六七八九十]+、)'
# 检查后一个段落是否以列表模式开头
is_next_list = re.match(list_item_pattern, next_text)
# 如果后一个段落不是列表项且前一个段落长度大于30
if not is_next_list and len(prev_text) > 30:
# 合并前一个和后一个段落的文本
merged_text = prev_text + ' ' + next_text # 为了可读性添加空格
if processed:
# 更新处理后的最后一个段落
processed[-1] = merged_text
else:
# 如果列表为空,直接添加合并后的文本
processed.append(merged_text)
# 跳过下一个段落,因为它已经被合并
index += 2
continue
# 如果没有满足条件,不进行合并,跳过当前空段落
else:
# 非空段落,添加到处理后的列表中
processed.append(current_text)
index += 1
return processed
def extract_text_with_keywords(doc_path, keywords, follow_up_keywords):
from collections import OrderedDict
from docx import Document
import re
if isinstance(keywords, str):
keywords = [keywords]
doc = Document(doc_path)
extracted_paragraphs = OrderedDict()
continue_collecting = False
current_section_pattern = None
active_key = None
def match_keywords(text, patterns):
# 首先检查关键词是否匹配
for pattern in patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
# 如果当前的模式是 '不\s*得',则额外检查是否匹配 '不得分'
if '\s*得' in pattern: # 使用字符串匹配检查模式
post_match_text = text[match.end():].strip()
if post_match_text.startswith(""):
continue # 如果是"不得分",跳过这个匹配
return True
return False
def extract_from_text(text, current_index):
nonlocal continue_collecting, current_section_pattern, active_key
if text == "":
return current_index
if continue_collecting:
if current_section_pattern and re.match(current_section_pattern, text):
continue_collecting = False
active_key = None
else:
if active_key is not None:
extracted_paragraphs[active_key].append(text)
return current_index
if match_keywords(text, keywords):
active_key = text
extracted_paragraphs[active_key] = [text]
if match_keywords(text, follow_up_keywords):
continue_collecting = True
section_number = re.match(r'^(\d+([.]\d+)*)\s*[.]?', text) # 修改后的正则,支持 '数字 、' 和 '数字.'
if section_number:
current_section_number = section_number.group(1)
level_count = current_section_number.count('.')
# Pattern to match current level, e.g., 3.4.5 或者 3
pattern = r'^' + (r'\d+\s*[.]\s*') * level_count + r'\d+'
# Generate patterns for next section at same level and parent level
parts = current_section_number.split('.')
matched_patterns = [pattern] # start with the full pattern
# Next section at same level
parts[-1] = str(int(parts[-1]) + 1)
next_pattern = r'^' + r'\.\s*'.join(parts)
matched_patterns.append(next_pattern)
# Parent section (if applicable)
if len(parts) > 1:
parent_section_parts = parts[:-1]
parent_section_parts[-1] = str(int(parent_section_parts[-1]) + 1)
parent_pattern = r'^' + r'\.\s*'.join(parent_section_parts)
matched_patterns.append(parent_pattern)
# 添加对 '数字 、' 格式的支持
digit_comma_pattern = r'^\d+\s*、'
matched_patterns.append(digit_comma_pattern)
# Combine the patterns
combined_pattern = r'(' + r')|('.join(matched_patterns) + r')'
current_section_pattern = re.compile(combined_pattern)
else:
found_next_number = False
current_section_pattern = None
while current_index < len(doc.paragraphs) - 1:
current_index += 1
next_text = doc.paragraphs[current_index].text.strip()
if not found_next_number:
# 修改后的正则,支持 '数字 、' 格式
next_section_number = re.match(r'^([A-Za-z0-9]+(?:\.[A-Za-z0-9]+)*)|(\(\d+\))|(\d+\s*、)',
next_text)
if next_section_number:
found_next_number = True
if next_section_number.group(1):
section_parts = next_section_number.group(1).split('.')
dynamic_pattern = r'^' + r'\.'.join(
[r'[A-Za-z0-9]+' for _ in section_parts]) + r'\b'
elif next_section_number.group(2):
dynamic_pattern = r'^[\(\]\d+[\)\]'
elif next_section_number.group(3):
dynamic_pattern = r'^\d+\s*、'
current_section_pattern = re.compile(dynamic_pattern)
if current_section_pattern and re.match(current_section_pattern, next_text):
extracted_paragraphs[active_key].append(next_text)
else:
continue_collecting = False
active_key = None
break
return current_index
processed_paragraphs = preprocess_paragraphs(doc.paragraphs)
index = 0
while index < len(processed_paragraphs):
index = extract_from_text(processed_paragraphs[index].strip(), index)
index += 1
return extracted_paragraphs
"""
eg:
text_list = ["这是第一句。 1. 接下来是第二句! (3) 最后一句。"]
new_text_list = ["这是第一句。", "1. 接下来是第二句!", "(3) 最后一句。"]
"""
def preprocess_text_list(text_list):
new_text_list = []
# 正则表达式匹配中文字符或标点后的空格,该空格后紧跟字母、数字或带括号的数字
split_pattern = re.compile(r'(?<=[\u4e00-\u9fff。?!;])(?=\s+[a-zA-Z\d]|\s+\([1-9]\d*\)|\s+\[1-9]\d*\)')
for text in text_list:
# 使用正则表达式检查并拆分元素
parts = split_pattern.split(text)
new_text_list.extend(part.strip() for part in parts if part.strip()) # 添加非空字符串检查
return new_text_list
def clean_dict_datas(extracted_contents, keywords, excludes): # 让正则表达式提取到的东西格式化
all_texts1 = []
all_texts2 = []
# 定义用于分割句子的正则表达式,包括中文和西文的结束标点
split_pattern = r'(?<=[。!?\!\?])'
for key, text_list in extracted_contents.items():
if len(text_list) == 1:
# print(text_list)
# print("------------------")
for data in text_list:
# print(data)
# 检查是否包含任何需要排除的字符串
if any(exclude in data for exclude in excludes):
continue # 如果包含任何排除字符串,跳过这个数据
# 去掉开头的序号eg:1 | (1) |2 | 1. | 2全角点| 3、 | 1.1 | 2.3.4 | A1 | C1.1 | 一、
pattern = r'^\s*([(]\d+[)]|[A-Za-z]?\d+\s*(\.\s*\d+)*(\s|\.|、|)?|[一二三四五六七八九十]+、)'
data = re.sub(pattern, '', data).strip()
keyword_match = re.search(keywords, data)
if keyword_match:
# 从关键词位置开始查找结束标点符号
start_pos = keyword_match.start()
# 截取从关键词开始到后面的内容
substring = data[start_pos:]
# 按定义的结束标点分割
sentences = re.split(split_pattern, substring, 1)
if len(sentences) > 0 and sentences[0]:
# 只取第一句,保留标点
cleaned_text = data[:start_pos] + sentences[
0] # eg:经采购人允许,潜在投标人可进入项目现场进行考察,但潜在投标人不得因此使采购人承担有关责任和蒙受损失。潜在投标人应自行承担现场考察的全部费用、责任和风险。
# 经采购人允许,潜在投标人可进入项目现场进行考察,但潜在投标人不得因此使采购人承担有关责任和蒙受损失。
else:
cleaned_text = data # 如果没有标点,使用整个字符串
else:
# 如果没有找到关键词,保留原文本
cleaned_text = data
# 删除空格
cleaned_text_no_spaces = cleaned_text.replace(' ', '').replace(' ', '')
# 如果长度大于8则添加到结果列表
if len(cleaned_text_no_spaces) > 8:
all_texts1.append(cleaned_text_no_spaces)
else:
# print(text_list)
# print("*********")
new_text_list = preprocess_text_list(text_list)
# 用于处理结构化文本,清理掉不必要的序号,并将分割后的段落合并,最终形成更简洁和格式化的输出。
pattern = r'^\s*([(]\d+[)]|[A-Za-z]?\d+\s*(\.\s*\d+)*(\s|\.|、|)?|[一二三四五六七八九十]+、)'
data = re.sub(pattern, '', new_text_list[0]).strip() # 去除序号
# 将修改后的第一个元素和剩余的元素连接起来
new_text_list[0] = data # 更新列表中的第一个元素
joined_text = "\n".join(new_text_list) # 如果列表中有多个元素,则连接它们
# 删除空格
joined_text_no_spaces = joined_text.replace(' ', '').replace(' ', '')
all_texts2.append(joined_text_no_spaces) # 将每个列表的内容添加到 all_texts 中
return all_texts1, all_texts2 # all_texts1要额外用gpt all_text2直接返回结果
# 只读取前附表中的最后一列(省钱,但容易漏内容)
def read_docx_last_column(truncate_file):
# 尝试打开文档
try:
doc = Document(truncate_file)
except Exception as e:
print(f"Error opening file: {e}")
return []
last_column_values = []
# 读取文档中的所有表格
if not doc.tables:
print("No tables found in the document.")
return last_column_values
# 遍历文档中的每个表格
for table in doc.tables:
# 获取表格的最后一列
for row in table.rows:
last_cell = row.cells[-1] # 获取最后一个单元格
# 去除内容前后空白并删除文本中的所有空格
cleaned_text = last_cell.text.strip().replace(' ', '')
last_column_values.append(cleaned_text)
return last_column_values
# 完整读取文件中所有表格适合pdf转docx价格便宜的情况优先推荐内容完整
def read_tables_from_docx(file_path):
# 尝试打开文档
try:
doc = Document(file_path)
except Exception as e:
print(f"Error opening file: {e}")
return []
# 初始化列表来保存符合条件的单元格内容
cell_contents = []
# 读取文档中的所有表格
if not doc.tables:
print("No tables found in the document.")
return []
# 遍历文档中的每个表格
for table_idx, table in enumerate(doc.tables):
# 遍历表格中的每一行
for row_idx, row in enumerate(table.rows):
# 遍历每一行中的单元格
for cell in row.cells:
cell_text = cell.text.strip() # 去除单元格内容前后空白
if len(cell_text) > 8: # 检查文字数量是否大于5
cell_contents.append(cell_text)
# 返回符合条件的单元格内容
return cell_contents
def extract_table_with_keywords(data, keywords, follow_up_keywords):
"""遍历列表中的每个元素,查找并返回包含关键词的句子列表,并根据是否存在后续关键词分别存储到两个列表中。"""
sentences1 = [] # 保存没有后续关键词的情况
sentences2 = [] # 保存有后续关键词的情况
# 检查是否包含 '无效报价' 的关键词
check_invalid_bidding = '\s*效\s*报\s*价' in keywords
# 遍历列表中的每个字符串元素
for item in data:
# 只有在包含 '无效投标' 关键词时,才检查 "无效报价"
if check_invalid_bidding and re.search(r'\s*效\s*报\s*价', item, re.IGNORECASE):
sentences1.append(item.strip())
continue
# 分割句子保证句子完整性按标点符号和序号分割eg:(?=\d+\.\d+):匹配诸如 1.1、2.2 之类的序号,并在序号前进行分割。
split_sentences = re.split(
r'(?<=[。!?!?\?])|' # 在中文句号、感叹号或问号后分割
r'(?=\d+\.\d+)|' # 在类似1.1的数字序号前分割
r'(?=\d+[\s、\.])|' # 在数字后跟空格、顿号或点号前分割
r'(?=[(]\d+[)])|' # 在括号包围的数字前分割
r'(?=[A-Za-z]\.\s*)|' # 在字母加点如A.、a.)前分割
r'(?=[A-Za-z]?\d+\s*(?:\.\s*\d+)*)|' # 在可选字母加数字或多级编号前分割
r'(?=[一二三四五六七八九十]+、)', # 在中文数字加顿号(如一、、二、)前分割
item
)
i = 0
while i < len(split_sentences):
sentence = split_sentences[i].strip()
if re.search(keywords, sentence, re.IGNORECASE):
# 处理 "不\s*得" 的特殊情况,跳过包含 "不得分" 的句子
if re.search(r'\s*得', sentence, re.IGNORECASE):
post_match_text = sentence[re.search(r'\s*得', sentence).end():].strip()
if post_match_text.startswith(""):
i += 1 # 跳过这个句子
continue
follow_up_present = any(
re.search(follow_up, sentence, re.IGNORECASE) for follow_up in follow_up_keywords
)
if follow_up_present:
# 如果存在后续关键词,则从当前位置开始截取
start_index = i
end_index = start_index
found_next_section = False
for j in range(start_index + 1, len(split_sentences)):
if re.match(r'\d+\.\d+(\.\d+)?', split_sentences[j].strip()):
end_index = j
found_next_section = True
break
if found_next_section:
full_text = ' '.join(split_sentences[start_index:end_index]).strip()
else:
full_text = ' '.join(split_sentences[start_index:]).strip()
# pattern = r'^\s*([(]\d+[)]|[A-Za-z]?\d+(\.\d+)*(\s|\.|、)?)'
pattern = r'^\s*([(]\d+[)]|[A-Za-z]\.\s*|[A-Za-z]?\d+\s*(\.\s*\d+)*(\s|\.|、|)?|[一二三四五六七八九十]+、)'
full_text = re.sub(pattern, '', full_text).replace(' ', '').strip() # 删去了空格
sentences2.append(full_text) # 存储有后续关键词的情况
i = end_index if found_next_section else len(split_sentences)
else:
# 没有后续关键词的情况
# pattern = r'^\s*([(]\d+[)]|[A-Za-z]?\d+(\.\d+)*(\s|\.|、)?)'
pattern = r'^\s*([(]\d+[)]|[A-Za-z]\.\s*|[A-Za-z]?\d+\s*(\.\s*\d+)*(\s|\.|、|)?|[一二三四五六七八九十]+、)'
cleaned_sentence = re.sub(pattern, '', sentence).replace('\n', '').replace(' ', '').strip()
if len(cleaned_sentence)>8:
sentences1.append(cleaned_sentence) # 存储没有后续关键词的情况
i += 1
else:
i += 1
return sentences1, sentences2 # 返回两个列表
# 处理无效投标
def extract_values_if_contains(data, includes):
"""
递归检查字典中的值是否包含列表 'includes' 中的内容。
如果包含,将这些值添加到一个列表中并返回。
参数:
data (dict): 字典或从 JSON 解析得到的数据。
includes (list): 包含要检查的关键词的列表。
返回:
list: 包含满足条件的值的列表。
"""
included_values = [] # 初始化结果列表
# 定义递归函数来处理嵌套字典
def recursive_search(current_data):
if isinstance(current_data, dict):
for key, value in current_data.items():
if isinstance(value, dict):
# 如果值是字典,递归搜索
recursive_search(value)
elif isinstance(value, str):
# 如果值是字符串,检查是否包含任何 includes 中的关键词
if any(include in value for include in includes):
included_values.append(value)
elif isinstance(current_data, list):
for item in current_data:
# 如果是列表,递归每个元素
recursive_search(item)
# 开始递归搜索
recursive_search(data)
return included_values
# 你是一个文本助手,文本内的信息以'...............'分割,你负责准确筛选所需的信息并返回,每块信息要求完整,不遗漏,你不得擅自进行总结或删减。
# 以上是从招标文件中摘取的内容,文本内之间的信息以'...............'分割,请你根据该内容回答:否决投标或拒绝投标或无效投标或使投标失效的情况有哪些?文本中可能存在无关的信息,请你准确筛选符合的信息并将它的序号返回。请以[x,x,x]格式返回给我结果x为符合的信息的序号。
# 以上是原文内容,文本内的信息以'...............'分割请你根据该信息回答否决投标或拒绝投标或无效投标或使投标失效的情况有哪些文本中可能存在无关的信息请你准确筛选所需的信息并返回。最终结果以json列表格式返回给我键名为'否决和无效投标情形',你的回答完全忠于原文内容,且回答内容与原文内容一致,要求完整与准确,不能擅自总结或者概括。",
def handle_query(file_path, user_query, output_file, result_key, keywords):
try:
excludes = ["说明表", "重新招标", "否决所有", "否决投标的条件", "备注:", "本人保证:", "我方"]
follow_up_keywords = [r'\s*形\s*之\s*一', r'\s*况\s*之\s*一', r'\s*列', r'\s*下']
extracted_contents = extract_text_with_keywords(file_path, [keywords], follow_up_keywords) # 字典结果
all_texts1, all_texts2 = clean_dict_datas(extracted_contents, keywords, excludes) # 列表
# table_data_list=read_docx_last_column(truncate_file) #从投标人须知前附表中提取信息生成列表data每个元素为'一行信息'
table_data_list = read_tables_from_docx(file_path)
all_tables1, all_tables2 = extract_table_with_keywords(table_data_list, keywords, follow_up_keywords)
qianwen_txt = all_texts1 + all_tables1
# Proceed only if there is content to write
selected_contents = set() # 使用 set 去重
if qianwen_txt:
with open(output_file, 'w', encoding='utf-8') as file:
counter = 1
for content in qianwen_txt:
file.write(f"{counter}. {content}\n")
file.write("..............." + '\n')
counter += 1
file_id = upload_file(output_file)
# qianwen_ans = qianwen_long(file_id, user_query)
qianwen_ans = qianwen_long_text(file_id, user_query)
num_list = process_string_list(qianwen_ans)
print(result_key + "选中的序号:" + str(num_list))
for index in num_list:
if index - 1 < len(qianwen_txt):
content = qianwen_txt[index - 1]
selected_contents.add(content)
# 无论 qianwen_txt 是否为空,都添加 all_texts2 和 all_tables2 的内容
selected_contents.update(all_texts2)
selected_contents.update(all_tables2)
# 如果 selected_contents 不为空,则返回结果,否则返回空字符串
if selected_contents:
res = {result_key: list(selected_contents)}
else:
res = {result_key: ""}
return res
except Exception as e:
print(f"handle_query 在处理 {result_key} 时发生异常: {e}")
return {result_key: ""}
def combine_find_invalid(file_path, output_dir):
queries = [
(
r'\s*决|无\s*效\s*投\s*标|无\s*效\s*文\s*件|文\s*件\s*无\s*效|无\s*效\s*响\s*应|无\s*效\s*报\s*价|无\s*效\s*标|视\s*为\s*无\s*效|被\s*拒\s*绝|予\s*以\s*拒\s*绝|投\s*标\s*失\s*效|投\s*标\s*无\s*效',
"以上是从招标文件中摘取的内容,文本内之间的信息以'...............'分割,请你根据该内容回答:否决投标或拒绝投标或无效投标或投标失效的情况有哪些?文本中可能存在无关的信息,请你准确筛选符合的信息并将它的序号返回。请以[x,x,x]格式返回给我结果x为符合的信息的序号若情况不存在返回[]。",
os.path.join(output_dir, "temp1.txt"),
"否决和无效投标情形"
),
(
r'\s*标',
"以上是从招标文件中摘取的内容,文本内之间的信息以'...............'分割,请你根据该内容回答:废标项的情况有哪些?文本中可能存在无关的信息,请你准确筛选符合的信息并将它的序号返回。请以[x,x,x]格式返回给我结果x为符合的信息的序号若情况不存在返回[]。",
os.path.join(output_dir, "temp2.txt"),
"废标项"
),
(
r'\s*得|禁\s*止\s*投\s*标',
"以上是从招标文件中摘取的内容,文本内之间的信息以'...............'分割,每条信息规定了各方不得存在的情形,请回答:在这些信息中,主语是投标人或中标人或供应商或联合体投标各方或磋商小组的信息有哪些?不要返回主语是招标人或采购人或评标委员会的信息,请你筛选所需的信息并将它的序号返回。请以[x,x,x]格式返回给我结果,示例返回为[1,4,6],若情况不存在,返回[]。",
os.path.join(output_dir, "temp3.txt"),
"不得存在的情形"
)
]
results = []
# 使用线程池来并行处理查询
with ThreadPoolExecutor() as executor:
futures = []
for keywords, user_query, output_file, result_key in queries:
future = executor.submit(handle_query, file_path, user_query, output_file, result_key, keywords)
futures.append((future, result_key)) # 保持顺序
time.sleep(0.5) # 暂停0.5秒后再提交下一个任务
for future, result_key in futures:
try:
result = future.result()
except Exception as e:
print(f"线程处理 {result_key} 时出错: {e}")
result = {result_key: ""}
results.append(result)
combined_dict = {}
for d in results:
combined_dict.update(d)
print("无效标与废标done...")
return {"无效标与废标项": combined_dict}
# TODO:无效标目前以整个docx文档作为输入可能导致后面两章不必要的信息也导入。 无效投标至少>8个字
if __name__ == '__main__':
start_time = time.time()
# truncate_json_path = "C:\\Users\\Administrator\\Desktop\\货物标\\output4\\tmp2\\竞争性谈判文件(3)_tobidders_notice_part1\\truncate_output.json"
# truncate_file="C:\\Users\\Administrator\\Desktop\\货物标\\output4\\招标文件实高电子显示屏_tobidders_notice_part1.docx"
clause_path = "D:\\flask_project\\flask_app\\static\\output\\output1\\77a48c63-f39f-419b-af2a-7b3dbf41b70b\\clause1.json"
# doc_path="C:\\Users\\Administrator\\Desktop\\货物标\\zbfilesdocx\\磋商文件(1).docx"
doc_path = 'D:\\flask_project\\flask_app\\static\\output\\output1\\e7dda5cb-10ba-47a8-b989-d2993d34bb89\\ztbfile.docx'
output_dir = "D:\\flask_project\\flask_app\\static\\output\\output1\\e7dda5cb-10ba-47a8-b989-d2993d34bb89\\tmp"
results = combine_find_invalid(doc_path, output_dir)
end_time = time.time()
print("Elapsed time:", str(end_time - start_time))
print("Results:", json.dumps(results, ensure_ascii=False, indent=4))