import os import psycopg2 from miniohelp import load_config, upload_file # ------------------ 加载数据库配置 ------------------ def load_sql_config(yaml_name): import yaml # 从 yaml 文件中读取 sql 配置 with open(f"{yaml_name}.yaml", "r", encoding="utf-8") as f: config = yaml.safe_load(f) return config["sql"] # ------------------ 插入单条数据到数据库 ------------------ def insert_to_database(conn, table, data): # 定义插入 SQL sql = f""" INSERT INTO {table} ( id, orgcode, model, state, objectname, label ) VALUES (%s, %s, %s, %s, %s, %s) """ try: # 使用游标执行 SQL with conn.cursor() as cur: cur.execute(sql, data) # 提交事务 conn.commit() print("✅ 数据插入成功") return True except psycopg2.Error as e: # 出现错误时回滚事务 conn.rollback() print("❌ 插入失败:", e) return False # ------------------ 主流程:上传图片 + 插入数据库 ------------------ def upload_and_insert_images_with_labels( yaml_name, # 配置文件名(不带 .yaml 后缀) image_dir, # 图片目录 label_dir, # 标签目录 bucket_name, # MinIO 存储桶名 bucket_path, # MinIO 上传路径(目录) table_name, # 数据库表名 model_name, # 模型名,用于区分不同模型的数据 orgcode="bdzl" # 机构代码,默认 bdzl ): # 1. 初始化 MinIO 和 SQL 配置 minio_client = load_config(yaml_name) # 加载 MinIO 配置 sql_conf = load_sql_config(yaml_name) # 加载 SQL 配置 # 2. 创建数据库连接 conn = psycopg2.connect( host=sql_conf["host"], port=sql_conf["port"], user=sql_conf["user"], password=sql_conf["password"], database=sql_conf["dbname"] ) # 3. 查询数据库中当前最大 id 和已存在的文件 with conn.cursor() as cur: # 查询表里当前最大 id cur.execute(f"SELECT COALESCE(MAX(id), 0) FROM {table_name}") max_id = cur.fetchone()[0] or 0 next_id = max_id + 1 # 查询该模型已有的文件名(仅文件名部分) cur.execute(f"SELECT objectname FROM {table_name} WHERE model = %s", (model_name,)) existing_files = set([os.path.basename(row[0]) for row in cur.fetchall()]) # 4. 遍历图片目录 for filename in os.listdir(image_dir): # 只处理图片类型文件 if not filename.lower().endswith((".jpg", ".jpeg", ".png", ".bmp")): continue # 如果数据库已存在该文件,跳过 if filename in existing_files: print(f"⏭️ 已存在于数据库中,跳过:{filename}") continue image_path = os.path.join(image_dir, filename) name_no_ext = os.path.splitext(filename)[0] label_path = os.path.join(label_dir, name_no_ext + ".txt") # 标签文件不存在,跳过 if not os.path.exists(label_path): print(f"⚠️ 未找到标签:{label_path},跳过") continue # 读取标签文件内容 with open(label_path, "r", encoding="utf-8") as f: label_content = f.read().strip() # 5. 上传图片到 MinIO objectname = upload_file(minio_client, image_path, bucket_name, bucket_path) if not objectname: print(f"❌ 上传失败:{filename}") continue # 6. 插入数据库 try: data = ( next_id, # id orgcode, # 机构代码 model_name, # 模型 1, # state 固定 1 objectname, # MinIO 返回的文件路径 label_content # 标签内容 ) info = insert_to_database(conn, table_name, data) print(info) print(f"✅ 成功插入数据库 (id={next_id}):{filename}") next_id += 1 except Exception as e: print(f"❌ 数据库插入失败:{filename}, error: {e}") # 7. 提交事务并关闭连接 conn.commit() conn.close() print("🎉 所有图片和标签处理完成!") # ------------------ 示例调用 ------------------ if __name__ == "__main__": yaml_name = "config_test_dev" # 配置文件名(不含 .yaml 后缀) image_dir = r"D:\dataset\images\train" # 图片路径 label_dir = r"D:\dataset\labels\train" # 标签路径 bucket_name = "300bdf2b-a150-406e-be63-d28bd29b409f" # MinIO 桶名 bucket_directory = "new_datasets/fence" # MinIO 内的目录 table_name = "public.aidataset" # 数据库表名 model_name = "08ff91fd-60d2-470f-9675-b18800229654" # 模型uuid orgcode = "bdzl" # 机构代码 # 调用主函数 upload_and_insert_images_with_labels( yaml_name, image_dir, label_dir, bucket_name, bucket_directory, table_name, model_name, orgcode )