166 lines
6.0 KiB
Python
166 lines
6.0 KiB
Python
import os
from urllib.parse import unquote, urlparse

import requests
import yaml
from minio import Minio
from minio.error import S3Error
|
||
|
||
# 读取配置并初始化 MinIO 客户端
|
||
def load_config(yaml_name):
    """Build a MinIO client from the ``minio`` section of a YAML file.

    The file opened is ``f"{yaml_name}.yaml"``. Its ``minio`` mapping must
    contain ``endpoint``, ``access_key`` and ``secret_key``; ``secure`` is
    optional and defaults to ``False``.

    :param yaml_name: Path of the config file without the ``.yaml`` suffix.
    :return: The ``Minio`` client built from those settings.
    :raises KeyError: If the ``minio`` section or a required key is missing.
    """
    with open(f"{yaml_name}.yaml", 'r', encoding='utf-8') as stream:
        settings = yaml.safe_load(stream)["minio"]

    return Minio(
        endpoint=settings["endpoint"],
        access_key=settings["access_key"],
        secret_key=settings["secret_key"],
        secure=settings.get("secure", False),
    )
|
||
|
||
def read_sql_config(yaml_name):
    """Return the ``sql`` section of the YAML file ``f"{yaml_name}.yaml"``.

    :param yaml_name: Path of the config file without the ``.yaml`` suffix.
    :return: The value stored under the top-level ``sql`` key.
    :raises ValueError: If the ``sql`` section is missing or empty, including
        when the file itself is empty or its top level is not a mapping.
    """
    with open(f"{yaml_name}.yaml", 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)

    # safe_load() yields None for an empty document (and may yield a list or
    # scalar); treat anything that is not a mapping as "no sql section" so the
    # caller gets the documented ValueError instead of an AttributeError.
    sql_config = config.get('sql') if isinstance(config, dict) else None
    if not sql_config:
        raise ValueError("未找到 'sql' 配置块")
    return sql_config
|
||
|
||
def parse_minio_url(minio_url):
    """Split a full MinIO object URL into ``(bucket_name, object_name)``.

    The first path segment is the bucket; everything after it is the object
    name. Query string and fragment are ignored. Percent-escapes in the path
    are decoded, so the returned names are the raw ones rather than their
    URL-encoded form.

    :param minio_url: URL such as ``http://host:9000/bucket/dir/file.jpg``.
    :return: Tuple ``(bucket_name, object_name)``.
    :raises ValueError: If the path lacks a bucket or an object name.
    """
    path = urlparse(minio_url).path.lstrip("/")
    # Split before decoding so an escaped "/" (%2F) inside the object name
    # cannot shift the bucket/object boundary.
    bucket_name, _, object_name = path.partition("/")
    if not bucket_name or not object_name:
        raise ValueError("URL 格式错误,无法提取 bucket 和 object")
    return unquote(bucket_name), unquote(object_name)
|
||
|
||
def create_bucket(client):
    """Print the name and creation date of every bucket returned by *client*.

    Despite its name, this function does not create anything: it only calls
    ``client.list_buckets()`` and prints one line per bucket. An ``S3Error``
    is printed instead of being raised.

    :param client: Object exposing ``list_buckets()``.
    """
    try:
        for entry in client.list_buckets():
            print(f"Bucket: {entry.name}, Created: {entry.creation_date}")
    except S3Error as err:
        print(f"Error: {err}")
|
||
|
||
def downFile(client, object_name, bucket_name, local_path=None):
    """Download one object from a bucket to the local filesystem.

    :param client: Object exposing ``fget_object(bucket, object, path)``.
    :param object_name: Name of the object inside the bucket.
    :param bucket_name: Bucket that holds the object.
    :param local_path: Destination file path. When omitted (or empty) the
        file is saved next to this module under the object's base name.
    :return: The path the file was written to, or ``None`` when the download
        failed with an ``S3Error`` (the error is printed).
    """
    if local_path:
        download_path = local_path
    else:
        # Only touch __file__ when the default location is actually needed.
        module_dir = os.path.dirname(os.path.abspath(__file__))
        download_path = os.path.join(module_dir, os.path.basename(object_name))

    # A bare file name has an empty dirname, and os.makedirs("") raises
    # FileNotFoundError, so only create the parent when there is one.
    parent_dir = os.path.dirname(download_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    try:
        client.fget_object(bucket_name, object_name, download_path)
        print(f"✅ 文件已成功下载到 {download_path}")
        return download_path
    except S3Error as e:
        print(f"❌ 下载文件时出错: {e}")
        return None
|
||
|
||
def upload_file(client, file_path, bucket_name, bucket_directory):
    """Upload a single local file into a bucket directory.

    The object name is ``<bucket_directory>/<file base name>`` with trailing
    slashes on the directory removed, or just the base name when
    *bucket_directory* is empty.

    :param client: Object exposing ``bucket_exists`` and ``fput_object``.
    :param file_path: Path of the local file to upload.
    :param bucket_name: Target bucket; it must already exist.
    :param bucket_directory: Directory prefix inside the bucket (may be empty).
    :return: The object name (without the bucket) on success; ``None`` when
        the bucket does not exist or an ``S3Error`` occurs (both are printed).
    """
    try:
        if not client.bucket_exists(bucket_name):
            print(f"❌ 存储桶 {bucket_name} 不存在")
            return None

        base_name = os.path.basename(file_path)
        if bucket_directory:
            target_key = f"{bucket_directory.rstrip('/')}/{base_name}"
        else:
            target_key = base_name

        client.fput_object(bucket_name, target_key, file_path)
        print(f"✅ 文件已上传至 {bucket_name}/{target_key}")
        return str(target_key)
    except S3Error as err:
        print(f"❌ 上传文件时出错: {err}")
        return None
|
||
|
||
def upload_file_t(client, file_path, time_str, bucket_name, bucket_directory):
    """Upload a single local file under a time-stamped bucket directory.

    The object name is ``<bucket_directory>/<time_str>/<file base name>``
    with trailing slashes on the directory removed.

    NOTE(review): when *bucket_directory* is empty the object name is just
    the file's base name and *time_str* is not used -- confirm this is
    intended.

    :param client: Object exposing ``bucket_exists`` and ``fput_object``.
    :param file_path: Path of the local file to upload.
    :param time_str: Path segment inserted between directory and file name.
    :param bucket_name: Target bucket; it must already exist.
    :param bucket_directory: Directory prefix inside the bucket (may be empty).
    :return: ``"<bucket_name>/<object name>"`` on success; ``None`` when the
        bucket does not exist or an ``S3Error`` occurs (both are printed).
    """
    try:
        if not client.bucket_exists(bucket_name):
            print(f"❌ 存储桶 {bucket_name} 不存在")
            return None

        base_name = os.path.basename(file_path)
        if bucket_directory:
            target_key = f"{bucket_directory.rstrip('/')}/{time_str}/{base_name}"
        else:
            target_key = base_name

        client.fput_object(bucket_name, target_key, file_path)
        uploaded_to = f"{bucket_name}/{target_key}"
        print(f"✅ 文件已上传至 {uploaded_to}")
        return uploaded_to
    except S3Error as err:
        print(f"❌ 上传文件时出错: {err}")
        return None
|
||
|
||
def upload_folder(client, folder_path, bucket_name, bucket_directory):
    """Upload every file under a local folder into a bucket directory.

    The folder is walked recursively. Each file's object name is its path
    relative to *folder_path* (backslashes replaced by ``/``), prefixed with
    *bucket_directory* when that is non-empty. The first ``S3Error`` stops
    the upload and is printed; files uploaded before it stay uploaded.

    :param client: Object exposing ``bucket_exists`` and ``fput_object``.
    :param folder_path: Local folder to upload.
    :param bucket_name: Target bucket; it must already exist.
    :param bucket_directory: Directory prefix inside the bucket (may be empty).
    :return: Always ``None``.
    """
    try:
        if not client.bucket_exists(bucket_name):
            print(f"❌ 存储桶 {bucket_name} 不存在")
            return

        for dir_path, _subdirs, file_names in os.walk(folder_path):
            for file_name in file_names:
                local_file = os.path.join(dir_path, file_name)
                relative_key = os.path.relpath(local_file, folder_path).replace("\\", "/")
                target_key = (
                    f"{bucket_directory.rstrip('/')}/{relative_key}"
                    if bucket_directory
                    else relative_key
                )

                client.fput_object(bucket_name, target_key, local_file)
                print(f"✅ 文件 {local_file} 已上传至 {bucket_name}/{target_key}")
    except S3Error as err:
        print(f"❌ 上传文件夹时出错: {err}")
|
||
|
||
def download_file_url(url: str, save_path: str, timeout: float = 30.0):
    """Download a file from *url* and write it to *save_path*.

    The body is streamed to disk in 8 KiB chunks. Request failures (bad
    status, connection problems, timeouts) are printed, not raised.

    :param url: URL of the file.
    :param save_path: Local destination path, including the file name.
    :param timeout: Seconds passed to ``requests.get`` as its ``timeout``.
        Without one, ``requests`` waits indefinitely on an unresponsive
        server.
    """
    try:
        # Using the response as a context manager releases the connection
        # even if writing fails part-way through the stream.
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()  # raises on 4xx/5xx responses

            with open(save_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        print(f"文件已成功下载到: {save_path}")
    except requests.exceptions.RequestException as e:
        print(f"下载失败: {e}")
|
||
|
||
|
||
if __name__ == '__main__':
    # Manual demo: download one object by URL, then upload a file and a folder.
    #
    # SECURITY NOTE(review): the fallback endpoint and credentials below are
    # committed in source. They are kept only so the demo behaves as before
    # when no environment variables are set; they should be rotated and
    # removed, with MINIO_ENDPOINT / MINIO_ACCESS_KEY / MINIO_SECRET_KEY
    # supplied from the environment instead.
    # NOTE(review): the fallback endpoint has no port while the sample URL
    # below uses :9000 -- confirm which one is correct.
    client = Minio(
        endpoint=os.environ.get("MINIO_ENDPOINT", "222.212.85.86"),
        access_key=os.environ.get("MINIO_ACCESS_KEY", "adminjdskfj"),
        secret_key=os.environ.get("MINIO_SECRET_KEY", "123456ksldjfal@Y"),
        secure=False
    )

    # Example: download a file given its full URL.
    full_url = "http://222.212.85.86:9000/300bdf2b-a150-406e-be63-d28bd29b409f/media/86961d3a-8790-4bc0-bfc6-dc45a8e4b9bd/DJI_202504291707_001_86961d3a-8790-4bc0-bfc6-dc45a8e4b9bd/DJI_20250429170933_0001_V.jpeg"
    bucket_name, object_name = parse_minio_url(full_url)
    downFile(client, object_name, bucket_name)

    # Example: upload a single file.
    file_path = r"D:\work\develop\sample.jpg"  # change to a real file
    upload_file(client, file_path, "300bdf2b-a150-406e-be63-d28bd29b409f", "media/output_video")

    # Example: upload a whole folder.
    folder_path = r"D:\work\develop"
    upload_folder(client, folder_path, "300bdf2b-a150-406e-be63-d28bd29b409f", "media/output_video")
|