# MinIO object-storage helper utilities (upload/download files, folders, buffers).

import logging
import time
from minio import Minio
from minio.error import S3Error
import os
import urllib.parse
from middleware.util import get_current_date_and_milliseconds
# Shared MinIO client used by every helper in this module.
# NOTE(review): credentials were hard-coded in source. They now fall back to
# the same literals but can be overridden via environment variables — rotate
# these secrets and drop the defaults once deployments set the env vars.
client = Minio(
    endpoint=os.environ.get("MINIO_ENDPOINT", "222.212.85.86:9000"),  # MinIO server address
    access_key=os.environ.get("MINIO_ACCESS_KEY", "adminjdskfj"),
    secret_key=os.environ.get("MINIO_SECRET_KEY", "123456ksldjfal@Y"),
    secure=os.environ.get("MINIO_SECURE", "").lower() == "true"  # HTTPS off unless MINIO_SECURE=true
)
# Top-level directory inside the bucket where AI results are stored.
first_dir = 'ai_result'
# Module-wide logging configuration.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def create_bucket():
    """Print every bucket on the MinIO server with its creation date.

    NOTE(review): despite the name, this only LISTS buckets; it never
    creates one — confirm intent with callers before renaming.
    """
    try:
        for bucket in client.list_buckets():
            print(f"Bucket: {bucket.name}, Created: {bucket.creation_date}")
    except S3Error as e:
        print(f"Error: {e}")
def downFile(object_name, bucket_name="300bdf2b-a150-406e-be63-d28bd29b409f"):
    """Download an object from MinIO and return its local file path.

    :param object_name: object key inside the bucket
    :param bucket_name: source bucket (defaults to the project bucket;
        previously hard-coded)
    :return: absolute path of the downloaded file, or None on any failure
    """
    if not object_name or not isinstance(object_name, str):
        logger.error(f"Invalid object name: {object_name}")
        return None
    try:
        # Save next to this module, keeping only the object's base name.
        current_dir = os.path.dirname(os.path.abspath(__file__))
        download_path = os.path.join(current_dir, os.path.basename(object_name))
        # Ensure the target directory exists.
        os.makedirs(os.path.dirname(download_path), exist_ok=True)
        logger.info(f"Attempting to download {object_name} from bucket {bucket_name} to {download_path}")
        client.fget_object(
            bucket_name=bucket_name,
            object_name=object_name,
            file_path=download_path
        )
        logger.info(f"Successfully downloaded file to: {download_path}")
        return download_path
    except S3Error as e:
        logger.error(f"MinIO download error: {e}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error downloading file: {e}", exc_info=True)
        return None
def downBigFile(object_name, bucket_name="300bdf2b-a150-406e-be63-d28bd29b409f"):
    """Download a (potentially large) object from MinIO with console progress.

    :param object_name: object key inside the bucket
    :param bucket_name: source bucket (defaults to the project bucket;
        previously hard-coded)
    :return: absolute local path of the downloaded file, or None on failure
    """
    if not object_name or not isinstance(object_name, str):
        logger.error(f"Invalid object name: {object_name}")
        return None
    try:
        # Save next to this module, keeping only the object's base name.
        current_dir = os.path.dirname(os.path.abspath(__file__))
        download_path = os.path.join(current_dir, os.path.basename(object_name))
        os.makedirs(os.path.dirname(download_path), exist_ok=True)
        logger.info(f"Attempting to download {object_name} from bucket {bucket_name} to {download_path}")
        # Fetch the total size first so progress can be shown as a percentage.
        try:
            stat = client.stat_object(bucket_name, object_name)
            total_size = stat.size
        except S3Error as e:
            logger.error(f"Failed to get object stats: {e}")
            return None
        # Stream the object body in 8 KiB chunks with manual progress tracking.
        response = client.get_object(bucket_name, object_name)
        try:
            downloaded_size = 0
            chunk_size = 8192  # 8KB chunks
            with open(download_path, 'wb') as file:
                while True:
                    data = response.read(chunk_size)
                    if not data:
                        break
                    file.write(data)
                    downloaded_size += len(data)
                    # In-place progress line (CR, no newline).
                    percent = (downloaded_size / total_size) * 100
                    print(f"\r下载进度: {percent:.2f}% ({downloaded_size}/{total_size} bytes)", end="", flush=True)
            print("\n下载完成!")  # newline so the progress line doesn't clobber later logs
        finally:
            # Always release the HTTP connection — the original leaked it when
            # the local write raised mid-download.
            response.close()
            response.release_conn()
        logger.info(f"Successfully downloaded file to: {download_path}")
        return download_path
    except S3Error as e:
        logger.error(f"MinIO download error: {e}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error downloading file: {e}", exc_info=True)
        return None
def upload_folder(folder_path, bucket_directory):
    """Upload every file under a local folder to MinIO under ai_result/<date>/<folder>.

    :param folder_path: local directory to upload recursively
    :param bucket_directory: unused; kept for backward compatibility with callers
    :return: the bucket-relative directory the files were stored under,
        or None when an S3 error occurred
    """
    bucket_name = "300bdf2b-a150-406e-be63-d28bd29b409f"
    ai_dir_name = "ai_result"
    formatted_date, milliseconds_timestamp = get_current_date_and_milliseconds()
    dir_name = os.path.basename(os.path.normpath(folder_path))
    file_save_dir = f"{ai_dir_name}/{str(formatted_date)}/{dir_name}"
    try:
        # Warn (but proceed) when the bucket is missing; fput_object below
        # will then raise S3Error, which is reported and yields None.
        if not client.bucket_exists(bucket_name):
            print(f"存储桶 {bucket_name} 不存在")
        # Keys are made relative to folder_path's parent so the folder's own
        # name stays as the first key segment. Hoisted out of the loop
        # (loop-invariant in the original).
        base_dir = os.path.dirname(folder_path)
        for root, _, files in os.walk(folder_path):
            for file in files:
                file_path = os.path.join(root, file)
                relative_path = os.path.relpath(file_path, start=base_dir)
                relative_path = relative_path.replace(os.sep, '/')  # POSIX-style object keys
                object_name = f"{file_save_dir}/{relative_path}"
                client.fput_object(bucket_name, object_name, file_path)
                print(f"文件 {file_path} 已上传至 {bucket_name}/{object_name}")
        return file_save_dir
    except S3Error as e:
        print(f"上传文件夹时出错: {e}")
        return None
def upload_file(file_path, bucket_directory):
    """Upload a single file to MinIO under ai_result/<date>/<ms-timestamp>-<name>.

    :param file_path: local path of the file to upload
    :param bucket_directory: unused — the original if/else built identical
        keys in both branches; parameter kept for backward compatibility
    :return: tuple (object_name, "pic") on success, None when an S3 error occurs
    """
    bucket_name = "300bdf2b-a150-406e-be63-d28bd29b409f"
    dir_name = "ai_result"
    try:
        # Warn (but proceed) when the bucket is missing; fput_object will
        # then raise S3Error and the caller receives None.
        if not client.bucket_exists(bucket_name):
            print(f"存储桶 {bucket_name} 不存在")
        file_name = os.path.basename(file_path)
        formatted_date, milliseconds_timestamp = get_current_date_and_milliseconds()
        # Collapsed: both branches of the original bucket_directory check
        # produced this exact key.
        object_name = f"{dir_name}/{str(formatted_date)}/{str(milliseconds_timestamp)}-{file_name}"
        client.fput_object(bucket_name, object_name, file_path)
        print(f"文件 {file_path} 已上传至 {bucket_name}/{object_name}")
        return object_name, "pic"
    except S3Error as e:
        print(f"上传文件时出错: {e}")
        return None
# Upload an in-memory buffer straight to MinIO without writing to local disk.
def upload_file_from_buffer(buffer, file_name, bucket_directory=None, content_type="image/png"):
    """Upload a BytesIO buffer to MinIO under ai_result/<date>/<ms-timestamp>-<name>.

    :param buffer: io.BytesIO holding the binary payload
    :param file_name: target file name; falls back to "frame.jpg" when None
    :param bucket_directory: unused — the original if/else built identical
        keys in both branches; parameter kept for backward compatibility
    :param content_type: MIME type stored with the object (defaults to the
        previously hard-coded "image/png")
    :return: tuple (object_name, "pic") on success, None on failure
    """
    bucket_name = "300bdf2b-a150-406e-be63-d28bd29b409f"
    dir_name = "ai_result"
    try:
        # Abort early when the bucket is missing.
        if not client.bucket_exists(bucket_name):
            print(f"存储桶 {bucket_name} 不存在")
            return None
        if file_name is None:
            file_name = "frame.jpg"
        formatted_date, milliseconds_timestamp = get_current_date_and_milliseconds()
        # Collapsed: both branches of the original bucket_directory check
        # produced this exact key.
        object_name = f"{dir_name}/{str(formatted_date)}/{str(milliseconds_timestamp)}-{file_name}"
        client.put_object(
            bucket_name=bucket_name,
            object_name=object_name,
            data=buffer,
            length=buffer.getbuffer().nbytes,  # exact payload size
            content_type=content_type
        )
        print(f"二进制流已上传至 {bucket_name}/{object_name}")
        return object_name, "pic"
    except S3Error as e:
        print(f"上传二进制流时出错: {e}")
        return None
from io import BytesIO
def upload_frame_buff_from_buffer(frame_buff, file_name=None, bucket_directory=None):
    """Upload a raw JPEG frame (bytes) to MinIO and return its object key.

    :param frame_buff: bytes containing the binary image payload
    :param file_name: optional file name; defaults to "frame.jpg"
    :param bucket_directory: accepted for API symmetry; not used here
    :return: tuple (object_name, "pic") on success, None on failure
    """
    bucket_name = "300bdf2b-a150-406e-be63-d28bd29b409f"
    dir_name = "ai_result"
    try:
        if not client.bucket_exists(bucket_name):
            print(f"存储桶 {bucket_name} 不存在")
            return None
        target_name = "frame.jpg" if file_name is None else file_name
        formatted_date, milliseconds_timestamp = get_current_date_and_milliseconds()
        object_name = f"{dir_name}/{str(formatted_date)}/{str(milliseconds_timestamp)}-{target_name}"
        # put_object needs a readable stream, so wrap the raw bytes in BytesIO;
        # the length comes from the original bytes object.
        client.put_object(
            bucket_name=bucket_name,
            object_name=object_name,
            data=BytesIO(frame_buff),
            length=len(frame_buff),
            content_type="image/jpeg"
        )
        print(f"二进制流已上传至 {bucket_name}/{object_name}")
        return object_name, "pic"
    except S3Error as e:
        print(f"上传二进制流时出错: {e}")
        return None
def upload_video_buff_from_buffer(video_buff, file_name=None, bucket_directory=None, video_format="mp4"):
    """Upload a video byte buffer (MP4/FLV) to MinIO.

    :param video_buff: bytes holding the encoded video data
    :param file_name: optional base name; the extension is forced by video_format
    :param bucket_directory: unused; kept for API symmetry
    :param video_format: "mp4" (default) or "flv"
    :return: tuple (object_name, "flv") on success, None on failure.
        NOTE(review): the second element is "flv" even for MP4 uploads,
        while the original docstring promised "video" — confirm what callers
        expect before changing it.
    """
    bucket_name = "300bdf2b-a150-406e-be63-d28bd29b409f"
    dir_name = "ai_result"  # default directory
    try:
        if not client.bucket_exists(bucket_name):
            print(f"存储桶 {bucket_name} 不存在")
            return None
        # 1. Resolve file name, extension, and MIME type from the format.
        if file_name is None:
            file_name = "video"  # default base name, no extension yet
        if video_format.lower() == "flv":
            file_name = f"{file_name}.flv" if not file_name.lower().endswith(".flv") else file_name
            content_type = "video/x-flv"  # FLV MIME type
        else:  # default to MP4
            file_name = f"{file_name}.mp4" if not file_name.lower().endswith(".mp4") else file_name
            content_type = "video/mp4"
        formatted_date, milliseconds_timestamp = get_current_date_and_milliseconds()
        object_name = f"{dir_name}/{str(formatted_date)}/{str(milliseconds_timestamp)}-{file_name}"
        # 2. Upload the buffer to MinIO.
        buffer = BytesIO(video_buff)
        client.put_object(
            bucket_name=bucket_name,
            object_name=object_name,
            data=buffer,
            length=len(video_buff),
            content_type=content_type,  # MIME type chosen above
        )
        # Fixed: the original message was missing its closing parenthesis.
        print(f"视频已上传至 {bucket_name}/{object_name}(格式: {video_format.upper()})")
        return object_name, "flv"
    except S3Error as e:
        print(f"上传视频时出错: {e}")
        return None
def downFullPathFile(object_url):
    """Download a file from a full MinIO URL and return the local path.

    The URL path is expected to be /<bucket>/<object-key>; both parts are
    percent-decoded so keys with spaces or non-ASCII characters (which this
    project's uploads produce) resolve to the real object rather than its
    URL-encoded spelling.

    :param object_url: full http(s) URL pointing at the object
    :return: absolute local path of the downloaded file, or None on failure
    """
    if not object_url or not isinstance(object_url, str):
        logger.error(f"Invalid URL: {object_url}")
        return None
    try:
        # Split the URL path into bucket and object key.
        parsed = urllib.parse.urlparse(object_url)
        path_parts = parsed.path.strip("/").split("/", 1)
        if len(path_parts) < 2:
            logger.error(f"Invalid MinIO URL format: {object_url}")
            return None
        # Decode after splitting so encoded '/' inside the key is preserved.
        bucket_name = urllib.parse.unquote(path_parts[0])
        object_name = urllib.parse.unquote(path_parts[1])
        # Save next to this module, keeping only the object's base name.
        current_dir = os.path.dirname(os.path.abspath(__file__))
        download_path = os.path.join(current_dir, os.path.basename(object_name))
        os.makedirs(os.path.dirname(download_path), exist_ok=True)
        # Perform the download.
        client.fget_object(
            bucket_name=bucket_name,
            object_name=object_name,
            file_path=download_path
        )
        logger.info(f"Downloaded {object_url} to {download_path}")
        return download_path
    except S3Error as e:
        logger.error(f"MinIO API error: {e}")
    except Exception as e:
        logger.error(f"Download failed: {e}", exc_info=True)
    return None
def check_zip_size(object_name, bucket_name="300bdf2b-a150-406e-be63-d28bd29b409f"):
    """Return the size in bytes of an object stored in MinIO.

    :param object_name: object key inside the bucket
    :param bucket_name: bucket to inspect (defaults to the project bucket;
        previously hard-coded)
    :return: object size in bytes
    :raises S3Error: when the object cannot be stat'ed (logged, then re-raised)
    """
    try:
        stat = client.stat_object(bucket_name, object_name)
        size = stat.size
        logger.info(f"ZIP文件大小: {size/1024/1024:.2f}MB")
        return size
    except S3Error as e:
        logger.error(f"获取文件大小时出错: {e}")
        raise