"""Convert per-image annotation TXT files into YOLO-style polygon labels.

Two TXT flavours are handled:

* "pixel" files: each line is ``x y w h label`` (integer pixel box + label);
* "grid" files (``*_partclass.txt``): each line starts with a class name and
  ends with dash-separated grid-cell numbers.

For every TXT file with a matching image, the script writes a copy of the
image, a YOLO label file and preview images under ``output_root/<road_type>``.
"""
import os
import re
import shutil
from concurrent.futures import ThreadPoolExecutor

import cv2
import numpy as np

# File extensions (lower case) that are treated as images.
IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png")


# ------------------ Utility functions ------------------
def clean_filename(name):
    """Return *name* lower-cased with all whitespace removed."""
    name = name.strip()
    name = re.sub(r'[\s\r\n\t]+', '', name)
    return name.lower()


def num_to_coord(num, cols, cell_width, cell_height, offset=1):
    """Map a grid-cell number to its pixel rectangle ``(x1, y1, x2, y2)``.

    The zero-based cell index is ``num - 1 + offset``; cells are laid out
    row-major with ``cols`` cells per row.
    """
    n = num - 1 + offset
    row, col = divmod(n, cols)
    x1 = col * cell_width
    y1 = row * cell_height
    return x1, y1, x1 + cell_width, y1 + cell_height


def polygon_to_yolo(poly, img_width, img_height):
    """Flatten ``[(x, y), ...]`` into ``[x/img_width, y/img_height, ...]``."""
    return [
        value
        for x, y in poly
        for value in (x / img_width, y / img_height)
    ]


def convex_hull_poly(points):
    """Return the convex hull of *points* as a list of ``[x, y]`` pairs."""
    if not points:
        return []
    pts = np.array(points, dtype=np.int32)
    hull = cv2.convexHull(pts)
    return hull.reshape(-1, 2).tolist()


def expand_polygon(poly, expand_px=3):
    """Move every vertex ``expand_px`` pixels away from the vertex centroid.

    The per-vertex offset is truncated to whole pixels. An empty polygon is
    returned as an empty list.
    """
    if not poly:
        return []
    pts = np.array(poly, dtype=np.int32)
    cx = np.mean(pts[:, 0])
    cy = np.mean(pts[:, 1])
    vec = pts - np.array([[cx, cy]])
    norm = np.linalg.norm(vec, axis=1, keepdims=True)
    norm[norm == 0] = 1  # a vertex sitting on the centroid stays where it is
    vec_unit = vec / norm
    pts_expanded = pts + (vec_unit * expand_px).astype(int)
    return pts_expanded.tolist()


def _clip_polygon(poly, img_width, img_height):
    """Clamp every vertex of *poly* into ``[0, img_width] x [0, img_height]``.

    Expansion can push vertices past the image border, which would produce
    normalized coordinates outside ``[0, 1]`` in the label file.
    """
    return [
        [min(max(x, 0), img_width), min(max(y, 0), img_height)]
        for x, y in poly
    ]


# BGR drawing colour per class id.
color_map = {
    0: (0, 255, 255),
    1: (255, 0, 255),
    2: (0, 255, 0),
    3: (255, 0, 0),
    4: (0, 0, 255),
    5: (255, 255, 0),
    6: (128, 128, 0),
    7: (128, 0, 128),
    8: (0, 128, 128),
    9: (128, 128, 128),
    10: (0, 0, 128),
    11: (0, 128, 0),
}


# ------------------ Image matching ------------------
def _txt_base_name(txt_path):
    """Return the lower-cased image base name a TXT file refers to.

    Strips a trailing ``.txt`` (optionally preceded by ``_partclass``) and
    then a trailing ``.jpg``, so ``A.jpg_partclass.txt`` maps to ``a``.
    """
    txt_name = os.path.basename(txt_path).lower()
    base_name = re.sub(r'(_partclass)?\.txt$', '', txt_name)
    return re.sub(r'\.jpg$', '', base_name)


def find_matching_image(txt_path, input_root):
    """Return the first image under *input_root* matching *txt_path*.

    Matching is case-insensitive on the file name without extension.
    Returns ``None`` when no image matches.
    """
    base_name = _txt_base_name(txt_path)
    for root, _, files in os.walk(input_root):
        for f in files:
            if f.lower().endswith(IMAGE_EXTENSIONS):
                img_base = os.path.splitext(f)[0].lower()
                if base_name == img_base:
                    return os.path.join(root, f)
    return None


# ------------------ Per-file processing (with polygon expansion) ------------------
def _export_sample(img_path, output_root, yolo_labels, previews):
    """Write the image copy, the label file and the preview images.

    *previews* maps an output sub-folder name to the image saved there as
    ``<base>-<folder>.jpg``. Returns the image base name.
    """
    base = os.path.splitext(os.path.basename(img_path))[0]
    for folder_name in ("images", "labels", *previews):
        os.makedirs(os.path.join(output_root, folder_name), exist_ok=True)

    shutil.copy2(
        img_path,
        os.path.join(output_root, "images", os.path.basename(img_path)),
    )
    label_path = os.path.join(output_root, "labels", base + ".txt")
    with open(label_path, "w", encoding="utf-8") as f:
        f.write("\n".join(yolo_labels))
    for folder_name, preview in previews.items():
        cv2.imwrite(
            os.path.join(output_root, folder_name, f"{base}-{folder_name}.jpg"),
            preview,
        )
    return base


def process_pixel_txt(img_path, txt_path, class_map, output_root):
    """Convert a pixel-box TXT file into YOLO polygon labels.

    Each usable line is ``x y w h label``. Lines with fewer than five
    fields, non-integer box values or a label missing from *class_map* are
    skipped (unknown labels are reported once per file).

    Returns ``True`` when at least one label was written, else ``False``.
    """
    image = cv2.imread(img_path)
    if image is None:
        print(f"⚠️ 无法读取图片: {img_path}")
        return False

    h, w = image.shape[:2]
    vis_img = image.copy()
    yolo_labels = []
    unknown_labels = set()

    with open(txt_path, "r", encoding="utf-8") as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) < 5:
                continue
            try:
                x, y, w_box, h_box = map(int, parts[:4])
            except ValueError:
                # Malformed numeric fields: skip this line only.
                continue

            label = parts[4]
            cls_id = class_map.get(label, -1)
            if cls_id == -1:
                unknown_labels.add(label)
                continue

            poly = [(x, y), (x + w_box, y), (x + w_box, y + h_box), (x, y + h_box)]
            hull = expand_polygon(convex_hull_poly(poly), expand_px=3)
            # Keep the expanded polygon inside the image so that the
            # normalized coordinates stay within [0, 1].
            hull = _clip_polygon(hull, w, h)

            coords = " ".join(map(str, polygon_to_yolo(hull, w, h)))
            yolo_labels.append(f"{cls_id} " + coords)
            cv2.polylines(
                vis_img,
                [np.array(hull, np.int32)],
                True,
                color=color_map.get(cls_id, (255, 255, 255)),
                thickness=2,
            )

    if unknown_labels:
        print(f"⚠️ 未知类别 {unknown_labels} 在文件: {txt_path}")
    if not yolo_labels:
        return False

    base = _export_sample(img_path, output_root, yolo_labels, {"visual": vis_img})
    print(f"✅ 已处理 Pixel TXT: {base}")
    return True


def process_grid_txt(img_path, txt_path, class_map, output_root,
                     cell_width=108, cell_height=102):
    """Convert a grid-cell TXT file into YOLO polygon labels.

    Each usable line starts with a key of *class_map* and its last
    whitespace-separated field holds dash-separated cell numbers. The cells
    of one line are merged into a single convex polygon.

    Args:
        img_path: Image the annotations belong to.
        txt_path: Grid annotation file.
        class_map: Mapping from class name to class id.
        output_root: Folder receiving ``images``, ``labels``, ``visual`` and
            ``highlighted`` sub-folders.
        cell_width: Grid cell width in pixels.
        cell_height: Grid cell height in pixels.

    Returns ``True`` when at least one label was written, else ``False``.
    """
    image = cv2.imread(img_path)
    if image is None:
        print(f"⚠️ 无法读取图片: {img_path}")
        return False

    h, w = image.shape[:2]
    cols = max(1, w // cell_width)
    vis_img = image.copy()
    overlay = image.copy()
    alpha = 0.5
    yolo_labels = []

    with open(txt_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue

            numbers = [
                int(n) for n in re.findall(r"(\d+)(?=-|$)", line.split()[-1])
            ]
            cname = next((key for key in class_map if line.startswith(key)), None)
            if cname is None or not numbers:
                continue

            cls_id = class_map[cname]
            color = color_map.get(cls_id, (128, 128, 128))

            points = []
            for num in numbers:
                x1, y1, x2, y2 = num_to_coord(num, cols, cell_width, cell_height)
                cv2.rectangle(overlay, (x1, y1), (x2, y2), color, -1)
                points.extend([(x1, y1), (x2, y1), (x2, y2), (x1, y2)])

            hull = expand_polygon(convex_hull_poly(points), expand_px=3)
            # Keep the expanded polygon inside the image so that the
            # normalized coordinates stay within [0, 1].
            hull = _clip_polygon(hull, w, h)

            pts = np.array(hull, np.int32).reshape((-1, 1, 2))
            cv2.polylines(vis_img, [pts], True, color, 2)
            coords = " ".join(map(str, polygon_to_yolo(hull, w, h)))
            yolo_labels.append(f"{cls_id} " + coords)

    if not yolo_labels:
        return False

    # Blend once, after all cells are painted. Blending after every line
    # would re-apply earlier rectangles and make them progressively more
    # opaque than later ones.
    highlighted = cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0)

    base = _export_sample(
        img_path,
        output_root,
        yolo_labels,
        {"visual": vis_img, "highlighted": highlighted},
    )
    print(f"✅ 已处理 Grid TXT: {base}")
    return True


# ------------------ Batch processing ------------------
def batch_process_txt_first(input_root, output_root, mode_type="5211", max_workers=4):
    """Process every TXT file under *input_root* on a thread pool.

    The road type (``cream`` / ``asphalt`` / ``gravel``) is chosen from
    keywords found in the TXT content and selects both the class map and the
    output sub-folder. Files whose path contains ``_partclass`` are treated
    as grid files, all others as pixel-box files.

    Args:
        input_root: Folder searched recursively for TXT files and images.
        output_root: Folder receiving one sub-folder per road type.
        mode_type: ``"5211"`` or ``"5210"``; selects the class-map set.
        max_workers: Thread-pool size.

    Raises:
        ValueError: If *mode_type* is neither ``"5210"`` nor ``"5211"``.
    """
    if mode_type == "5211":
        class_maps = {
            "asphalt": class_map_asphalt,
            "cream": class_map_cream,
            "gravel": class_map_gravel,
        }
    elif mode_type == "5210":
        class_maps = {
            "asphalt": class_map_asphalt_road,
            "cream": class_map_cream_road,
            "gravel": class_map_gravel,
        }
    else:
        raise ValueError("mode_type 必须是 '5210' 或 '5211'")

    # One walk collects the TXT files and indexes the images by base name,
    # instead of re-walking the whole tree for every TXT file.
    # ``setdefault`` keeps the first image seen for a base name, which is
    # the one find_matching_image() would return.
    txt_files = []
    image_index = {}
    for root, _, files in os.walk(input_root):
        for file in files:
            lowered = file.lower()
            if lowered.endswith(".txt"):
                txt_files.append(os.path.join(root, file))
            elif lowered.endswith(IMAGE_EXTENSIONS):
                image_index.setdefault(
                    os.path.splitext(lowered)[0], os.path.join(root, file)
                )

    def process_single(txt_path):
        img_path = image_index.get(_txt_base_name(txt_path))
        if not img_path:
            print(f"⚠️ 未找到匹配图片: {txt_path}")
            return

        with open(txt_path, "r", encoding="utf-8") as f:
            content = f.read().lower()

        if any(k in content for k in ["水泥", "cement", "cream"]):
            road_type = "cream"
        elif any(k in content for k in ["沥青", "asphalt"]):
            road_type = "asphalt"
        else:
            road_type = "gravel"

        selected_map = class_maps[road_type]
        output_folder = os.path.join(output_root, road_type)
        os.makedirs(output_folder, exist_ok=True)

        if "_partclass" in txt_path.lower():
            process_grid_txt(img_path, txt_path, selected_map, output_folder)
        else:
            process_pixel_txt(img_path, txt_path, selected_map, output_folder)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(process_single, txt_path): txt_path
            for txt_path in txt_files
        }
        for future, txt_path in futures.items():
            try:
                future.result()
            except Exception as exc:
                # Per-file boundary: report the failure and keep going so
                # that one bad file does not hide behind a silent pool.
                print(f"❌ 处理失败: {txt_path} ({exc})")


# ------------------ Class maps ------------------
# A previous 5-class asphalt mapping (longitudinal crack, transverse crack,
# net crack, pothole, ravelling -> 0..4) is no longer used.
class_map_asphalt = {
    "龟裂": 0,
    "块状裂缝": 1,
    "纵向裂缝": 2,
    "横向裂缝": 3,
    "沉陷": 4,
    "车辙": 5,
    "波浪拥包": 6,
    "坑槽": 7,
    "松散": 8,
    "泛油": 9,
    "修补": 10,
}

class_map_cream = {
    "破碎板": 0,
    "裂缝": 1,
    "坑洞": 2,
    "露骨": 3,
    "错台": 4,
    "拱起": 5,
}

class_map_gravel = {
    "坑槽": 0,
    "沉陷": 1,
    "车辙": 2,
    "波浪搓板": 3,
}

class_map_asphalt_road = {
    "龟裂": 0,
    "块状裂缝": 1,
    "纵向裂缝": 2,
    "横向裂缝": 3,
    "沉陷": 4,
    "车辙": 5,
    "波浪拥包": 6,
    "坑槽": 7,
    "松散": 8,
    "泛油": 9,
    "修补": 10,
}

class_map_cream_road = {
    "破碎板": 0,
    "裂缝": 1,
    "板角断裂": 2,
    "错台": 3,
    "拱起": 4,
    "边角剥落": 5,
    "接缝料损坏": 6,
    "坑洞": 7,
    "唧泥": 8,
    "露骨": 9,
    "修补": 10,
}


# ------------------ Entry point ------------------
if __name__ == "__main__":
    input_root = r"D:\work\develop\LF-where\01"
    output_root = r"D:\work\develop\LF-where\out"
    mode_type = "5211"  # "5211" or "5210"
    batch_process_txt_first(input_root, output_root, mode_type, max_workers=8)