import concurrent.futures
import json
import logging
import sys
import time


def fetch_project_basic_info(knowledge_name, truncate0, output_folder, clause_path):
    """Stub task: pretend to fetch the project's basic information.

    All arguments are currently unused. The function sleeps for 2 seconds
    and returns a fixed placeholder dict keyed by ``"basic_info"``.
    """
    time.sleep(2)  # simulated task duration: 2 seconds
    return {"basic_info": "Project basic info"}


def fetch_qualification_review(truncate1, truncate3, knowledge_name,
                               truncate0_jsonpath, clause_path,
                               input_file_path, output_folder):
    """Stub task: pretend to run the qualification review.

    All arguments are currently unused. The function sleeps for 4 seconds
    and returns a fixed placeholder dict keyed by ``"qualification_review"``.
    """
    time.sleep(4)  # simulated task duration: 4 seconds
    return {"qualification_review": "Qualification review"}


def fetch_evaluation_standards(truncate1):
    """Stub task: pretend to fetch the evaluation standards.

    The argument is currently unused. The function sleeps for 6 seconds and
    returns a placeholder dict with two keys, ``"technical_standards"`` and
    ``"commercial_standards"``; ``main_process`` streams them separately.
    """
    time.sleep(6)  # simulated task duration: 6 seconds
    return {
        "technical_standards": "Technical standards",
        "commercial_standards": "Commercial standards",
    }


def fetch_invalid_requirements(invalid_docpath, output_folder,
                               truncate0_jsonpath, clause_path, truncate3):
    """Stub task: pretend to fetch the invalid-bid requirements.

    All arguments are currently unused. The function sleeps for 8 seconds
    and returns a fixed placeholder dict keyed by ``"invalid_requirements"``.
    """
    time.sleep(8)  # simulated task duration: 8 seconds
    return {"invalid_requirements": "Invalid requirements"}


def fetch_bidding_documents_requirements(clause_path):
    """Stub task: pretend to fetch the bidding-document requirements.

    The argument is currently unused. The function sleeps for 10 seconds and
    returns a fixed placeholder dict keyed by
    ``"bidding_documents_requirements"``.
    """
    time.sleep(10)  # simulated task duration: 10 seconds
    return {"bidding_documents_requirements": "Bidding documents requirements"}


def fetch_bid_opening(clause_path):
    """Stub task: pretend to fetch the bid-opening information.

    The argument is currently unused. The function sleeps for 12 seconds and
    returns a fixed placeholder dict keyed by ``"opening_bid"``.
    """
    time.sleep(12)  # simulated task duration: 12 seconds
    return {"opening_bid": "Opening bid"}


def transform_json_values(data):
    """Placeholder transformation hook: return ``data`` unchanged."""
    return data


def get_global_logger(unique_id):
    """Return the ``logging`` logger registered under the name ``unique_id``."""
    return logging.getLogger(unique_id)


def preprocess_files(output_folder, downloaded_file_path, file_type, unique_id):
    """Stub preprocessing step: return placeholder paths for the tasks.

    ``downloaded_file_path`` and ``file_type`` are currently unused. The
    returned dict carries fixed placeholder strings, except that
    ``knowledge_name`` embeds ``unique_id`` and ``output_folder`` echoes the
    argument.
    """
    return {
        'knowledge_name': 'KnowledgeName_' + unique_id,
        'truncate0': 'truncate0',
        'truncate1': 'truncate1',
        'truncate3': 'truncate3',
        'truncate0_jsonpath': 'truncate0_jsonpath',
        'clause_path': 'clause_path',
        'invalid_docpath': 'invalid_docpath',
        'input_file_path': 'input_file_path',
        'output_folder': output_folder,
    }


def deleteKnowledge(knowledge_index):
    """Stub for deleting a knowledge index; currently does nothing."""
    pass


def _submit_tasks(executor, processed_data, output_folder):
    """Submit every fetch task and return a ``{future: result_key}`` map."""
    data = processed_data
    return {
        executor.submit(
            fetch_project_basic_info,
            data['knowledge_name'], data['truncate0'],
            output_folder, data['clause_path'],
        ): 'base_info',
        executor.submit(
            fetch_qualification_review,
            data['truncate1'], data['truncate3'], data['knowledge_name'],
            data['truncate0_jsonpath'], data['clause_path'],
            data['input_file_path'], data['output_folder'],
        ): 'qualification_review',
        executor.submit(
            fetch_evaluation_standards, data['truncate1'],
        ): 'evaluation_standards',
        executor.submit(
            fetch_invalid_requirements,
            data['invalid_docpath'], output_folder,
            data['truncate0_jsonpath'], data['clause_path'],
            data['truncate3'],
        ): 'invalid_requirements',
        executor.submit(
            fetch_bidding_documents_requirements, data['clause_path'],
        ): 'bidding_documents_requirements',
        executor.submit(
            fetch_bid_opening, data['clause_path'],
        ): 'opening_bid',
    }


def _build_messages(key, result):
    """Serialize one task result into the JSON strings to stream for it."""
    if key == 'evaluation_standards':
        # Technical and commercial standards are streamed as two messages.
        parts = [
            {"technical_standards": result["technical_standards"]},
            {"commercial_standards": result["commercial_standards"]},
        ]
    else:
        parts = [{key: result}]
    return [
        json.dumps(transform_json_values(part), ensure_ascii=False)
        for part in parts
    ]


def main_process(output_folder, downloaded_file_path, file_type, unique_id):
    """Run all fetch tasks concurrently and stream their results as JSON.

    This is a generator. It preprocesses the input, submits every fetch task
    to a thread pool, and yields one JSON string per result in the order the
    tasks complete (``evaluation_standards`` produces two strings). A task
    that fails yields a JSON object with an ``"error"`` key instead of
    aborting the stream.

    If preprocessing returns a falsy value, a single ``"{}"`` is yielded and
    the generator stops. Otherwise the knowledge index named by
    ``processed_data['knowledge_name']`` is deleted when the generator
    finishes, including when the consumer closes it early.

    Side effect: rebinds the module-level ``logger`` to the logger for
    ``unique_id``.
    """
    # NOTE(review): the module-level rebinding is kept in case code outside
    # this view reads ``logger``; a local alias is used below so concurrent
    # calls do not log under each other's id.
    global logger
    logger = get_global_logger(unique_id)
    log = logger

    # Preprocess the input file to obtain the data the tasks need.
    processed_data = preprocess_files(
        output_folder, downloaded_file_path, file_type, unique_id)
    if not processed_data:
        yield json.dumps({})  # preprocessing failed: emit an empty JSON object
        return  # nothing to run; do not fall through to the task submission

    try:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_key = _submit_tasks(
                executor, processed_data, output_folder)

            # Stream results in completion order.
            for future in concurrent.futures.as_completed(future_to_key):
                key = future_to_key[future]
                # Only result retrieval and serialization are guarded, so an
                # exception thrown into the generator at a ``yield`` is not
                # misreported as a task failure.
                try:
                    result = future.result()
                    log.info("Task %s completed.", key)
                    messages = _build_messages(key, result)
                except Exception as exc:
                    log.error("Error processing %s: %s", key, exc)
                    messages = [json.dumps(
                        {'error': f'Error processing {key}: {str(exc)}'},
                        ensure_ascii=False)]
                yield from messages
                sys.stdout.flush()  # flush any buffered streamed output
    finally:
        # Delete the knowledge index even if the stream is abandoned early.
        deleteKnowledge(processed_data['knowledge_name'])


# Invoke the main function to simulate execution