2024-09-25 15:58:27 +08:00

125 lines
5.8 KiB
Python

import concurrent.futures
import json
import time
import sys
def fetch_project_basic_info(knowledge_name, truncate0, output_folder, clause_path):
time.sleep(2) # 模拟任务耗时5秒
return {"basic_info": "Project basic info"}
def fetch_qualification_review(truncate1, truncate3, knowledge_name, truncate0_jsonpath, clause_path, input_file_path, output_folder):
time.sleep(4) # 模拟任务耗时10秒
return {"qualification_review": "Qualification review"}
def fetch_evaluation_standards(truncate1):
time.sleep(6) # 模拟任务耗时15秒
return {"technical_standards": "Technical standards", "commercial_standards": "Commercial standards"}
def fetch_invalid_requirements(invalid_docpath, output_folder, truncate0_jsonpath, clause_path, truncate3):
time.sleep(8) # 模拟任务耗时20秒
return {"invalid_requirements": "Invalid requirements"}
def fetch_bidding_documents_requirements(clause_path):
time.sleep(10) # 模拟任务耗时25秒
return {"bidding_documents_requirements": "Bidding documents requirements"}
def fetch_bid_opening(clause_path):
time.sleep(12) # 模拟任务耗时30秒
return {"opening_bid": "Opening bid"}
def transform_json_values(data):
# 假设这个函数对数据进行某些转换,简单返回数据
return data
def get_global_logger(unique_id):
import logging
logger = logging.getLogger(unique_id)
return logger
def preprocess_files(output_folder, downloaded_file_path, file_type, unique_id):
# 模拟返回处理后的数据
return {
'knowledge_name': 'KnowledgeName_' + unique_id,
'truncate0': 'truncate0',
'truncate1': 'truncate1',
'truncate3': 'truncate3',
'truncate0_jsonpath': 'truncate0_jsonpath',
'clause_path': 'clause_path',
'invalid_docpath': 'invalid_docpath',
'input_file_path': 'input_file_path',
'output_folder': output_folder
}
def deleteKnowledge(knowledge_index):
# 模拟删除操作
pass
def main_process(output_folder, downloaded_file_path, file_type, unique_id):
global logger
logger = get_global_logger(unique_id)
# 预处理文件,获取处理后的数据
processed_data = preprocess_files(output_folder, downloaded_file_path, file_type, unique_id)
if not processed_data:
yield json.dumps({}) # 如果处理数据失败,返回空的 JSON
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {
'base_info': executor.submit(fetch_project_basic_info, processed_data['knowledge_name'],
processed_data['truncate0'], output_folder,
processed_data['clause_path']),
'qualification_review': executor.submit(fetch_qualification_review, processed_data['truncate1'],
processed_data['truncate3'],
processed_data['knowledge_name'],
processed_data['truncate0_jsonpath'],
processed_data['clause_path'], processed_data['input_file_path'],
processed_data['output_folder']),
'evaluation_standards': executor.submit(fetch_evaluation_standards, processed_data['truncate1']),
'invalid_requirements': executor.submit(fetch_invalid_requirements, processed_data['invalid_docpath'],
output_folder, processed_data['truncate0_jsonpath'],
processed_data['clause_path'], processed_data['truncate3']),
'bidding_documents_requirements': executor.submit(fetch_bidding_documents_requirements,
processed_data['clause_path']),
'opening_bid': executor.submit(fetch_bid_opening, processed_data['clause_path'])
}
# 按照任务完成的顺序返回结果
for future in concurrent.futures.as_completed(futures.values()):
key = next(k for k, v in futures.items() if v == future)
try:
result = future.result()
# 记录哪个任务完成了
logger.info(f"Task {key} completed.")
# 处理 evaluation_standards 返回的技术标和商务标分别作为两个结果
if key == 'evaluation_standards':
technical_standards = result["technical_standards"]
commercial_standards = result["commercial_standards"]
# 分别发送技术标和商务标
modified_technical_result = transform_json_values({"technical_standards": technical_standards})
modified_commercial_result = transform_json_values({"commercial_standards": commercial_standards})
yield json.dumps(modified_technical_result, ensure_ascii=False) # 直接返回 JSON 数据
yield json.dumps(modified_commercial_result, ensure_ascii=False) # 直接返回 JSON 数据
sys.stdout.flush() # 强制刷新流式数据
else:
modified_result = transform_json_values({key: result})
yield json.dumps(modified_result, ensure_ascii=False) # 直接返回 JSON 数据
sys.stdout.flush() # 强制刷新流式数据
except Exception as exc:
logger.error(f"Error processing {key}: {exc}")
yield json.dumps({'error': f'Error processing {key}: {str(exc)}'}, ensure_ascii=False)
sys.stdout.flush()
# 删除知识索引
deleteKnowledge(processed_data['knowledge_name'])
# 调用主函数,模拟执行