139 lines
5.1 KiB
Python
Raw Permalink Normal View History

2025-10-09 09:29:18 +08:00
import os
import psycopg2
from miniohelp import load_config, upload_file
# ------------------ 加载数据库配置 ------------------
def load_sql_config(yaml_name):
import yaml
# 从 yaml 文件中读取 sql 配置
with open(f"{yaml_name}.yaml", "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
return config["sql"]
# ------------------ 插入单条数据到数据库 ------------------
def insert_to_database(conn, table, data):
# 定义插入 SQL
sql = f"""
INSERT INTO {table} (
id, orgcode, model, state, objectname, label
) VALUES (%s, %s, %s, %s, %s, %s)
"""
try:
# 使用游标执行 SQL
with conn.cursor() as cur:
cur.execute(sql, data)
# 提交事务
conn.commit()
print("✅ 数据插入成功")
return True
except psycopg2.Error as e:
# 出现错误时回滚事务
conn.rollback()
print("❌ 插入失败:", e)
return False
# ------------------ 主流程:上传图片 + 插入数据库 ------------------
def upload_and_insert_images_with_labels(
yaml_name, # 配置文件名(不带 .yaml 后缀)
image_dir, # 图片目录
label_dir, # 标签目录
bucket_name, # MinIO 存储桶名
bucket_path, # MinIO 上传路径(目录)
table_name, # 数据库表名
model_name, # 模型名,用于区分不同模型的数据
orgcode="bdzl" # 机构代码,默认 bdzl
):
# 1. 初始化 MinIO 和 SQL 配置
minio_client = load_config(yaml_name) # 加载 MinIO 配置
sql_conf = load_sql_config(yaml_name) # 加载 SQL 配置
# 2. 创建数据库连接
conn = psycopg2.connect(
host=sql_conf["host"],
port=sql_conf["port"],
user=sql_conf["user"],
password=sql_conf["password"],
database=sql_conf["dbname"]
)
# 3. 查询数据库中当前最大 id 和已存在的文件
with conn.cursor() as cur:
# 查询表里当前最大 id
cur.execute(f"SELECT COALESCE(MAX(id), 0) FROM {table_name}")
max_id = cur.fetchone()[0] or 0
next_id = max_id + 1
# 查询该模型已有的文件名(仅文件名部分)
cur.execute(f"SELECT objectname FROM {table_name} WHERE model = %s", (model_name,))
existing_files = set([os.path.basename(row[0]) for row in cur.fetchall()])
# 4. 遍历图片目录
for filename in os.listdir(image_dir):
# 只处理图片类型文件
if not filename.lower().endswith((".jpg", ".jpeg", ".png", ".bmp")):
continue
# 如果数据库已存在该文件,跳过
if filename in existing_files:
print(f"⏭️ 已存在于数据库中,跳过:{filename}")
continue
image_path = os.path.join(image_dir, filename)
name_no_ext = os.path.splitext(filename)[0]
label_path = os.path.join(label_dir, name_no_ext + ".txt")
# 标签文件不存在,跳过
if not os.path.exists(label_path):
print(f"⚠️ 未找到标签:{label_path},跳过")
continue
# 读取标签文件内容
with open(label_path, "r", encoding="utf-8") as f:
label_content = f.read().strip()
# 5. 上传图片到 MinIO
objectname = upload_file(minio_client, image_path, bucket_name, bucket_path)
if not objectname:
print(f"❌ 上传失败:{filename}")
continue
# 6. 插入数据库
try:
data = (
next_id, # id
orgcode, # 机构代码
model_name, # 模型
1, # state 固定 1
objectname, # MinIO 返回的文件路径
label_content # 标签内容
)
info = insert_to_database(conn, table_name, data)
print(info)
print(f"✅ 成功插入数据库 (id={next_id}){filename}")
next_id += 1
except Exception as e:
print(f"❌ 数据库插入失败:{filename}, error: {e}")
# 7. 提交事务并关闭连接
conn.commit()
conn.close()
print("🎉 所有图片和标签处理完成!")
# ------------------ 示例调用 ------------------
if __name__ == "__main__":
yaml_name = "config_test_dev" # 配置文件名(不含 .yaml 后缀)
image_dir = r"D:\dataset\images\train" # 图片路径
label_dir = r"D:\dataset\labels\train" # 标签路径
bucket_name = "300bdf2b-a150-406e-be63-d28bd29b409f" # MinIO 桶名
bucket_directory = "new_datasets/fence" # MinIO 内的目录
table_name = "public.aidataset" # 数据库表名
model_name = "08ff91fd-60d2-470f-9675-b18800229654" # 模型uuid
orgcode = "bdzl" # 机构代码
# 调用主函数
upload_and_insert_images_with_labels(
yaml_name, image_dir, label_dir,
bucket_name, bucket_directory,
table_name, model_name, orgcode
)