11.26 优化日志管理

This commit is contained in:
zy123 2024-11-26 15:06:57 +08:00
parent 45df7ea714
commit 3f33820797
6 changed files with 324 additions and 210 deletions

View File

@ -2,37 +2,6 @@
import json
import re
def extract_content_from_json(string):
"""
输入字符串提取 { } 之间的内容并将其解析为字典
如果使用 insert_missing_commas 修复后仍然失败则尝试返回原始的解析结果如果可能
"""
if not string.strip():
return {}
# 提取第一个匹配的 JSON 对象
match = re.search(r'\{[\s\S]*\}', string)
if match:
json_data = match.group(0)
try:
# 尝试直接解析原始 JSON 数据
return json.loads(json_data)
except json.JSONDecodeError as original_error:
print(f"原始 JSON 解析失败: {original_error}")
print(string)
try:
# 尝试修复缺失的逗号
fixed_json = insert_missing_commas(json_data)
return json.loads(fixed_json)
except json.JSONDecodeError as fixed_error:
print(f"修复后的 JSON 解析失败: {fixed_error}")
print(string)
# 可选:返回空字典或其他默认值
return {}
else:
print("json_utils: extract_content_from_json: 未找到有效的 JSON 内容。")
return {}
def insert_missing_commas(json_str):
"""
使用正则表达式在缺失逗号的位置插入逗号
@ -48,6 +17,148 @@ def insert_missing_commas(json_str):
json_str = re.sub(pattern, replacement, json_str)
return json_str
def fix_json_escape_sequences(json_str):
"""
修复 JSON 字符串中的非法转义序列
将所有不符合 JSON 规范的反斜杠进行转义
"""
# JSON 中合法的转义字符
valid_escapes = ['"', '\\', '/', 'b', 'f', 'n', 'r', 't', 'u']
# 使用正则表达式找到所有反斜杠
# 如果反斜杠后面不是合法的转义字符,则进行转义
pattern = re.compile(r'\\(?!["\\/bfnrtu])')
fixed_str = pattern.sub(r'\\\\', json_str)
return fixed_str
def replace_latex_expressions(json_str):
"""
替换 JSON 字符串中的 LaTeX 风格表达式
例如 $ \geq 2 m$ 替换为 2
"""
# 定义 LaTeX 符号到 Unicode 字符的映射
latex_mapping = {
r'\geq': '',
r'\leq': '',
r'\times': '×',
r'\frac': '/', # 简单处理分数
r'\neq': '',
r'\approx': '',
r'\pm': '±',
r'\alpha': 'α',
r'\beta': 'β',
r'\gamma': 'γ',
r'\delta': 'δ',
r'\pi': 'π',
r'\sqrt': '',
r'\infty': '',
r'\cup': '',
r'\cap': '',
r'\subseteq': '',
r'\supseteq': '',
r'\forall': '',
r'\exists': '',
r'\rightarrow': '',
r'\leftarrow': '',
# 添加更多需要的映射
}
# 处理每一个 LaTeX 表达式
def replace_match(match):
expr = match.group(1)
for latex, char in latex_mapping.items():
expr = expr.replace(latex, char)
# 替换单位符号,例如 ' m' 到 '米', ' s' 到 '秒', 等等
expr = re.sub(r'(\d+)\s*m', r'\1米', expr)
expr = re.sub(r'(\d+)\s*s', r'\1秒', expr)
expr = re.sub(r'(\d+)\s*kg', r'\1公斤', expr)
expr = re.sub(r'(\d+)\s*A', r'\1安', expr) # 例如电流单位安培
# 继续添加更多单位
return expr
# 替换所有 $...$ 包围的内容
fixed_str = re.sub(r'\$(.*?)\$', replace_match, json_str)
return fixed_str
def extract_content_from_json(string):
"""
输入字符串尝试解析 JSON 数据
1. 尝试直接解析原始 JSON
2. 如果直接解析失败按顺序使用不同修复方法尝试解析
"""
if not string.strip():
return {}
# 提取第一个匹配的 JSON 对象
match = re.search(r'\{[\s\S]*\}', string)
if not match:
print("未找到有效的 JSON 内容。")
return {}
original_json = match.group(0)
# 尝试直接解析原始 JSON 数据
try:
parsed = json.loads(original_json)
print("直接解析原始 JSON 成功。")
return parsed
except json.JSONDecodeError as original_error:
print(f"直接解析原始 JSON 失败: {original_error}")
# 方法1逗号修复
try:
fixed_json1 = insert_missing_commas(original_json)
parsed = json.loads(fixed_json1)
print("使用方法1逗号修复成功。")
return parsed
except json.JSONDecodeError as error1:
print(f"方法1逗号修复解析失败: {error1}")
# 方法2LaTeX 表达式替换
try:
fixed_json2 = replace_latex_expressions(original_json)
parsed = json.loads(fixed_json2)
print("使用方法2LaTeX 表达式替换成功。")
return parsed
except json.JSONDecodeError as error2:
print(f"方法2LaTeX 替换)解析失败: {error2}")
# 方法3非法转义序列修复
try:
fixed_json3 = fix_json_escape_sequences(original_json)
parsed = json.loads(fixed_json3)
print("使用方法3非法转义序列修复成功。")
return parsed
except json.JSONDecodeError as error3:
print(f"方法3非法转义修复解析失败: {error3}")
# 如果所有方法都失败,返回空字典
print("所有修复方法均失败,返回空字典。")
return {}
def replace_latex_expressions_in_dict(obj):
"""
递归遍历字典或列表替换其中的 LaTeX 表达式
"""
if isinstance(obj, dict):
return {k: replace_latex_expressions_in_dict(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [replace_latex_expressions_in_dict(item) for item in obj]
elif isinstance(obj, str):
# 仅处理被 $...$ 包围的内容
def replace_match(match):
expr = match.group(1)
return replace_latex_expressions(expr)
# 替换所有 $...$ 包围的内容
return re.sub(r'\$(.*?)\$', replace_match, obj)
else:
return obj
def clean_json_string(json_string):
# print(json_string)
"""清理JSON字符串移除多余的反引号并解析为字典"""

View File

@ -11,6 +11,8 @@ from datetime import datetime
import requests
from dashscope import Assistants, Messages, Runs, Threads
from llama_index.indices.managed.dashscope import DashScopeCloudRetriever
from flask_app.general.doubao import doubao_model
from flask_app.general.通义千问long import qianwen_long, upload_file
prompt = """
# 角色
@ -250,6 +252,11 @@ def llm_call(question, knowledge_name,file_id, result_queue, ans_index, llm_type
qianwen_res = qianwen_long(file_id, question)
result_queue.put((ans_index,(question,qianwen_res)))
return
elif llm_type==3:
# print(f"doubao! question:{question}")
doubao_res=doubao_model(question)
result_queue.put((ans_index, (question, doubao_res)))
return
else :
assistant = pure_assistant()
ans = send_message(assistant, message=question)

View File

@ -395,8 +395,13 @@ if __name__ == "__main__":
file_type=2
output_folder = r"C:\Users\Administrator\Desktop\fsdownload\5950ad84-30c8-4643-b6de-b13ef5be7a5c\tmp"
tech_deviation,tech_star_deviation,business_deviation,business_star_deviation,zigefuhe_deviation=get_tech_and_business_deviation(file_path,file_type,"123",output_folder)
print("技术偏离表")
print(json.dumps(tech_deviation,ensure_ascii=False,indent=4))
print("技术带星")
print(json.dumps(tech_star_deviation,ensure_ascii=False,indent=4))
print("商务偏离表")
print(json.dumps(business_deviation, ensure_ascii=False, indent=4))
print("商务带星")
print(json.dumps(business_star_deviation, ensure_ascii=False, indent=4))
print("资格审查")
print(json.dumps(zigefuhe_deviation, ensure_ascii=False, indent=4))

View File

@ -1,185 +1,168 @@
import concurrent.futures
import json
from flask_app.general.doubao import doubao_model
from flask_app.general.json_utils import clean_json_string
import re
def extract_zige_deviation_table(zige_info, fuhe_info, zigefuhe_info):
prompt_template1 = """
任务给出一份文本根据文本提取资格性检查的具体评审标准
输出要求
1.以json格式返回结果不要输出其他内容
2.键名为"资格性检查"键值为字符串列表每个字符串为一条评审标准评审标准不分先后不要有序号标注
要求与指南
1. 评审标准是具体的内容不要返回诸如'本项目的特定资格要求:'这种标题性质且不能体现具体评审标准的内容
2. 若文本中存在相同或相似的表述仅需取其中一个作为键值中的一条即可
文本内容{full_text}
def insert_missing_commas(json_str):
"""
prompt_template2 = """
任务给出一份文本根据文本提取符合性检查的具体评审标准
输出要求
1.以json格式返回结果不要输出其他内容
2.键名为"符合性检查"键值为字符串列表每个字符串为一条评审标准评审标准不分先后不要有序号标注
3.仔细检查你所选取的标准若发现这些标准实际上是在描述不允许出现的符合性审查情况则将外键替换为'符合性检查(以下情况不得出现)'并将这些标准写入其中
要求与指南
1. 评审标准应该是具体的内容不要返回诸如'本项目的特定符合性要求:'这种标题性质且不能体现具体评审标准的内容
2. 若文本中存在相同或相似的表述仅需取其中一个作为键值中的一条即可
输出示例1
{{
"符合性检查": [
"因素1",
"因素2",
...
]
}}
输出示例2
{{
"符合性检查(以下情况不得出现)": [
"因素1",
"因素2",
...
]
}}
文本内容{full_text}
使用正则表达式在缺失逗号的位置插入逗号
具体来说寻找一个值的结束引号后紧跟着下一个键的开始引号并在中间插入逗号
"""
prompt_template3 = """
任务给出一份文本根据文本提取资格性检查和符合性检查的具体评审标准
输出要求
1.以json格式返回结果不要输出其他内容
2.键名为"资格性和符合性检查"键值为字符串列表每个字符串为一条评审标准评审标准不分先后不要有序号标注
要求与指南
1. 评审标准应该是具体的内容不要返回诸如'本项目的特定符合性要求:'这种标题性质且不能体现具体评审标准的内容
2. 若文本中存在相同或相似的表述仅需取其中一个作为键值中的一条即可
# 这个正则匹配一个字符串结尾的引号,可能有空白字符,然后是另一个键的引号
pattern = r'(":\s*"[^"]*)"\s*(")'
replacement = r'\1", \2'
文本内容{full_text}
"""
previous_str = None
while previous_str != json_str:
previous_str = json_str
json_str = re.sub(pattern, replacement, json_str)
def get_model_response(query):
return doubao_model(query)
return json_str
result = {"资格审查": {}}
if zigefuhe_info:
# 如果zigefuhe_info非空使用prompt_template3
user_query3 = prompt_template3.format(full_text=zigefuhe_info)
model_res3 = get_model_response(user_query3)
zigefuhe_deviation = clean_json_string(model_res3)
result["资格审查"] = zigefuhe_deviation
else:
zige_deviation = {}
fuhe_deviation = {}
# 提交 zige_info 和 fuhe_info 的模型调用
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {}
if zige_info != "":
user_query1 = prompt_template1.format(full_text=zige_info)
futures["zige"] = executor.submit(get_model_response, user_query1)
if fuhe_info != "":
user_query2 = prompt_template2.format(full_text=fuhe_info)
futures["fuhe"] = executor.submit(get_model_response, user_query2)
# 获取结果
for key, future in futures.items():
try:
model_res = future.result()
if key == "zige":
zige_deviation = clean_json_string(model_res)
elif key == "fuhe":
fuhe_deviation = clean_json_string(model_res)
except Exception as e:
print(f"Error processing {key}: {e}")
# 合并结果
result["资格审查"] = {
"资格性检查": zige_deviation.get("资格性检查", zige_deviation),
"符合性检查": fuhe_deviation.get("符合性检查", fuhe_deviation),
}
return result
a="""
"申请人资格要求": [
"1.满足《中华人民共和国政府采购法》第二十二条规定1具有独立承担民事责任的能力 2具有良好的商业信誉和健全的财务会计制度 3具有履行合同所必须的设备和专业技术能力 4有依法交纳税收和社会保障资金的良好记录 5参加政府采购活动前三年内在经营活动中没有重大违法记录 6法律、行政法规规定的其他条件。",
"2.单位负责人为同一人或者存在直接控股、管理关系的不同投标人,不得参加本项目同一合同 项下的政府采购活动。",
"3.未被列入“信用中国”网站(www.creditchina.gov.cn)失信被执行人、重大税收违法案件当事人名 单、政府采购严重违法失信名单和“中国政府采购”网站www.ccgp.gov.cn政府采购严重违法失信 行为记录名单(以磋商公告发布之日起查询结果为准)。",
"4.落实政府采购政策需满足的资格要求: 本项目(是/否)专门面向中小微型企业:否 。需落实的节能环保、中小微型企业扶持(含支 持监狱企业发展、促进残疾人就业)等相关政府采购政策详见磋商文件。"
],
"资格性审查": {
"资格要求": "符合本采购文件第一章第二款要求,并提供合格有效的证明材料。",
"其他要求": [
"符合竞争性磋商文件规定的其他要求",
"是否符合竞争性磋商采购文件中的其他规定"
],
"磋商报价": [
"每一种采购内容是否只有一个报价",
"报价金额是否超过采购预算"
],
"磋商书签字盖章": "是否有法定代表人或其委托代理人签字并加盖单位公章",
"磋商结果有效期": "投标有效期满足招标文件要求。",
"信誉情况": "无不良经济纠纷记录和违法行为的;",
"采购需求响应": [
"是否实质性响应采购文件采购技术要求的",
"是否有提出采购人不能接受的合同条件的"
]
}
"""
b="""
"符合性审查": {
"磋商报价": [
"每一种采购内容是否只有一个报价",
"报价金额是否超过采购预算"
],
"磋商书签字盖章": "是否有法定代表人或其委托代理人签字并加盖单位公章",
"磋商结果有效期": "投标有效期满足招标文件要求。",
"信誉情况": "无不良经济纠纷记录和违法行为的;",
"采购需求响应": [
"是否实质性响应采购文件采购技术要求的",
"是否有提出采购人不能接受的合同条件的"
],
"其他要求": "是否符合竞争性磋商采购文件中的其他规定"
}
"""
c=""
res=extract_zige_deviation_table(a,b,c)
print(json.dumps(res,ensure_ascii=False,indent=4))
data={
"申请人资格要求": [
"1.满足《中华人民共和国政府采购法》第二十二条规定1具有独立承担民事责任的能力 2具有良好的商业信誉和健全的财务会计制度 3具有履行合同所必须的设备和专业技术能力 4有依法交纳税收和社会保障资金的良好记录 5参加政府采购活动前三年内在经营活动中没有重大违法记录 6法律、行政法规规定的其他条件。",
"2.单位负责人为同一人或者存在直接控股、管理关系的不同投标人,不得参加本项目同一合同 项下的政府采购活动。",
"3.未被列入“信用中国”网站(www.creditchina.gov.cn)失信被执行人、重大税收违法案件当事人名 单、政府采购严重违法失信名单和“中国政府采购”网站www.ccgp.gov.cn政府采购严重违法失信 行为记录名单(以磋商公告发布之日起查询结果为准)。",
"4.落实政府采购政策需满足的资格要求: 本项目(是/否)专门面向中小微型企业:否 。需落实的节能环保、中小微型企业扶持(含支 持监狱企业发展、促进残疾人就业)等相关政府采购政策详见磋商文件。"
],
"资格性审查": {
"资格要求": "符合本采购文件第一章第二款要求,并提供合格有效的证明材料。",
"其他要求": [
"符合竞争性磋商文件规定的其他要求",
"是否符合竞争性磋商采购文件中的其他规定"
],
"磋商报价": [
"每一种采购内容是否只有一个报价",
"报价金额是否超过采购预算"
],
"磋商书签字盖章": "是否有法定代表人或其委托代理人签字并加盖单位公章",
"磋商结果有效期": "投标有效期满足招标文件要求。",
"信誉情况": "无不良经济纠纷记录和违法行为的;",
"采购需求响应": [
"是否实质性响应采购文件采购技术要求的",
"是否有提出采购人不能接受的合同条件的"
]
},
"符合性审查": {
"磋商报价": [
"每一种采购内容是否只有一个报价",
"报价金额是否超过采购预算"
],
"磋商书签字盖章": "是否有法定代表人或其委托代理人签字并加盖单位公章",
"磋商结果有效期": "投标有效期满足招标文件要求。",
"信誉情况": "无不良经济纠纷记录和违法行为的;",
"采购需求响应": [
"是否实质性响应采购文件采购技术要求的",
"是否有提出采购人不能接受的合同条件的"
],
"其他要求": "是否符合竞争性磋商采购文件中的其他规定"
}
def fix_json_escape_sequences(json_str):
"""
修复 JSON 字符串中的非法转义序列
将所有不符合 JSON 规范的反斜杠进行转义
"""
# JSON 中合法的转义字符
valid_escapes = ['"', '\\', '/', 'b', 'f', 'n', 'r', 't', 'u']
# 使用正则表达式找到所有反斜杠
# 如果反斜杠后面不是合法的转义字符,则进行转义
pattern = re.compile(r'\\(?!["\\/bfnrtu])')
fixed_str = pattern.sub(r'\\\\', json_str)
return fixed_str
def replace_latex_expressions(json_str):
"""
替换 JSON 字符串中的 LaTeX 风格表达式
例如 $ \geq 2 m$ 替换为 2
"""
# 定义 LaTeX 符号到 Unicode 字符的映射
latex_mapping = {
r'\geq': '',
r'\leq': '',
r'\times': '×',
r'\frac': '/', # 简单处理分数
r'\neq': '',
r'\approx': '',
r'\pm': '±',
r'\alpha': 'α',
r'\beta': 'β',
r'\gamma': 'γ',
r'\delta': 'δ',
r'\pi': 'π',
r'\sqrt': '',
r'\infty': '',
r'\cup': '',
r'\cap': '',
r'\subseteq': '',
r'\supseteq': '',
r'\forall': '',
r'\exists': '',
r'\rightarrow': '',
r'\leftarrow': '',
# 添加更多需要的映射
}
# 处理每一个 LaTeX 表达式
def replace_match(match):
expr = match.group(1)
for latex, char in latex_mapping.items():
expr = expr.replace(latex, char)
# 替换单位符号,例如 ' m' 到 '米', ' s' 到 '秒', 等等
expr = re.sub(r'(\d+)\s*m', r'\1米', expr)
expr = re.sub(r'(\d+)\s*s', r'\1秒', expr)
expr = re.sub(r'(\d+)\s*kg', r'\1公斤', expr)
expr = re.sub(r'(\d+)\s*A', r'\1安', expr) # 例如电流单位安培
# 继续添加更多单位
return expr
# 替换所有 $...$ 包围的内容
fixed_str = re.sub(r'\$(.*?)\$', replace_match, json_str)
return fixed_str
def extract_content_from_json(string):
"""
输入字符串尝试解析 JSON 数据
1. 尝试直接解析原始 JSON
2. 如果直接解析失败按顺序使用不同修复方法尝试解析
"""
if not string.strip():
return {}
# 提取第一个匹配的 JSON 对象
match = re.search(r'\{[\s\S]*\}', string)
if not match:
print("未找到有效的 JSON 内容。")
return {}
original_json = match.group(0)
# 尝试直接解析原始 JSON 数据
try:
parsed = json.loads(original_json)
print("直接解析原始 JSON 成功。")
return parsed
except json.JSONDecodeError as original_error:
print(f"直接解析原始 JSON 失败: {original_error}")
# 方法1逗号修复
try:
fixed_json1 = insert_missing_commas(original_json)
parsed = json.loads(fixed_json1)
print("使用方法1逗号修复成功。")
return parsed
except json.JSONDecodeError as error1:
print(f"方法1逗号修复解析失败: {error1}")
# 方法2LaTeX 表达式替换
try:
fixed_json2 = replace_latex_expressions(original_json)
parsed = json.loads(fixed_json2)
print("使用方法2LaTeX 表达式替换成功。")
return parsed
except json.JSONDecodeError as error2:
print(f"方法2LaTeX 替换)解析失败: {error2}")
# 方法3非法转义序列修复
try:
fixed_json3 = fix_json_escape_sequences(original_json)
parsed = json.loads(fixed_json3)
print("使用方法3非法转义序列修复成功。")
return parsed
except json.JSONDecodeError as error3:
print(f"方法3非法转义修复解析失败: {error3}")
# 如果所有方法都失败,返回空字典
print("所有修复方法均失败,返回空字典。")
return {}
def replace_latex_expressions_in_dict(obj):
"""
递归遍历字典或列表替换其中的 LaTeX 表达式
"""
if isinstance(obj, dict):
return {k: replace_latex_expressions_in_dict(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [replace_latex_expressions_in_dict(item) for item in obj]
elif isinstance(obj, str):
# 仅处理被 $...$ 包围的内容
def replace_match(match):
expr = match.group(1)
return replace_latex_expressions(expr)
# 替换所有 $...$ 包围的内容
return re.sub(r'\$(.*?)\$', replace_match, obj)
else:
return obj
test_json_1 = """
{
"示例": "速度为 $v = 3 \\times 10^2 m/s$,加速度为 $a \\geq 9.8 m/s^2$。"
}
"""
res=extract_content_from_json(test_json_1)
print(res)

View File

@ -12,7 +12,7 @@ from flask_app.general.多线程提问 import multi_threading
from flask_app.general.通义千问long import qianwen_long, upload_file
from flask_app.general.json_utils import clean_json_string, combine_json_results
from flask_app.货物标.截取pdf货物标版 import truncate_pdf_main
from flask_app.general.doubao import doubao_model, generate_full_user_query, pdf2txt
from flask_app.general.doubao import doubao_model, generate_full_user_query, pdf2txt, read_txt_to_string
from flask_app.货物标.技术参数要求提取后处理函数 import postprocess, all_postprocess
def truncate_system_keys(data):
@ -510,6 +510,8 @@ def get_technical_requirements(file_path,invalid_path,processed_filepath):
# 将键中的 '.' 替换为 '下的'
modified_key = key.replace('.', '下的')
# 使用修改后的键填充第一个占位符,原始键填充第二个占位符
# full_text = read_txt_to_string(processed_filepath)
# new_query = user_query_template.format(modified_key, key, modified_key,full_text) #转豆包后取消注释
new_query = user_query_template.format(modified_key, key, modified_key)
queries.append(new_query)
@ -518,9 +520,12 @@ def get_technical_requirements(file_path,invalid_path,processed_filepath):
# 将键中的 '.' 替换为 '下的'
modified_grouped_key = grouped_key.replace('.', '下的')
# 使用修改后的键填充第一个占位符,原始键填充第二个占位符
# full_text = read_txt_to_string(processed_filepath)
# new_query = user_query_template_two.format(modified_grouped_key, grouped_key, modified_grouped_key, full_text)
new_query = user_query_template_two.format(modified_grouped_key, grouped_key, modified_grouped_key)
queries.append(new_query)
results = multi_threading(queries, "", file_id, 2)
results = multi_threading(queries, "", file_id, 2) #通义
# results = multi_threading(queries, "", "", 3) #豆包
technical_requirements = []
if not results:
print("errror!未获得大模型的回答!")
@ -566,14 +571,15 @@ def test_all_files_in_folder(input_folder, output_folder):
if __name__ == "__main__":
start_time=time.time()
# truncate_file="C:\\Users\\Administrator\\Desktop\\fsdownload\\469d2aee-9024-4993-896e-2ac7322d41b7\\ztbfile_procurement.docx"
truncate_file=r"D:\flask_project\flask_app\static\output\output1\a91d59a5-d04a-4588-98e4-ddc6e9caf999\ztbfile_procurement.pdf"
truncate_file=r"C:\Users\Administrator\Desktop\fsdownload\5950ad84-30c8-4643-b6de-b13ef5be7a5c\ztbfile.pdf"
# invalid_path="D:\\flask_project\\flask_app\\static\\output\\output1\\e7dda5cb-10ba-47a8-b989-d2993d34bb89\\ztbfile.pdf"
# truncate_file="D:\\flask_project\\flask_app\\static\\output\\output1\\e7dda5cb-10ba-47a8-b989-d2993d34bb89\\ztbfile_procurement.docx"
# output_folder="C:\\Users\\Administrator\\Desktop\\货物标\\output1\\tmp"
# file_id = upload_file(truncate_file)
invalid_path="C:\\Users\\Administrator\\Desktop\\fsdownload\\a110ed59-00e8-47ec-873a-bd4579a6e628\\ztbfile.pdf"
# file_id=upload_file(truncate_file)
processed_filepath = pdf2txt(truncate_file)
# processed_filepath = pdf2txt(truncate_file)
processed_filepath=r"C:\Users\Administrator\Desktop\fsdownload\5950ad84-30c8-4643-b6de-b13ef5be7a5c\tmp\extract1.txt"
res=get_technical_requirements(truncate_file,invalid_path,processed_filepath)
json_string = json.dumps(res, ensure_ascii=False, indent=4)
print(json_string)

View File

@ -3,6 +3,7 @@ import json
import time
from flask_app.general.doubao import pdf2txt
from flask_app.general.file2markdown import convert_pdf_to_markdown
from flask_app.货物标.技术参数要求提取 import get_technical_requirements
from flask_app.general.通义千问long import upload_file
from flask_app.货物标.商务服务其他要求提取 import get_business_requirements
@ -25,6 +26,7 @@ def fetch_procurement_reqs(procurement_path, invalid_path):
return DEFAULT_PROCUREMENT_REQS.copy()
try:
# processed_filepath = convert_pdf_to_markdown(procurement_path) # 转markdown格式
processed_filepath = pdf2txt(procurement_path) # 纯文本提取
# 使用 ThreadPoolExecutor 并行处理 get_technical_requirements 和 get_business_requirements
with concurrent.futures.ThreadPoolExecutor() as executor:
@ -64,7 +66,7 @@ if __name__ == "__main__":
start_time=time.time()
output_folder = "C:\\Users\\Administrator\\Desktop\\货物标\\货物标output"
# file_path="C:\\Users\\Administrator\\Desktop\\货物标\\output1\\2-招标文件2020年广水市中小学教师办公电脑系统及多媒体“班班通”设备采购安装项目_procurement.pdf"
procurement_path = r"C:\Users\Administrator\Desktop\fsdownload\fa0d51a1-0d63-4c0d-9002-cf8ac3f2211a\ztbfile_procurement.pdf"
procurement_path = r"C:\Users\Administrator\Desktop\货物标\output1\招标文件(107国道)_procurement.pdf"
procurement_docpath=r"C:\Users\Administrator\Desktop\fsdownload\fa0d51a1-0d63-4c0d-9002-cf8ac3f2211a"
invalid_path="C:\\Users\\Administrator\\Desktop\\fsdownload\\db79e9e0-830e-442c-8cb6-1d036215f8ff\\ztbfile.pdf"
res=fetch_procurement_reqs(procurement_path,invalid_path)