2025-07-10 10:04:45 +08:00

166 lines
6.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from minio import Minio
from minio.error import S3Error
from urllib.parse import urlparse
import os
import requests
import yaml
# 读取配置并初始化 MinIO 客户端
def load_config(yaml_name):
    """Build a MinIO client from the ``minio`` section of ``<yaml_name>.yaml``.

    :param yaml_name: Path of the config file without the ``.yaml`` suffix.
    :return: A ``Minio`` client created from the ``endpoint``, ``access_key``
        and ``secret_key`` settings; ``secure`` is optional and defaults to
        ``False``.
    :raises KeyError: If the ``minio`` section or one of its required keys
        is missing.
    """
    config_file = "{}.yaml".format(yaml_name)
    with open(config_file, "r", encoding="utf-8") as stream:
        document = yaml.safe_load(stream)

    settings = document["minio"]
    return Minio(
        endpoint=settings["endpoint"],
        access_key=settings["access_key"],
        secret_key=settings["secret_key"],
        secure=settings.get("secure", False),
    )
def read_sql_config(yaml_name):
    """Return the ``sql`` section of ``<yaml_name>.yaml``.

    :param yaml_name: Path of the config file without the ``.yaml`` suffix.
    :return: The value stored under the top-level ``sql`` key.
    :raises ValueError: If the file is empty, its top level is not a mapping,
        or the ``sql`` block is missing or empty.
    """
    yaml_path = f"{yaml_name}.yaml"
    with open(yaml_path, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)
    # safe_load returns None for an empty document and may return a list or
    # scalar; report those as a missing block instead of crashing on .get().
    sql_config = config.get('sql') if isinstance(config, dict) else None
    if not sql_config:
        raise ValueError("未找到 'sql' 配置块")
    return sql_config
def parse_minio_url(minio_url):
    """Split a full MinIO object URL into ``(bucket_name, object_name)``.

    The first path segment is taken as the bucket and the rest of the path
    as the object key. Query strings are ignored. Percent-escapes are decoded
    so the returned key matches the name stored in MinIO rather than its
    URL-encoded form.

    :param minio_url: Full URL such as ``http://host:9000/bucket/dir/file``.
    :return: Tuple ``(bucket_name, object_name)``.
    :raises ValueError: If the path has no bucket or no object part.
    """
    # Local import keeps this block self-contained; the module only imports urlparse.
    from urllib.parse import unquote

    path = urlparse(minio_url).path.lstrip("/")
    # Split before decoding so an escaped slash (%2F) in the key cannot move
    # the bucket/object boundary.
    bucket_name, separator, object_name = path.partition("/")
    if not separator or not bucket_name or not object_name:
        raise ValueError("URL 格式错误,无法提取 bucket 和 object")
    return unquote(bucket_name), unquote(object_name)
def create_bucket(client):
    """Print the name and creation date of every bucket visible to *client*.

    Despite its name, this function creates nothing; it only lists buckets.
    An ``S3Error`` raised while listing is printed rather than propagated.

    :param client: Object exposing ``list_buckets()``.
    """
    try:
        for entry in client.list_buckets():
            print("Bucket: {}, Created: {}".format(entry.name, entry.creation_date))
    except S3Error as err:
        print("Error: {}".format(err))
def downFile(client, object_name, bucket_name, local_path=None):
    """Download an object from MinIO to a local file.

    :param client: MinIO client (any object exposing ``fget_object``).
    :param object_name: Key of the object inside the bucket.
    :param bucket_name: Bucket that holds the object.
    :param local_path: Destination file path. When omitted or empty, the file
        is saved next to this module under the object's base name.
    :return: The local path on success, or ``None`` if MinIO raised
        ``S3Error`` (the error is printed).
    """
    if local_path:
        download_path = local_path
    else:
        module_dir = os.path.dirname(os.path.abspath(__file__))
        download_path = os.path.join(module_dir, os.path.basename(object_name))

    # A bare filename such as "a.jpg" has an empty dirname, and
    # os.makedirs("") raises FileNotFoundError, so only create the parent
    # directory when there is one.
    parent_dir = os.path.dirname(download_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    try:
        client.fget_object(bucket_name, object_name, download_path)
    except S3Error as e:
        print(f"❌ 下载文件时出错: {e}")
        return None
    print(f"✅ 文件已成功下载到 {download_path}")
    return download_path
def upload_file(client, file_path, bucket_name, bucket_directory):
    """Upload one local file into a bucket, optionally under a directory prefix.

    :param client: MinIO client (uses ``bucket_exists`` and ``fput_object``).
    :param file_path: Local file to upload; its base name becomes the last
        segment of the object key.
    :param bucket_name: Target bucket, which must already exist.
    :param bucket_directory: Key prefix inside the bucket; trailing slashes
        are stripped. A falsy value puts the file at the bucket root.
    :return: The object key (without the bucket name) on success, or ``None``
        if the bucket is missing or MinIO raised ``S3Error``.
    """
    try:
        bucket_found = client.bucket_exists(bucket_name)
        if not bucket_found:
            print(f"❌ 存储桶 {bucket_name} 不存在")
            return None

        object_name = os.path.basename(file_path)
        if bucket_directory:
            object_name = f"{bucket_directory.rstrip('/')}/{object_name}"

        client.fput_object(bucket_name, object_name, file_path)
        print(f"✅ 文件已上传至 {bucket_name}/{object_name}")
        return object_name
    except S3Error as err:
        print(f"❌ 上传文件时出错: {err}")
        return None
def upload_file_t(client, file_path, time_str, bucket_name, bucket_directory):
    """Upload one local file under a time-stamped path inside a bucket.

    The object key is ``<bucket_directory>/<time_str>/<file name>``. When
    ``bucket_directory`` is falsy the key is ``<time_str>/<file name>``, so
    the time segment is never dropped.

    :param client: MinIO client (uses ``bucket_exists`` and ``fput_object``).
    :param file_path: Local file to upload.
    :param time_str: Path segment inserted before the file name.
    :param bucket_name: Target bucket, which must already exist.
    :param bucket_directory: Key prefix inside the bucket; trailing slashes
        are stripped.
    :return: ``"<bucket_name>/<object key>"`` on success (the bucket name is
        included), or ``None`` if the bucket is missing or MinIO raised
        ``S3Error``.
    """
    try:
        if not client.bucket_exists(bucket_name):
            print(f"❌ 存储桶 {bucket_name} 不存在")
            return None
        file_name = os.path.basename(file_path)
        if bucket_directory:
            object_name = f"{bucket_directory.rstrip('/')}/{time_str}/{file_name}"
        else:
            # Previously this branch used only file_name and silently lost
            # the time segment this function exists to add.
            object_name = f"{time_str}/{file_name}"
        client.fput_object(bucket_name, object_name, file_path)
        print(f"✅ 文件已上传至 {bucket_name}/{object_name}")
        return f"{bucket_name}/{object_name}"
    except S3Error as e:
        print(f"❌ 上传文件时出错: {e}")
        return None
def upload_folder(client, folder_path, bucket_name, bucket_directory):
    """Upload every file under a local folder, recursively, into a bucket.

    Each object key is the file's path relative to ``folder_path`` with
    forward slashes, prefixed by ``bucket_directory`` when one is given.
    The first ``S3Error`` is printed and stops the remaining uploads.

    :param client: MinIO client (uses ``bucket_exists`` and ``fput_object``).
    :param folder_path: Local folder to walk.
    :param bucket_name: Target bucket, which must already exist.
    :param bucket_directory: Key prefix inside the bucket; trailing slashes
        are stripped. A falsy value uploads relative to the bucket root.
    :return: ``None`` in all cases.
    """
    try:
        bucket_found = client.bucket_exists(bucket_name)
        if not bucket_found:
            print(f"❌ 存储桶 {bucket_name} 不存在")
            return

        for current_dir, _subdirs, filenames in os.walk(folder_path):
            for filename in filenames:
                local_file = os.path.join(current_dir, filename)
                relative_path = os.path.relpath(local_file, folder_path)
                # Object keys always use "/" even when walking on Windows.
                relative_key = relative_path.replace("\\", "/")
                object_name = (
                    f"{bucket_directory.rstrip('/')}/{relative_key}"
                    if bucket_directory
                    else relative_key
                )
                client.fput_object(bucket_name, object_name, local_file)
                print(f"✅ 文件 {local_file} 已上传至 {bucket_name}/{object_name}")
    except S3Error as err:
        print(f"❌ 上传文件夹时出错: {err}")
def download_file_url(url: str, save_path: str, timeout: float = 30):
    """Download a file from a URL and save it to a local path.

    Request failures (connection errors, timeouts, HTTP error statuses) are
    printed rather than raised.

    :param url: URL of the file.
    :param save_path: Local destination path, including the file name.
    :param timeout: Seconds to wait for the server to connect or send data
        before giving up; passed straight to ``requests.get``.
    """
    try:
        # Without a timeout requests can block forever on a stalled server.
        # The context manager releases the streamed connection even when the
        # download fails part-way.
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()  # raise on HTTP error statuses
            with open(save_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        print(f"文件已成功下载到: {save_path}")
    except requests.exceptions.RequestException as e:
        print(f"下载失败: {e}")
if __name__ == '__main__':
    # Manual demo: download one object, upload one file, upload one folder.
    #
    # Credentials are read from the environment so that secrets are not
    # committed to source control. The key pair that used to be hard-coded
    # here should be treated as leaked and rotated.
    access_key = os.environ.get("MINIO_ACCESS_KEY")
    secret_key = os.environ.get("MINIO_SECRET_KEY")
    if not access_key or not secret_key:
        raise SystemExit(
            "Set MINIO_ACCESS_KEY and MINIO_SECRET_KEY before running this demo"
        )

    # NOTE(review): the sample URL below uses port 9000 while the default
    # endpoint has no port -- confirm which one the server listens on.
    client = Minio(
        endpoint=os.environ.get("MINIO_ENDPOINT", "222.212.85.86"),
        access_key=access_key,
        secret_key=secret_key,
        secure=False
    )

    # Example: download a file from its full URL.
    full_url = "http://222.212.85.86:9000/300bdf2b-a150-406e-be63-d28bd29b409f/media/86961d3a-8790-4bc0-bfc6-dc45a8e4b9bd/DJI_202504291707_001_86961d3a-8790-4bc0-bfc6-dc45a8e4b9bd/DJI_20250429170933_0001_V.jpeg"
    bucket_name, object_name = parse_minio_url(full_url)
    downFile(client, object_name, bucket_name)

    # Example: upload a single file (change the path to a real file).
    file_path = r"D:\work\develop\sample.jpg"
    upload_file(client, file_path, "300bdf2b-a150-406e-be63-d28bd29b409f", "media/output_video")

    # Example: upload a whole folder.
    folder_path = r"D:\work\develop"
    upload_folder(client, folder_path, "300bdf2b-a150-406e-be63-d28bd29b409f", "media/output_video")