import os import re import cv2 import zipfile import shutil import numpy as np from concurrent.futures import ThreadPoolExecutor # ------------------ 路面类别映射集 ------------------ CLASS_MAPS_5010 = { "asphalt": { "龟裂": 0, "块状裂缝": 1, "纵向裂缝": 2, "横向裂缝": 3, "沉陷": 4, "车辙": 5, "波浪拥包": 6, "坑槽": 7, "松散": 8, "泛油": 9, "修补": 10, }, "cream": {"破碎板": 0, "裂缝": 1, "坑洞": 2, "露骨": 3, "错台": 4, "拱起": 5}, "gravel": {"坑槽": 0, "沉陷": 1, "车辙": 2, "波浪搓板": 3}, } CLASS_MAPS_5011 = { "asphalt": { "龟裂": 0, "块状裂缝": 1, "纵向裂缝": 2, "横向裂缝": 3, "沉陷": 4, "车辙": 5, "波浪拥包": 6, "坑槽": 7, "松散": 8, "泛油": 9, "修补": 10, }, "cream": { "破碎板": 0, "裂缝": 1, "板角断裂": 2, "错台": 3, "拱起": 4, "边角剥落": 5, "接缝料损坏": 6, "坑洞": 7, "唧泥": 8, "露骨": 9, "修补": 10, }, "gravel": {}, # 可扩展 } # ------------------ 工具函数 ------------------ def detect_road_type(text: str) -> str: """检测路面类型""" low = text.lower() if "沥青" in low: return "asphalt" if "水泥" in low: return "cream" if "石子" in low or "gravel" in low: return "gravel" return "asphalt" def detect_section_type(path: str) -> str: """判断属于5010还是5011""" name = os.path.basename(path) if "5011" in name: return "5011" if "5010" in name: return "5010" # 若未明确,默认5010 return "5010" def clean_name(name): return re.sub(r'[\s\r\n\t]+', '', name) # ------------------ YOLO-Seg 主函数 ------------------ def yoloseg_to_grid_cells_fixed_v6(image_path, label_path, output_dir, class_map, extra_info=None): img = cv2.imread(image_path) if img is None: return h, w = img.shape[:2] txt_name = os.path.splitext(os.path.basename(label_path))[0] output_txt = os.path.join(output_dir, f"{txt_name}.txt") with open(label_path, "r", encoding="utf-8") as f: lines = f.readlines() with open(output_txt, "w", encoding="utf-8") as out: for line in lines: parts = line.strip().split() if len(parts) < 2: continue cls_name = parts[-1] cls_id = class_map.get(cls_name, -1) if cls_id == -1: print(f"未识别类别: {cls_name}") continue coords = list(map(float, parts[:-1])) coords = [round(c, 4) for c in coords] out.write(f"{cls_id} {' '.join(map(str, coords))} {extra_info or ''}\n") # ------------------ 批处理函数 ------------------ def process_zip_yoloseg_with_draw(zip_path, output_root): temp_dir = "temp_extract" if os.path.exists(temp_dir): shutil.rmtree(temp_dir) os.makedirs(temp_dir, exist_ok=True) # 解压 with zipfile.ZipFile(zip_path, 'r') as zf: zf.extractall(temp_dir) image_files = [] for root, _, files in os.walk(temp_dir): for f in files: if f.lower().endswith((".jpg", ".png", ".jpeg")): image_files.append(os.path.join(root, f)) print(f"检测到 {len(image_files)} 张图片") def process_one(img_path): name = os.path.splitext(os.path.basename(img_path))[0] label_path = os.path.join(os.path.dirname(img_path), f"{name}.txt") if not os.path.exists(label_path): print(f"缺少标签: {img_path}") return # 判断5010或5011 section_type = detect_section_type(img_path) # 读取标签判断路面类型 with open(label_path, "r", encoding="utf-8") as f: txt = f.read() road_type = detect_road_type(txt) # 选择对应类别映射表 if section_type == "5011": class_map = CLASS_MAPS_5011.get(road_type, {}) else: class_map = CLASS_MAPS_5010.get(road_type, {}) if not class_map: print(f"无映射表: {road_type} ({section_type})") return out_dir = os.path.join(output_root, road_type, name) os.makedirs(out_dir, exist_ok=True) extra_info = f"{section_type}_{road_type}" yoloseg_to_grid_cells_fixed_v6(img_path, label_path, out_dir, class_map, extra_info=extra_info) with ThreadPoolExecutor(max_workers=6) as ex: list(ex.map(process_one, image_files)) shutil.rmtree(temp_dir) print("处理完成") # ------------------ 主入口 ------------------ if __name__ == "__main__": zip_path = r"D:\work\data\road_seg.zip" # 输入zip路径 output_root = r"D:\work\data\output" # 输出根目录 process_zip_yoloseg_with_draw(zip_path, output_root)