147 lines
4.8 KiB
Python
147 lines
4.8 KiB
Python
import os
|
||
import re
|
||
import cv2
|
||
import zipfile
|
||
import shutil
|
||
import numpy as np
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
|
||
# ------------------ 路面类别映射集 ------------------
|
||
# Class-name -> class-id tables used for inputs whose section type is "5010".
# The outer key is the road type; each inner table maps the distress name
# found in a label file to the integer id written to the converted label.
CLASS_MAPS_5010 = {
    # Asphalt pavement
    "asphalt": {
        "龟裂": 0,  # alligator cracking
        "块状裂缝": 1,  # block cracking
        "纵向裂缝": 2,  # longitudinal crack
        "横向裂缝": 3,  # transverse crack
        "沉陷": 4,  # depression
        "车辙": 5,  # rutting
        "波浪拥包": 6,  # corrugation / shoving
        "坑槽": 7,  # pothole
        "松散": 8,  # raveling
        "泛油": 9,  # bleeding
        "修补": 10,  # patching
    },
    # Cement pavement (the key is spelled "cream" throughout this file)
    "cream": {
        "破碎板": 0,  # shattered slab
        "裂缝": 1,  # crack
        "坑洞": 2,  # hole
        "露骨": 3,  # exposed aggregate
        "错台": 4,  # faulting
        "拱起": 5,  # blow-up
    },
    # Gravel road
    "gravel": {
        "坑槽": 0,  # pothole
        "沉陷": 1,  # depression
        "车辙": 2,  # rutting
        "波浪搓板": 3,  # washboarding
    },
}
|
||
|
||
# Class-name -> class-id tables used for inputs whose section type is "5011".
# The outer key is the road type; each inner table maps the distress name
# found in a label file to the integer id written to the converted label.
CLASS_MAPS_5011 = {
    # Asphalt pavement
    "asphalt": {
        "龟裂": 0,  # alligator cracking
        "块状裂缝": 1,  # block cracking
        "纵向裂缝": 2,  # longitudinal crack
        "横向裂缝": 3,  # transverse crack
        "沉陷": 4,  # depression
        "车辙": 5,  # rutting
        "波浪拥包": 6,  # corrugation / shoving
        "坑槽": 7,  # pothole
        "松散": 8,  # raveling
        "泛油": 9,  # bleeding
        "修补": 10,  # patching
    },
    # Cement pavement (the key is spelled "cream" throughout this file)
    "cream": {
        "破碎板": 0,  # shattered slab
        "裂缝": 1,  # crack
        "板角断裂": 2,  # corner break
        "错台": 3,  # faulting
        "拱起": 4,  # blow-up
        "边角剥落": 5,  # edge / corner spalling
        "接缝料损坏": 6,  # joint sealant damage
        "坑洞": 7,  # hole
        "唧泥": 8,  # pumping
        "露骨": 9,  # exposed aggregate
        "修补": 10,  # patching
    },
    # Gravel road: intentionally empty for now, can be extended
    "gravel": {},
}
|
||
|
||
# ------------------ 工具函数 ------------------
|
||
def detect_road_type(text: str) -> str:
    """Return the road type named in *text*.

    The text is lower-cased and scanned for keywords in a fixed priority
    order: "沥青" -> "asphalt", "水泥" -> "cream", then "石子" or "gravel"
    -> "gravel".  When no keyword is present the result is "asphalt".
    """
    lowered = text.lower()

    # Ordered (result, keywords) pairs; the first pair with a hit wins.
    keyword_table = (
        ("asphalt", ("沥青",)),
        ("cream", ("水泥",)),
        ("gravel", ("石子", "gravel")),
    )
    for road_type, keywords in keyword_table:
        if any(keyword in lowered for keyword in keywords):
            return road_type

    # Fallback when the text names no known road type.
    return "asphalt"
|
||
|
||
def detect_section_type(path: str) -> str:
    """Return "5011" or "5010" for the file that *path* points to.

    Only the final path component is inspected.  The result is "5011" when
    that component contains "5011"; in every other case, including names
    that mention neither number, it is "5010".
    """
    file_name = os.path.basename(path)
    return "5011" if "5011" in file_name else "5010"
|
||
|
||
def clean_name(name):
    """Return *name* with every run of whitespace characters removed."""
    whitespace_run = re.compile(r'[\s\r\n\t]+')
    return whitespace_run.sub('', name)
|
||
|
||
# ------------------ YOLO-Seg 主函数 ------------------
|
||
def yoloseg_to_grid_cells_fixed_v6(image_path, label_path, output_dir, class_map, extra_info=None):
    """Rewrite one label file, replacing class names with numeric class ids.

    Every input line is split on whitespace; the last token is taken as the
    class name and all preceding tokens as numeric values.  Each converted
    line is written to ``<output_dir>/<label file stem>.txt`` as::

        <class_id> <values rounded to 4 decimals> [<extra_info>]

    Args:
        image_path: Image belonging to the label file.  It is only used as a
            readability check: nothing is written when OpenCV cannot load it.
        label_path: UTF-8 text file holding the source annotations.
        output_dir: Directory that receives the converted file; it must
            already exist.
        class_map: Mapping from class name to integer class id.
        extra_info: Optional tag appended as the last field of every line.
            When empty or None the field is omitted entirely.

    Lines with fewer than two tokens are ignored.  Lines whose class name is
    not in ``class_map`` or whose values are not numeric are reported on
    stdout and skipped.
    """
    # The image content is not needed; an unreadable image just means the
    # pair is skipped without producing an output file.
    if cv2.imread(image_path) is None:
        return

    stem = os.path.splitext(os.path.basename(label_path))[0]
    output_txt = os.path.join(output_dir, f"{stem}.txt")

    # Read the whole input before opening the output so the conversion still
    # works if both paths happen to refer to the same file.
    with open(label_path, "r", encoding="utf-8") as src:
        lines = src.readlines()

    with open(output_txt, "w", encoding="utf-8") as out:
        for line in lines:
            parts = line.strip().split()
            if len(parts) < 2:
                continue

            cls_name = parts[-1]
            cls_id = class_map.get(cls_name, -1)
            if cls_id == -1:
                print(f"未识别类别: {cls_name}")
                continue

            try:
                coords = [round(float(token), 4) for token in parts[:-1]]
            except ValueError:
                # One malformed line must not abort the whole file; treat it
                # like an unknown class: report and move on.
                print(f"坐标解析失败: {line.strip()}")
                continue

            fields = [str(cls_id), *map(str, coords)]
            if extra_info:
                fields.append(str(extra_info))
            # Joining the fields avoids a trailing space when no tag is given.
            out.write(" ".join(fields) + "\n")
|
||
|
||
# ------------------ 批处理函数 ------------------
|
||
def process_zip_yoloseg_with_draw(zip_path, output_root):
    """Convert every image/label pair found inside a zip archive.

    The archive is extracted into a private scratch directory that is always
    removed afterwards, even when extraction or a conversion fails.  Every
    ``.jpg`` / ``.jpeg`` / ``.png`` file is paired with the ``.txt`` file of
    the same stem in the same folder and converted with
    ``yoloseg_to_grid_cells_fixed_v6``.

    Args:
        zip_path: Path of the archive to process.
        output_root: Root of the output tree.  Results are written to
            ``<output_root>/<road_type>/<image stem>/``.

    Images without a label file, and road/section combinations without a
    class map, are reported on stdout and skipped.  Any exception raised
    while converting an image propagates to the caller.
    """
    # Function-scope import: tempfile is not among the file's top-level imports.
    import tempfile

    def process_one(img_path):
        """Convert the label file that accompanies a single image."""
        name = os.path.splitext(os.path.basename(img_path))[0]
        label_path = os.path.join(os.path.dirname(img_path), f"{name}.txt")
        if not os.path.exists(label_path):
            print(f"缺少标签: {img_path}")
            return

        # Section type comes from the image file name ...
        section_type = detect_section_type(img_path)

        # ... while the road type is detected from the label file's text.
        with open(label_path, "r", encoding="utf-8") as f:
            road_type = detect_road_type(f.read())

        # Pick the class table that matches the section type.
        maps = CLASS_MAPS_5011 if section_type == "5011" else CLASS_MAPS_5010
        class_map = maps.get(road_type, {})
        if not class_map:
            print(f"无映射表: {road_type} ({section_type})")
            return

        out_dir = os.path.join(output_root, road_type, name)
        os.makedirs(out_dir, exist_ok=True)

        yoloseg_to_grid_cells_fixed_v6(
            img_path,
            label_path,
            out_dir,
            class_map,
            extra_info=f"{section_type}_{road_type}",
        )

    # A uniquely named scratch directory never clobbers an existing
    # "temp_extract" folder and cannot collide with a concurrent run.  It is
    # kept in the current working directory so scratch space is consumed on
    # the same volume as before.
    with tempfile.TemporaryDirectory(prefix="temp_extract_", dir=".") as temp_dir:
        # Extract the archive.
        with zipfile.ZipFile(zip_path, "r") as zf:
            zf.extractall(temp_dir)

        image_files = []
        for root, _, files in os.walk(temp_dir):
            for file_name in files:
                if file_name.lower().endswith((".jpg", ".png", ".jpeg")):
                    image_files.append(os.path.join(root, file_name))

        print(f"检测到 {len(image_files)} 张图片")

        with ThreadPoolExecutor(max_workers=6) as ex:
            # Consuming the iterator waits for all tasks and re-raises the
            # first exception raised by a worker.
            list(ex.map(process_one, image_files))

    print("处理完成")
|
||
|
||
# ------------------ 主入口 ------------------
|
||
if __name__ == "__main__":
    # Script entry point: convert the archive at the default local paths.
    process_zip_yoloseg_with_draw(
        zip_path=r"D:\work\data\road_seg.zip",  # input zip archive
        output_root=r"D:\work\data\output",  # output root directory
    )
|