diff --git a/docker-compose.yml b/docker-compose.yml index a017377..76c1e3b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,7 +11,14 @@ services: # - .:/flask_project # 将当前目录挂载到容器的 /flask_project 目录(可选,便于开发时实时更新代码) - /home/Z/zbparse_output_dev:/flask_project/flask_app/static/output # 额外的数据卷挂载 restart: unless-stopped # 容器退出时自动重启,除非明确停止 - privileged: true + mem_limit: "8g" # 容器最大可使用内存为8GB + mem_reservation: "4g" # 容器保证可使用内存为4GB + cpus: 2.0 # 限制容器使用2个CPU核心 + ulimits: + nproc: 65535 # 允许的最大进程数 + nofile: + soft: 65535 # 软限制的文件描述符数 + hard: 65535 # 硬限制的文件描述符数 # 可选:定义网络或其他全局配置 # networks: diff --git a/flask_app/general/通义千问long.py b/flask_app/general/通义千问long.py index b143c08..1974635 100644 --- a/flask_app/general/通义千问long.py +++ b/flask_app/general/通义千问long.py @@ -64,7 +64,7 @@ def qianwen_long(file_id, user_query, max_retries=2, backoff_factor=1.0): - max_retries: 最大重试次数(默认 2 次) - backoff_factor: 指数退避的基础等待时间(默认 1.0 秒) """ - print("call qianwen_long...") + # print("call qianwen_long...") client = OpenAI( api_key=os.getenv("DASHSCOPE_API_KEY"), @@ -264,8 +264,9 @@ def qianwen_long_text(file_id, user_query): if __name__ == "__main__": # Example file path - replace with your actual file path - file_path = r"C:\Users\Administrator\Desktop\货物标\output1\陕西省公安厅交通警察总队高速公路交通安全智能感知巡查系统项目 (1)_procurement.pdf" + file_path = r"C:\Users\Administrator\Desktop\货物标\截取test\2-招标文件_before.pdf" file_id = upload_file(file_path) + print(file_id) # # user_query1 = "该招标文件前附表中的项目名称是什么,请以json格式返回给我" # user_query2 = ("请提供文件中关于资格审查的具体内容和标准。") @@ -283,15 +284,3 @@ if __name__ == "__main__": # end_time=time.time() # print("elapsed time:"+str(end_time-start_time)) - user_query = """ -请你根据该货物标中采购要求部分的内容,请你给出"高速公路交通安全智能感知巡查系统软件"的技术参数(或采购要求),请以json格式返回结果,键名为"高速公路交通安全智能感知巡查系统软件", 键值为一个列表,列表中包含若干描述\"{}\"的技术参数(或采购要求)的字符串,需与原文完全一致,即若技术参数前存在序号也要保留,但你不可擅自增添或删减。以下为需要考虑的特殊情况:如果该货物没有相关采购要求或技术参数要求,键值为空列表。示例输出格式如下: -{{ - "摄像机控制键盘": [ - "1、支持串行 RS232/RS422 和 IP 混合控制,允许在一个控制器上使用 RS232/RS422/IP 控制单个系统中的摄像机;", - "2、支持 2 组 RS422 串口 VISCA 协议菊花链控制 2x7 台摄像机。" - ] -}} - """ - response = qianwen_long_stream(file_id, user_query) - print("完整响应内容:") - print(response) diff --git a/flask_app/test_case/test_qianwen_long.py b/flask_app/test_case/test_qianwen_long.py new file mode 100644 index 0000000..09dd91f --- /dev/null +++ b/flask_app/test_case/test_qianwen_long.py @@ -0,0 +1,53 @@ +import concurrent.futures +import os + +from flask_app.general.通义千问long import qianwen_long + + +def multi_threaded_calls(file_id, user_query, num_threads=10): + """ + 使用多线程同时调用 qianwen_long 函数。 + 参数: + - file_id: 上传文件的 ID + - user_query: 用户查询 + - num_threads: 并发线程数量(默认 10) + """ + results = [] + + # 定义一个辅助函数来调用 qianwen_long 并返回结果 + def call_function(thread_id): + print(f"线程 {thread_id} 开始调用 qianwen_long") + result = qianwen_long(file_id, user_query) + # print(f"线程 {thread_id} 调用完成,结果长度:{len(result)}") + return result + + # 使用 ThreadPoolExecutor 来管理线程池 + with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: + # 提交所有任务 + futures = {executor.submit(call_function, i + 1): i + 1 for i in range(num_threads)} + + # 处理完成的任务 + for future in concurrent.futures.as_completed(futures): + thread_id = futures[future] + try: + data = future.result() + results.append(data) + print(f"线程 {thread_id} 完成,返回数据:{data[:50]}...") # 只打印前50个字符 + except Exception as exc: + print(f"线程 {thread_id} 生成异常:{exc}") + + return results + + +if __name__ == "__main__": + # 定义参数 + # file_path = r"C:\Users\Administrator\Desktop\货物标\截取test\2-招标文件_before.pdf" + # file_id = upload_file(file_path) + file_id = "file-fe-ah0F0SJUY2cEzPx5nONRBvwd" + user_query = "项目名称是?" + num_threads = 200 # 并发线程数量 + # 执行多线程调用 + responses = multi_threaded_calls(file_id, user_query, num_threads) + # 打印所有响应 + for idx, response in enumerate(responses, start=1): + print(f"响应 {idx}: {response}") \ No newline at end of file diff --git a/flask_app/test_case/test_upload.py b/flask_app/test_case/test_upload.py new file mode 100644 index 0000000..e89ddf9 --- /dev/null +++ b/flask_app/test_case/test_upload.py @@ -0,0 +1,46 @@ +import requests +import concurrent.futures +import json + +# 定义请求的URL +url = "http://47.98.59.178:5000/upload" + +# 定义请求的Body +payload = { + "file_url": "https://bid-assistance.oss-cn-wuhan-lr.aliyuncs.com/test/094%E5%AE%9A%E7%A8%BF-%E6%B9%96%E5%8C%97%E5%B7%A5%E4%B8%9A%E5%A4%A7%E5%AD%A6%E8%BD%BB%E6%AD%A6%E5%99%A8%E6%A8%A1%E6%8B%9F%E5%B0%84%E5%87%BB%E8%AE%BE%E5%A4%87%E9%87%87%E8%B4%AD%E9%A1%B9%E7%9B%AE%E6%8B%9B%E6%A0%87%E6%96%87%E4%BB%B6.pdf?Expires=1733329351&OSSAccessKeyId=TMP.3KempAXSx1zKv24wTkVQsSwUfAGwieGmh3q9GyyLZN4bmaQWJiJ35tHbKEzWBuo316t8i9t3RgEAN9NtzF3gd4zC6yEsJM&Signature=92uuaITkGtcR0E5Y0P5BAaGM%2Fu8%3D", + "zb_type": 2 +} + +headers = { + "Content-Type": "application/json" +} + +def send_request(session, url, payload, headers): + try: + response = session.post(url, data=json.dumps(payload), headers=headers) + return response.status_code, response.text + except Exception as e: + return None, str(e) + +def main(): + num_requests = 5 # 并发请求数量 + + with concurrent.futures.ThreadPoolExecutor(max_workers=num_requests) as executor: + with requests.Session() as session: + # 创建所有任务 + futures = [ + executor.submit(send_request, session, url, payload, headers) + for _ in range(num_requests) + ] + + # 等待所有任务完成 + for future in concurrent.futures.as_completed(futures): + status, text = future.result() + if status: + print(f"Status Code: {status}") + # print(f"Response: {text}") + else: + print(f"Request failed: {text}") + +if __name__ == "__main__": + main()