zbparse/flask_app/general/清除file_id.py
2024-12-18 11:31:47 +08:00

117 lines
3.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# flask_app/general/清除file_id.py
import os
from openai import OpenAI
# 初始化 OpenAI 客户端
client = OpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
def delete_all_files():
"""
查询所有文件并删除。
"""
try:
# 获取文件列表
file_stk = client.files.list()
# 将文件信息解析为字典格式
file_data = file_stk.to_dict()
# 提取所有文件的 id
file_ids = [file["id"] for file in file_data["data"]]
# 循环删除每个文件
for file_id in file_ids:
try:
file_object = client.files.delete(file_id)
print(f"Deleted file with id: {file_id} - {file_object}")
except Exception as e:
print(f"Failed to delete file with id {file_id}: {e}")
except Exception as e:
print(f"An error occurred while deleting files: {e}")
def read_file_ids(output_folder):
"""
从指定文件夹中读取 file_ids.txt 文件,返回文件 ID 列表。
:param output_folder: 输出文件夹路径
:return: 文件 ID 列表
"""
file_ids = []
file_ids_path = os.path.join(output_folder, 'file_ids.txt')
# 检查文件是否存在
if os.path.exists(file_ids_path):
try:
with open(file_ids_path, 'r', encoding='utf-8') as file:
# 按行读取文件内容
file_ids = [line.strip() for line in file.readlines()]
print(f"读取到的文件 ID 列表:{file_ids}")
except Exception as e:
print(f"读取 file_ids.txt 文件时发生错误: {e}")
else:
print(f"文件 {file_ids_path} 不存在。")
return file_ids
from concurrent.futures import ThreadPoolExecutor, as_completed
def delete_file_by_ids(file_ids):
"""
根据传入的 file_id 列表删除指定的文件。
:param file_ids: 一个包含文件 ID 的字符串列表
:return: 一个包含删除失败的 file_id 的列表,如果全部成功则返回空列表
"""
failed_file_ids = []
if not isinstance(file_ids, list):
print("Error: file_ids should be a list.")
return file_ids # 返回所有 file_ids 作为失败项
if not all(isinstance(file_id, str) for file_id in file_ids):
print("Error: Each file_id should be a string.")
# 找出非字符串的 file_id 并添加到失败列表
failed_file_ids = [file_id for file_id in file_ids if not isinstance(file_id, str)]
# 仅尝试删除有效的 file_id
valid_file_ids = [file_id for file_id in file_ids if isinstance(file_id, str)]
else:
valid_file_ids = file_ids
def delete_file(file_id):
try:
# 假设 openai.File.delete 会返回一个文件对象
file_object = client.files.delete(file_id)
return None # 成功返回 None
except Exception as e:
print(f"Failed to delete file with id {file_id}: {e}")
return file_id # 返回失败的 file_id
try:
with ThreadPoolExecutor(max_workers=len(valid_file_ids)) as executor:
# 提交所有删除任务
futures = {executor.submit(delete_file, file_id): file_id for file_id in valid_file_ids}
# 收集结果
for future in as_completed(futures):
result = future.result()
if result is not None: # 如果返回值不是 None表示删除失败
failed_file_ids.append(result)
except Exception as e:
print(f"An error occurred while processing the file_ids: {e}")
# 如果发生整体异常,认为所有未处理的 file_ids 都失败了
failed_file_ids.extend(valid_file_ids)
return failed_file_ids
if __name__ == '__main__':
# delete_all_files()
# file_ids=["file-fe-Lu6GKmTR1NjimC2tOebRAIYT"]
# delete_file_by_ids(file_ids)
delete_all_files()