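"""YOLO segmentation training pipeline.

Prepares a training dataset for a labelling task (fetches label and image
records from PostgreSQL, downloads the images from MinIO, writes YOLO-format
label files and a data.yaml), then launches the actual training in a separate
process and records its PID.
"""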
import asyncio
import os.path
import shutil
import sys
import threading
import time
import subprocess
import json
from pathlib import Path

from middleware.minio_util import downFullPathFile
from middleware.query_model import ModelConfigDAO
import yaml
import torch
from ultralytics import YOLO


async def download_train(task_id: str, bz_training_task_id: int, pt_name: str):
    """
    Download the training data and start training.

    This function prepares the dataset, then uses a worker thread plus a
    subprocess to run the training in an independent process.
    """
    try:
        current_pid = os.getpid()
        print(f"Starting download and training for task {task_id} in process {current_pid}")

        DB_CONFIG = {
            "dbname": "smart_dev_123",
            "user": "postgres",
            "password": "root",
            "host": "8.137.54.85",
            "port": "5060"
        }

        # Create the DAO instance
        dao = ModelConfigDAO(DB_CONFIG)
        time_ns = time.time_ns()
        output_root = f"dataset/dataset-{time_ns}"
        try:
            if not os.path.exists(output_root):
                # makedirs: the parent "dataset" directory may not exist yet
                os.makedirs(output_root, exist_ok=True)
            print(f"Created output directory: {output_root}")
        except Exception as e:
            print(f"Failed to create output directory: {e}")
            raise

        try:
            # Fetch label and dataset information
            list_labels = dao.get_labels(bz_training_task_id)
            list_datasets = dao.get_datasets(bz_training_task_id)
            label_yaml_list = dao.get_label_yaml(bz_training_task_id)
            print(f"Retrieved {len(list_labels)} labels, {len(list_datasets)} datasets, "
                  f"{len(label_yaml_list)} label configs")
        except Exception as e:
            print(f"Failed to retrieve data from database: {e}")
            raise

        # Dataset configuration written to data.yaml
        uavid_config = {
            "path": "",  # filled with the dataset's absolute path below
            "train": "images/train",  # training set
            "val": "images/val",  # validation set
            "test": "images/test",  # test set (optional)
            "names": {}
        }
        try:
            uavid_config["path"] = os.path.abspath(output_root)
            for i, item in enumerate(label_yaml_list):
                item.id_order = i
                uavid_config["names"][f"{i}"] = item.e_name

            # Write the YAML file
            data_yaml = "data.yaml"
            with open(data_yaml, "w", encoding="utf-8") as f:
                yaml.dump(
                    uavid_config,
                    f,
                    default_flow_style=False,  # keep block (multi-line) style
                    allow_unicode=True,  # allow Unicode characters
                    sort_keys=False  # preserve key order
                )
            file_name = os.path.basename(data_yaml)
            des_path = os.path.join(output_root, file_name)
            if os.path.exists(des_path):
                os.remove(des_path)
            shutil.move(data_yaml, output_root)
            print(f"Generated YAML config at: {os.path.abspath(output_root)}")
        except Exception as e:
            print(f"Failed to generate YAML config: {e}")
            raise

        # Download the dataset files
        invalid_indices = []
        try:
            for index, pic in enumerate(list_datasets):
                if hasattr(pic, 'resource_original_path') and pic.resource_original_path:  # image path is valid
                    try:
                        download_path = downFullPathFile(pic.resource_original_path)
                        if download_path:  # download succeeded
                            pic.local_path = download_path
                            pic.label_name = Path(download_path).stem  # image stem doubles as the label file name
                            print(f"Downloaded file: {download_path}")
                        else:
                            invalid_indices.append(index)  # remember invalid entries for later removal
                            print(f"Failed to download file: {pic.resource_original_path}")
                    except Exception as e:
                        invalid_indices.append(index)
                        print(f"Error downloading file {pic.resource_original_path}: {e}")
                else:
                    invalid_indices.append(index)  # no source path, remember for later removal
        except Exception as e:
            print(f"Error processing datasets: {e}")
            raise

        # Delete invalid entries from the end backwards so the remaining indices stay valid
        try:
            for idx in sorted(invalid_indices, reverse=True):
                del list_datasets[idx]
            print(f"Filtered datasets: {len(list_datasets)} valid items remaining")
        except Exception as e:
            print(f"Error filtering datasets: {e}")
            raise

        # Assemble the label content for each image
        try:
            for data_pic in list_datasets:  # map each image to its label records
                for label in list_labels:
                    if hasattr(data_pic, 'id') and hasattr(label, 'id') and data_pic.id == label.id:
                        for item in label_yaml_list:
                            if hasattr(label, 'label_ids') and hasattr(item, 'id') and label.label_ids == item.id:
                                # label records are expected to carry annotation_data
                                annotation = getattr(label, 'annotation_data', '')
                                current_content = getattr(data_pic, 'label_content', '')
                                data_pic.label_content = f"{current_content}{item.id_order} {annotation}\n"
        except Exception as e:
            print(f"Error organizing labels: {e}")
            raise

        # Write the label files
        try:
            for data_pic in list_datasets:
                if hasattr(data_pic, 'label_name'):
                    label_txt = f"{data_pic.label_name}.txt"
                    with open(label_txt, 'w', encoding='utf-8') as f:
                        f.write(getattr(data_pic, 'label_content', ''))
                    data_pic.label_txt_path = os.path.abspath(label_txt)
                    print(f"Created label file: {label_txt}")
        except Exception as e:
            print(f"Error creating label files: {e}")
            raise

        # Move files into the dataset layout
        try:
            dataset_dirs = {
                "images": Path(output_root) / "images",
                "labels": Path(output_root) / "labels"
            }
            for ds_dir in dataset_dirs.values():
                (ds_dir / "val").mkdir(parents=True, exist_ok=True)
                (ds_dir / "train").mkdir(parents=True, exist_ok=True)
                (ds_dir / "test").mkdir(parents=True, exist_ok=True)
            print("Created dataset directory structure")
        except Exception as e:
            print(f"Error creating dataset directories: {e}")
            raise

        # Split the data into train / val / test sets
        try:
            count_pic = 0
            for data_pic in list_datasets:
                count_pic += 1
                # 80% train, 10% val, 10% test
                if count_pic % 10 < 8:
                    split = "train"
                elif count_pic % 10 == 8:
                    split = "val"
                else:  # count_pic % 10 == 9
                    split = "test"

                # Move the image file
                if hasattr(data_pic, 'local_path') and os.path.exists(data_pic.local_path):
                    images_path = dataset_dirs["images"]
                    image_dir = os.path.join(images_path, split)
                    file_name = os.path.basename(data_pic.local_path)
                    des_path = os.path.join(image_dir, file_name)
                    if os.path.exists(des_path):
                        os.remove(des_path)
                    shutil.move(data_pic.local_path, image_dir)

                # Move the label file
                if hasattr(data_pic, 'label_txt_path') and os.path.exists(data_pic.label_txt_path):
                    labels_path = dataset_dirs["labels"]
                    label_dir = os.path.join(labels_path, split)
                    file_name = os.path.basename(data_pic.label_txt_path)
                    des_path = os.path.join(label_dir, file_name)
                    if os.path.exists(des_path):
                        os.remove(des_path)
                    shutil.move(data_pic.label_txt_path, label_dir)
            print(f"Organized {count_pic} files into dataset splits")
        except Exception as e:
            print(f"Error organizing dataset splits: {e}")
            raise

        # Training configuration
        custom_config = {
            "epochs": 50,  # small value for quick tests
            "batch_size": 4,
            "workers": 0,  # disable multi-process data loading
        }

        # Persist the training configuration for the worker process
        config_file = f"train_config_{task_id}.json"
        with open(config_file, 'w', encoding='utf-8') as f:
            json.dump({
                'dataset_dir': output_root,
                'pt_name': pt_name,
                'config_overrides': custom_config,
                'db_config': DB_CONFIG,
                'task_id': task_id
            }, f)

        print(f"Training data preparation completed for task {task_id}")

        # On Windows, launch the training process from a worker thread with subprocess
        # instead of asyncio.create_subprocess_exec
        loop = asyncio.get_running_loop()
        training_pid = await loop.run_in_executor(
            None,  # use the default thread pool
            start_training_process,
            config_file
        )

        if training_pid:
            print(f"pid--{training_pid}")
            dao.insert_train_pid(task_id, train_pid=training_pid)
            return training_pid
        else:
            raise Exception("Failed to start training process")

    except Exception as e:
        print(f"Training failed for task {task_id}: {e}")
        raise


def start_training_process(config_file: str) -> int:
    """
    Start the training process from a separate thread.

    Uses subprocess.Popen to create the training process.
    """
    try:
        # Training worker script, written out and executed as a separate process
        train_script = '''
import sys
import json
import os

import torch
from ultralytics import YOLO


class MockModelConfigDAO:
    def __init__(self, db_config):
        self.db_config = db_config

    def insert_train_pid(self, task_id, train_pid):
        print(f"Inserted training PID {train_pid} for task {task_id}")


def train_model(dataset_dir, weight_name="best_segmentation_model.pt", config_overrides=None):
    """
    Train the model and save the weights.
    """
    try:
        current_pid = os.getpid()
        print(f"Starting model training in process {current_pid} with dataset: {dataset_dir}")

        # Default configuration (can be overridden via config_overrides)
        DEFAULT_CONFIG = {
            "model": "pt/yolo11s-seg.pt",
            "pretrained": True,
            "data": os.path.join(dataset_dir, "data.yaml"),
            "project": "UAVid_Segmentation",
            "name": "v1.5_official",
            "epochs": 1000,
            "batch_size": 8,
            "img_size": 640,
            "workers": 0,  # disable multi-process data loading
            "optimizer": "SGD",
            "lr0": 0.01,
            "lrf": 0.01,
            "momentum": 0.9,
            "weight_decay": 0.0005,
            "augment": True,
            "hyp": {
                "mosaic": 0.5,
                "copy_paste": 0.2,
                "mixup": 0.15,
            },
        }

        config = DEFAULT_CONFIG.copy()
        if config_overrides:
            config.update(config_overrides)
        print(f"Training config: {config}")

        # Check that the data configuration file exists
        data_path = config["data"]
        if not os.path.exists(data_path):
            raise FileNotFoundError(f"Data configuration file not found: {data_path}")

        # Initialize the model
        model = YOLO(config["model"])
        print(f"Model initialized with: {config['model']}")

        # Start training
        results = model.train(
            data=config["data"],
            project=config["project"],
            name=config["name"],
            epochs=config["epochs"],
            batch=config["batch_size"],
            imgsz=config["img_size"],
            workers=config["workers"],
            optimizer=config["optimizer"],
            lr0=config["lr0"],
            lrf=config["lrf"],
            momentum=config["momentum"],
            weight_decay=config["weight_decay"],
            augment=config["augment"],
            device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
        )
        print(f"Training completed successfully in process {current_pid}")

        # Validate the model
        metrics = model.val()
        print(f"Validation mAP: {metrics.box.map:.2f} (box), {metrics.seg.map:.2f} (mask)")

        # Save the best model
        try:
            if hasattr(results, 'best') and results.best:
                best_model_path = results.best
                if os.path.exists(best_model_path):
                    import shutil
                    shutil.copy2(best_model_path, weight_name)
                    print(f"Best model saved to: {os.path.abspath(weight_name)}")
                else:
                    torch.save(model.state_dict(), weight_name)
                    print(f"Best model path not found, saved state dict to: {weight_name}")
            else:
                torch.save(model.state_dict(), weight_name)
                print(f"Saved model state dict to: {weight_name}")
        except Exception as e:
            print(f"Warning: Failed to save best model: {e}")
            torch.save(model.state_dict(), weight_name)
            print(f"Fallback: Saved model state dict to: {weight_name}")

        return True

    except Exception as e:
        print(f"Model training failed in process {os.getpid()}: {e}")
        raise


def main():
    if len(sys.argv) != 2:
        print("Usage: python -c '