zbparse/flask_app/test_case/test_upload.py

47 lines
1.7 KiB
Python
Raw Permalink Normal View History

2024-12-04 17:04:40 +08:00
import requests
import concurrent.futures
import json
# Endpoint that every test request is POSTed to.
url = "http://47.98.59.178:5000/upload"

# JSON body sent with each request.
# NOTE(review): file_url carries an `Expires` query parameter and what looks
# like a temporary OSS access key id; presumably the link must be regenerated
# once it expires -- confirm before relying on this test.
payload = {
    "file_url": "https://bid-assistance.oss-cn-wuhan-lr.aliyuncs.com/test/094%E5%AE%9A%E7%A8%BF-%E6%B9%96%E5%8C%97%E5%B7%A5%E4%B8%9A%E5%A4%A7%E5%AD%A6%E8%BD%BB%E6%AD%A6%E5%99%A8%E6%A8%A1%E6%8B%9F%E5%B0%84%E5%87%BB%E8%AE%BE%E5%A4%87%E9%87%87%E8%B4%AD%E9%A1%B9%E7%9B%AE%E6%8B%9B%E6%A0%87%E6%96%87%E4%BB%B6.pdf?Expires=1733329351&OSSAccessKeyId=TMP.3KempAXSx1zKv24wTkVQsSwUfAGwieGmh3q9GyyLZN4bmaQWJiJ35tHbKEzWBuo316t8i9t3RgEAN9NtzF3gd4zC6yEsJM&Signature=92uuaITkGtcR0E5Y0P5BAaGM%2Fu8%3D",
    "zb_type": 2,
}

# The body is serialized to JSON by the sender, so declare it as such.
headers = {"Content-Type": "application/json"}
def send_request(session, url, payload, headers, timeout=None):
    """POST ``payload`` as a JSON string to ``url`` and report the outcome.

    Args:
        session: Object exposing a ``post(url, data=..., headers=...,
            timeout=...)`` method (a ``requests.Session`` in this script).
        url: Target URL of the POST.
        payload: JSON-serializable object; it is encoded with ``json.dumps``
            and sent as the request body.
        headers: Mapping of HTTP headers passed through to ``session.post``.
        timeout: Forwarded unchanged to ``session.post``. The default
            ``None`` keeps the original behaviour of waiting indefinitely;
            pass a number (or a ``(connect, read)`` tuple for ``requests``)
            so a stalled server cannot block the calling thread forever.

    Returns:
        ``(status_code, response_text)`` when the request completes, or
        ``(None, error_message)`` when anything raises along the way.
    """
    try:
        response = session.post(
            url,
            data=json.dumps(payload),
            headers=headers,
            timeout=timeout,
        )
        return response.status_code, response.text
    except Exception as e:
        # Deliberately broad: this runs inside a worker thread, and callers
        # expect a failure to come back as a value rather than be raised.
        return None, str(e)
def main(num_requests: int = 5) -> None:
    """Send ``num_requests`` concurrent POSTs to ``url`` and print each result.

    Every request uses the module-level ``url``, ``payload`` and ``headers``
    and goes through ``send_request``. One line is printed per request: the
    HTTP status code on completion, or the error text on failure.

    Args:
        num_requests: Number of requests to send; also used as the thread
            pool size so that all of them can be in flight at once.

    Raises:
        ValueError: If ``num_requests`` is less than or equal to 0
            (raised by ``ThreadPoolExecutor``).
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_requests) as executor:
        # NOTE(review): a single Session is shared by all worker threads;
        # confirm that requests' Session is safe to use this way.
        with requests.Session() as session:
            # Submit every task up front so the requests overlap.
            futures = [
                executor.submit(send_request, session, url, payload, headers)
                for _ in range(num_requests)
            ]
            # Report results as they finish, not in submission order.
            for future in concurrent.futures.as_completed(futures):
                status, text = future.result()
                # send_request signals failure with a status of None.
                if status is not None:
                    print(f"Status Code: {status}")
                else:
                    print(f"Request failed: {text}")
# Run the concurrent upload test only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()