zbparse/flask_app/main/start_up.py
2024-10-18 15:44:18 +08:00

548 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import shutil
import time
import uuid
from datetime import datetime, timedelta
from flask import Flask, request, jsonify, Response, stream_with_context, g
import json
import os
from flask_app.general.little_zbparse import little_parse_main
from flask_app.main.download import download_file
from flask_app.main.post_processing import outer_post_processing
from flask_app.main.招标文件解析 import engineering_bid_main
from flask_app.货物标.货物标解析main import goods_bid_main
from flask_app.货物标.技术要求提取 import get_technical_requirements_main
from flask_app.货物标.货物标截取pdf import truncate_pdf_main
app = Flask(__name__)
class CSTFormatter(logging.Formatter):
    """Logging formatter that renders timestamps in China Standard Time (UTC+8).

    NOTE(review): the +8h offset is applied on top of the host's local time
    (``datetime.fromtimestamp``), so it is only correct when the server clock
    runs in UTC — confirm the deployment environment.
    """

    def formatTime(self, record, datefmt=None):
        """Return the record's creation time shifted by +8 hours.

        If *datefmt* is given it is used verbatim; otherwise the time is
        rendered as "%Y-%m-%d %H:%M:%S", with a ",mmm" millisecond suffix
        when the format string uses asctime.
        """
        ct = datetime.fromtimestamp(record.created) + timedelta(hours=8)
        if datefmt:
            return ct.strftime(datefmt)
        s = ct.strftime("%Y-%m-%d %H:%M:%S")
        if self.usesTime():
            # record.msecs is a float; it must be truncated to int before the
            # ":03d" spec.  The original f"{record.msecs:03d}" always raised
            # ValueError, so milliseconds were silently dropped.
            s = f"{s},{int(record.msecs):03d}"
        return s
@app.before_request
def before_request():
    # Initialise a per-request logger before every request is handled;
    # create_logger() stores the logger and a unique request id on flask.g.
    create_logger()
def create_logger():
    """Create a per-request logger and expose it via ``flask.g``.

    Generates a UUID for the request, creates a matching output folder,
    and attaches a CST-formatted file handler plus a plain stream handler.
    Sets ``g.unique_id`` and ``g.logger``.
    """
    request_id = str(uuid.uuid4())
    g.unique_id = request_id
    # One output directory per request, keyed by the UUID.
    folder = f"flask_app/static/output/{request_id}"
    os.makedirs(folder, exist_ok=True)
    log_path = os.path.join(folder, "log.txt")
    logger = logging.getLogger(request_id)
    # The UUID-named logger is fresh, so this guard only protects against
    # re-entry with the same id.
    if not logger.handlers:
        file_handler = logging.FileHandler(log_path)
        file_handler.setFormatter(
            CSTFormatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
        logger.addHandler(file_handler)
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(logging.Formatter('%(message)s'))
        logger.addHandler(console_handler)
        logger.setLevel(logging.INFO)
    g.logger = logger
# @app.route('/upload', methods=['POST'])
# def zbparse():
# logger=g.logger
# file_url = validate_request()
# if isinstance(file_url, tuple): # Check if the returned value is an error response
# return file_url
# try:
# logger.info("starting parsing url:" + file_url)
# final_json_path, output_folder= download_and_process_file(file_url)
# if not final_json_path:
# return jsonify({'error': 'File processing failed'}), 500
# response = generate_response(final_json_path) # 先获取响应内容
# # remove_directory(output_folder) # 然后删除文件夹
# return response # 最后返回获取的响应
# except Exception as e:
# logger.error('Exception occurred: ' + str(e)) # 使用全局 logger 记录
# return jsonify({'error': str(e)}), 500
# def download_and_process_file(file_url):
# logger = g.logger
# unique_id = g.unique_id
# output_folder = f"flask_app/static/output/{unique_id}" # 直接使用全局 unique_id 构建路径
# filename = "ztbfile"
# downloaded_filename = os.path.join(output_folder, filename)
#
# # 下载文件,假设 download_file 函数已正确处理异常并返回文件路径
# downloaded_filepath, file_type = download_file(file_url, downloaded_filename)
#
# if downloaded_filepath is None or file_type == 3:
# logger.error("Unsupported file type or failed to download file")
# return None, output_folder
#
# logger.info("Local file path: " + downloaded_filepath)
# processed_file_path = engineering_bid_main(output_folder, downloaded_filepath, file_type, unique_id)
# return processed_file_path, output_folder
# def generate_response(final_json_path):
# logger = g.logger
# # 检查final_json_path是否为空或None
# if not final_json_path:
# logger.error('Empty or None path provided for final_json.')
# return jsonify({'error': 'No path provided for final_json.'}), 400
# if not os.path.exists(final_json_path):
# logger.error('final_json not found at path: ' + final_json_path)
# return jsonify({'error': 'final_json not found'}), 404
# with open(final_json_path, 'r', encoding='utf-8') as f:
# logger.info('final_json_path:' + final_json_path)
# zbparse_data = json.load(f)
# json_str = json.dumps(zbparse_data, ensure_ascii=False)
# return jsonify({
# 'message': 'File uploaded and processed successfully',
# 'filename': os.path.basename(final_json_path),
# 'data': json_str
# })
def validate_request(default_zb_type=1):
    """Validate the incoming JSON request body.

    Returns:
        (file_url, zb_type) on success, where zb_type is an int.
        (error_response, None) on failure, where error_response is a
        ``(Response, status_code)`` tuple suitable for returning directly
        from a Flask view.

    Callers detect failure via ``isinstance(first_element, tuple)``.  The
    previous version returned the bare ``(Response, 400)`` pair, which the
    two-value unpacking in every caller split apart, so the tuple guard
    never fired and errors surfaced as 500s instead of 400s.
    """
    if not request.is_json:
        return (jsonify({'error': 'Missing JSON in request'}), 400), None
    file_url = request.json.get('file_url')
    zb_type = request.json.get('zb_type', default_zb_type)
    if not file_url:
        return (jsonify({'error': 'No file URL provided'}), 400), None
    try:
        zb_type = int(zb_type)
    except (ValueError, TypeError):
        return (jsonify({'error': 'Invalid zb_type provided'}), 400), None
    return file_url, zb_type
#提取采购需求
@app.route('/procurement_reqs', methods=['POST'])
def get_procurement_reqs():
    """POST endpoint: extract procurement requirements (zb_type 2 only)."""
    logger = g.logger
    file_url, zb_type = validate_request()
    # validate_request signals failure by returning an error tuple first.
    if isinstance(file_url, tuple):
        return file_url
    try:
        logger.info("starting parsing url:" + file_url)
        if zb_type != 2:
            # This endpoint is restricted to goods-bid procurement parsing.
            logger.error(f"Invalid zb_type: {zb_type}. Expected zb_type: 2")
            return jsonify({
                'error': 'Invalid zb_type',
                'message': 'This endpoint only supports zb_type 2 (procurement requirements)'
            }), 400
        reqs = download_and_process_file_for_procurement(file_url)
        return jsonify({
            'message': 'procurement_reqs processed successfully',
            'filename': "filename",
            'data': json.dumps(reqs, ensure_ascii=False)
        })
    except Exception as e:
        logger.error('Exception occurred: ' + str(e))
        return jsonify({'error': str(e)}), 500
#提取采购需求
def download_and_process_file_for_procurement(file_url):
    """Download the tender document at *file_url* and extract its technical
    requirements.

    Returns the extraction result, or None when the download fails or the
    file type is unsupported.
    """
    logger = g.logger
    unique_id = g.unique_id
    # Output directory keyed by the per-request unique id.
    output_folder = f"flask_app/static/output/{unique_id}"
    local_path = os.path.join(output_folder, "ztbfile")
    # download_file is assumed to handle its own exceptions and return
    # (path, file_type); file_type == 4 marks an unsupported format.
    downloaded_filepath, file_type = download_file(file_url, local_path)
    if downloaded_filepath is None or file_type == 4:
        logger.error("Unsupported file type or failed to download file")
        return None
    logger.info("Local file path: " + downloaded_filepath)
    return get_technical_requirements_main(downloaded_filepath, output_folder)
@app.route('/little_zbparse', methods=['POST'])
def little_zbparse():
    """POST endpoint: run the lightweight tender parse and return its JSON."""
    logger = g.logger
    file_url, zb_type = validate_request()
    if isinstance(file_url, tuple):  # validation produced an error response
        return file_url
    try:
        logger.info("starting parsing url:" + file_url)
        result_path = download_and_process_file(file_url, zb_type)
        if not result_path:
            return jsonify({'error': 'File processing failed'}), 500
        # Build the response before any cleanup of the output folder.
        return generate_response(result_path)
    except Exception as e:
        logger.error('Exception occurred: ' + str(e))
        return jsonify({'error': str(e)}), 500
def download_and_process_file(file_url, zb_type):
    """Download *file_url* into the request's output folder and run the
    lightweight parse.

    Returns the path of the produced result JSON, or None on failure.
    """
    logger = g.logger
    unique_id = g.unique_id
    # Output directory keyed by the per-request unique id.
    output_folder = f"flask_app/static/output/{unique_id}"
    local_path = os.path.join(output_folder, "ztbfile")
    # download_file is assumed to handle its own exceptions and return
    # (path, file_type); file_type == 4 marks an unsupported format.
    downloaded_filepath, file_type = download_file(file_url, local_path)
    if downloaded_filepath is None or file_type == 4:
        logger.error("Unsupported file type or failed to download file")
        return None
    logger.info("Local file path: " + downloaded_filepath)
    return little_parse_main(output_folder, downloaded_filepath, file_type, zb_type, unique_id)
def generate_response(final_json_path):
    """Wrap the JSON file at *final_json_path* in the API response envelope.

    Returns a 400 response for an empty path and a 404 when the file does
    not exist.
    """
    logger = g.logger
    # Guard clauses for a missing or nonexistent result path.
    if not final_json_path:
        logger.error('Empty or None path provided for final_json.')
        return jsonify({'error': 'No path provided for final_json.'}), 400
    if not os.path.exists(final_json_path):
        logger.error('final_json not found at path: ' + final_json_path)
        return jsonify({'error': 'final_json not found'}), 404
    logger.info('extracted_info_path:' + final_json_path)
    with open(final_json_path, 'r', encoding='utf-8') as f:
        parsed = json.load(f)
    return jsonify({
        'message': 'Little Parse processed successfully',
        'filename': os.path.basename(final_json_path),
        'data': json.dumps(parsed, ensure_ascii=False)
    })
# 流式
@app.route('/upload', methods=['POST'])
def zbparse():
    """POST endpoint: stream tender-parsing progress as server-sent events."""
    logger = g.logger
    logger.info("zbparse start!!!")
    # Log the raw request payload for debugging.
    received_data = request.get_json()
    logger.info("Received JSON data: " + str(received_data))
    file_url, zb_type = validate_request()
    if isinstance(file_url, tuple):  # validation produced an error response
        return file_url
    try:
        logger.info("starting parsing url:" + file_url)
        stream = process_and_stream(file_url, zb_type)
        # stream_with_context keeps the request context alive while the
        # generator yields.
        return Response(stream_with_context(stream), content_type='text/event-stream')
    except Exception as e:
        logger.error('Exception occurred: ' + str(e))
        return jsonify({'error': str(e)}), 500
# 分段返回
def process_and_stream(file_url, zb_type):
    """
    Download the tender file and process it, streaming progress as SSE events.

    Supports both engineering bids and goods bids.

    Parameters:
    - file_url (str): URL of the file to download.
    - zb_type (int): tender type — 1 for engineering bids, 2 for goods bids.

    Returns:
    - generator: yields "data: <json>\\n\\n" server-sent-event chunks.
    """
    logger = g.logger
    unique_id = g.unique_id
    # NOTE(review): results are written under "output1/" while create_logger()
    # logs under "output/" — confirm the split is intentional.
    output_folder = f"flask_app/static/output1/{unique_id}"
    filename = "ztbfile"
    downloaded_filename = os.path.join(output_folder, filename)
    start_time = time.time()  # record the start time
    try:
        # Download the file; download_file is assumed to handle its own
        # exceptions and return (path, file_type) or a falsy value.
        downloaded = download_file(file_url, downloaded_filename)
        if not downloaded:
            logger.error("下载文件失败或不支持的文件类型")
            error_response = {
                'message': 'File processing failed',
                'filename': None,
                'data': json.dumps({'error': 'File processing failed'})
            }
            yield f"data: {json.dumps(error_response)}\n\n"
            return
        downloaded_filepath, file_type = downloaded
        # file_type == 4 marks an unsupported format.
        if file_type == 4:
            logger.error("不支持的文件类型")
            error_response = {
                'message': 'Unsupported file type',
                'filename': None,
                'data': json.dumps({'error': 'Unsupported file type'})
            }
            yield f"data: {json.dumps(error_response)}\n\n"
            return
        logger.info("本地文件路径: " + downloaded_filepath)
        combined_data = {}
        # Pick the processing function by tender type.
        processing_functions = {
            1: engineering_bid_main,
            2: goods_bid_main
        }
        processing_func = processing_functions.get(zb_type, engineering_bid_main)  # default: engineering bid
        # Consume partial results emitted by processing_func.
        for data in processing_func(output_folder, downloaded_filepath, file_type, unique_id):
            if not data.strip():
                logger.error("Received empty data, skipping JSON parsing.")
                continue  # Skip processing empty data
            try:
                parsed_data = json.loads(data)
            except json.JSONDecodeError as e:
                logger.error(f"Failed to decode JSON: {e}")
                logger.error(f"Data received: {data}")
                continue  # Skip data if JSON parsing fails
            # Merge only the inner dictionaries of parsed_data.
            for outer_key, inner_dict in parsed_data.items():
                if isinstance(inner_dict, dict):
                    combined_data.update(inner_dict)
            # Stream the current progress after every merge.
            response = {
                'message': 'Processing',
                'filename': os.path.basename(downloaded_filepath),
                'data': data
            }
            yield f"data: {json.dumps(response, ensure_ascii=False)}\n\n"
        # Log the merged data.
        logger.info(f"合并后的数据: {json.dumps(combined_data, ensure_ascii=False, indent=4)}")
        # Persist combined_data to 'final_result.json' under output_folder.
        output_json_path = os.path.join(output_folder, 'final_result.json')
        extracted_info_path = os.path.join(output_folder, 'extracted_result.json')
        includes = ["基础信息", "资格审查", "商务评分", "技术评分", "无效标与废标项", "投标文件要求", "开评定标流程"]
        final_result, extracted_info, procurement_reqs = outer_post_processing(combined_data, includes)
        procurement_reqs_response = {
            'message': 'procurement_reqs',
            'filename': os.path.basename(downloaded_filepath),
            'data': json.dumps(procurement_reqs, ensure_ascii=False)
        }
        yield f"data: {json.dumps(procurement_reqs_response, ensure_ascii=False)}\n\n"
        try:
            with open(extracted_info_path, 'w', encoding='utf-8') as json_file:
                json.dump(extracted_info, json_file, ensure_ascii=False, indent=4)
            logger.info(f"摘取后的数据已保存到 '{extracted_info_path}'")
        except IOError as e:
            logger.error(f"保存JSON文件时出错: {e}")
        try:
            with open(output_json_path, 'w', encoding='utf-8') as json_file:
                json.dump(final_result, json_file, ensure_ascii=False, indent=4)
            logger.info(f"合并后的数据已保存到 '{output_json_path}'")
        except IOError as e:
            logger.error(f"保存JSON文件时出错: {e}")
        extracted_info_response = {
            'message': 'extracted_info',
            'filename': os.path.basename(downloaded_filepath),
            'data': json.dumps(extracted_info, ensure_ascii=False)
        }
        yield f"data: {json.dumps(extracted_info_response, ensure_ascii=False)}\n\n"
        # Then send the complete merged data.
        complete_response = {
            'message': 'Combined_data',
            'filename': os.path.basename(downloaded_filepath),
            'data': json.dumps(final_result, ensure_ascii=False)
        }
        yield f"data: {json.dumps(complete_response, ensure_ascii=False)}\n\n"
        # Finally send the terminating response.
        final_response = {
            'message': 'File uploaded and processed successfully',
            'filename': os.path.basename(downloaded_filepath),
            'data': 'END'
        }
        yield f"data: {json.dumps(final_response)}\n\n"
    finally:
        end_time = time.time()  # record the end time
        duration = end_time - start_time
        logger.info(f"Total processing time: {duration:.2f} seconds")
@app.route('/api/test_zbparse', methods=['POST'])
def test_zbparse():
    """POST endpoint: stream canned demo segments for client-side testing."""
    try:
        body = stream_with_context(test_process_and_stream())
        return Response(body, content_type='text/event-stream')
    except Exception as e:
        # No request-scoped logger is guaranteed here, so use the app logger.
        app.logger.error('Exception occurred: ' + str(e))
        return jsonify({'error': str(e)}), 500
def test_process_and_stream():
    """Stream seven canned demo segments as SSE events, then the combined
    payload and an END marker.  Backs the /api/test_zbparse endpoint."""
    # Simulated segments: each has an English outer key wrapping a Chinese
    # section name with representative detail data.
    data_segments = [
        {
            "base_info": {
                "基础信息": {
                    "project_name": "测试项目1",
                    "project_code": "TP001",
                    "project_manager": "张三",
                    "start_date": "2024-01-10",
                    "end_date": "2024-12-31"
                }
            }
        },
        {
            "qualification_review": {
                "资格审查": {
                    "review_criteria": ["公司资质", "过往业绩", "财务报表"],
                    "required_documents": ["营业执照", "资质证书", "近三年财务报告"],
                    "minimum_requirements": {
                        "company_age": "至少5年",
                        "past_projects": "至少3个大型项目"
                    }
                }
            }
        },
        {
            "technical_standards": {
                "技术标": {
                    "technical_requirements": ["设备质量要求", "施工工艺", "安全标准"],
                    "materials_list": ["钢筋", "水泥", "电缆"],
                    "equipment_specs": {
                        "excavator": "型号X123",
                        "concrete_mixer": "型号Y456"
                    }
                }
            }
        },
        {
            "commercial_standards": {
                "商务标": {
                    "pricing_method": "固定总价",
                    "payment_schedule": ["30%合同签订", "40%中期支付", "30%项目完成支付"],
                    "contract_conditions": {
                        "warranty_period": "2年",
                        "penalty_clauses": "延期每周罚款5%"
                    }
                }
            }
        },
        {
            "invalid_requirements": {
                "无效标与废标项": {
                    "common_issues": ["未按要求提交保证金", "技术标不达标"],
                    "invalidation_reasons": {
                        "missing_documents": "缺少必要文件",
                        "unqualified_technical_specs": "技术规格不合要求"
                    }
                }
            }
        },
        {
            "bidding_documents_requirements": {
                "投标文件要求": {
                    "file_format": "PDF",
                    "submission_deadline": "2024-08-01 17:00",
                    "submission_location": "北京市某某大厦5楼",
                    "required_sections": ["公司简介", "技术方案", "商务报价"]
                }
            }
        },
        {
            "opening_bid": {
                "开评定标流程": {
                    "bid_opening_time": "2024-09-01 10:00",
                    "location": "会议室A",
                    "evaluation_criteria": ["价格", "技术能力", "项目经验"],
                    "evaluation_process": {
                        "first_round": "资格审查",
                        "second_round": "技术评分",
                        "final_round": "商务报价评定"
                    }
                }
            }
        }
    ]
    filename = "test_file.pdf"
    for i, data in enumerate(data_segments, 1):
        response = {
            'message': f'Processing segment {i}',
            'filename': filename,
            'data': data
        }
        yield f"data: {json.dumps(response, ensure_ascii=False)}\n\n"
        time.sleep(3)  # pause 3 seconds between segments (simulated work)
    # Before the end signal, send the complete combined data.
    combined_data = {}
    for segment in data_segments:
        for outer_key, inner_dict in segment.items():
            # Take the single (and only) key/value pair of the inner dict.
            inner_key, inner_value = next(iter(inner_dict.items()))
            combined_data[inner_key] = inner_value
    # Send the full combined dictionary.
    complete_response = {
        'message': 'Combined data',
        'filename': filename,
        'data': combined_data
    }
    yield f"data: {json.dumps(complete_response, ensure_ascii=False)}\n\n"
    # Send the end signal.
    final_response = {
        'message': 'File processed successfully',
        'filename': filename,
        'data': 'END'
    }
    yield f"data: {json.dumps(final_response, ensure_ascii=False)}\n\n"
# @app.route('/get_json', methods=['POST'])
# def testjson():
# final_json_path="C:\\Users\\Administrator\\Desktop\\fsdownload\\temp4\\fd55f067-2cf6-475c-b7ce-4498f6606bf6\\final_result.json"
# with open(final_json_path, 'r', encoding='utf-8') as f:
# print('final_json_path:'+final_json_path)
# zbparse_data = json.load(f)
# json_str = json.dumps(zbparse_data, ensure_ascii=False)
# print(json_str)
# return jsonify({
# 'message': 'File uploaded and processed successfully',
# 'filename': os.path.basename(final_json_path),
# 'data': json_str
# })
def remove_directory(path):
    """Recursively delete the directory tree at *path*, logging the outcome
    to the request-scoped logger."""
    logger = g.logger
    try:
        shutil.rmtree(path)
        logger.info(f"Successfully removed directory: {path}")
    except Exception as exc:
        # Best-effort cleanup: log and swallow so callers are not disrupted.
        logger.error(f"Failed to remove directory {path}: {str(exc)}")
if __name__ == '__main__':
    # NOTE(review): debug=True enables the Werkzeug interactive debugger and
    # auto-reload; combined with host='0.0.0.0' (all interfaces) this is
    # unsafe outside local development — confirm production runs behind a
    # real WSGI server with debug disabled.
    app.run(debug=True, host='0.0.0.0', port=5000)