244 lines
11 KiB
Python
244 lines
11 KiB
Python
|
|
import os
|
||
|
|
import zipfile
|
||
|
|
import shutil
|
||
|
|
import cv2
|
||
|
|
import numpy as np
|
||
|
|
from collections import defaultdict
|
||
|
|
import smb
|
||
|
|
|
||
|
|
# ---------------- Constants ----------------
CELL_AREA = 0.0036 # area represented by one grid cell (square metres)
GRID_WIDTH = 108 # grid cell width in pixels
GRID_HEIGHT = 102 # grid cell height in pixels
COVER_RATIO = 0.01 # mask coverage threshold: a cell counts as covered when the covered pixel fraction exceeds this
|
||
|
|
|
||
|
|
# ---------------- Pavement class mappings ----------------
# Each map goes from distress class name to class id. The key order matters:
# it is used to turn a numeric label class id back into a name and to order
# the per-class columns of the summary file.

# Asphalt pavement: alligator cracking, block cracking, longitudinal crack,
# transverse crack, subsidence, rutting, corrugation/shoving, pothole,
# raveling, bleeding, patching.
CLASS_MAP_ASPHALT = {
    "龟裂":0,"块状裂缝":1,"纵向裂缝":2,"横向裂缝":3,"沉陷":4,"车辙":5,"波浪拥包":6,"坑槽":7,"松散":8,"泛油":9,"修补":10
}
# Cement pavement: shattered slab, crack, corner break, faulting, blow-up,
# edge/corner spalling, joint sealant damage, hole, pumping, exposed
# aggregate, patching.
CLASS_MAP_CEMENT = {
    "破碎板":0,"裂缝":1,"板角断裂":2,"错台":3,"拱起":4,"边角剥落":5,"接缝料损坏":6,"坑洞":7,"唧泥":8,"露骨":9,"修补":10
}
# Gravel pavement: pothole, subsidence, rutting, washboarding.
CLASS_MAP_GRAVEL = {
    "坑槽":0,"沉陷":1,"车辙":2,"波浪搓板":3
}
|
||
|
|
|
||
|
|
# ---------------- 工具函数 ----------------
|
||
|
|
def num_to_coord(num, cols, cell_w, cell_h):
    """Convert a 1-based, row-major cell number into its pixel rectangle.

    Args:
        num: Cell number, counted from 1 left-to-right, then top-to-bottom.
        cols: Number of cells per grid row.
        cell_w: Cell width in pixels.
        cell_h: Cell height in pixels.

    Returns:
        A tuple ``(x1, y1, x2, y2)`` with the top-left and bottom-right
        corners of the cell.
    """
    row_idx, col_idx = divmod(num - 1, cols)
    left = col_idx * cell_w
    top = row_idx * cell_h
    return left, top, left + cell_w, top + cell_h
|
||
|
|
|
||
|
|
def draw_grid_on_image(image_path, grid_cells, cell_size=(GRID_WIDTH, GRID_HEIGHT), save_path=None):
    """Render the covered grid cells and the grid lines on top of an image.

    Every class in ``grid_cells`` gets one random colour; its cells are filled
    on an overlay that is blended onto the image (40% overlay, 60% image), and
    grey grid lines are then drawn at every cell boundary.

    Args:
        image_path: Path of the image to load with ``cv2.imread``.
        grid_cells: Mapping of class name to an iterable of 1-based cell
            numbers (row-major), as produced by ``yoloseg_to_grid``.
        cell_size: ``(cell_width, cell_height)`` in pixels.
        save_path: When truthy, the rendered image is written to this path.

    Returns:
        The rendered image array, or ``None`` when the image cannot be read.
    """
    image = cv2.imread(image_path)
    if image is None:
        return None

    h, w = image.shape[:2]
    cell_w, cell_h = cell_size
    # Keep at least one column so that images narrower than a single cell do
    # not cause a division by zero in num_to_coord; this matches the cell
    # numbering used by yoloseg_to_grid.
    cols = max(1, w // cell_w)

    overlay = image.copy()
    for nums in grid_cells.values():
        # Plain Python ints: OpenCV expects numeric scalars for the colour.
        color = tuple(int(np.random.randint(64, 255)) for _ in range(3))
        for num in nums:
            x1, y1, x2, y2 = num_to_coord(num, cols, cell_w, cell_h)
            cv2.rectangle(overlay, (x1, y1), (x2, y2), color, -1)

    # Blend the filled overlay back into the image in place.
    cv2.addWeighted(overlay, 0.4, image, 0.6, 0, image)

    for x in range(0, w, cell_w):
        cv2.line(image, (x, 0), (x, h), (100, 100, 100), 1)
    for y in range(0, h, cell_h):
        cv2.line(image, (0, y), (w, y), (100, 100, 100), 1)

    if save_path:
        cv2.imwrite(save_path, image)
    return image
|
||
|
|
|
||
|
|
def detect_road_type_from_content(label_file):
    """Guess the pavement type from class names found in a label file.

    The file is read as UTF-8 text and searched for the class names of the
    asphalt map first, then the cement map. The first map with any name
    occurring in the text decides the result; everything else, including a
    file that cannot be read or decoded, falls back to ``"gravel"``.

    NOTE(review): several names are shared between maps (for example "坑槽"
    is in both the asphalt and the gravel map). Because asphalt is checked
    first, such content is reported as asphalt.

    Args:
        label_file: Path of the label text file.

    Returns:
        One of ``"asphalt"``, ``"cement"`` or ``"gravel"``.
    """
    try:
        with open(label_file, 'r', encoding='utf-8') as f:
            content = f.read()
    except (OSError, ValueError, TypeError):
        # Missing/unreadable file, undecodable bytes or an invalid path value:
        # keep the best-effort fallback, but let KeyboardInterrupt/SystemExit
        # propagate instead of swallowing them.
        return "gravel"

    candidates = (
        ("asphalt", CLASS_MAP_ASPHALT),
        ("cement", CLASS_MAP_CEMENT),
    )
    for road_type, class_map in candidates:
        if any(kw in content for kw in class_map):
            return road_type
    # Gravel names need no explicit check: gravel is also the default.
    return "gravel"
|
||
|
|
|
||
|
|
def yoloseg_to_grid(image_path, label_file, cover_ratio=COVER_RATIO):
    """Convert a YOLO-Seg label file into grid cell numbers per class.

    Each label line is ``class_id x1 y1 x2 y2 ...``; the coordinates are
    multiplied by the image width/height (i.e. treated as normalised), the
    polygon is rasterised, and every grid cell whose covered pixel fraction
    exceeds ``cover_ratio`` is recorded. Cells are numbered from 1 in
    row-major order using ``GRID_WIDTH`` x ``GRID_HEIGHT`` pixel cells.

    Args:
        image_path: Path of the image the labels belong to.
        label_file: Path of the YOLO-Seg label text file.
        cover_ratio: Minimum covered fraction (exclusive) for a cell to count.

    Returns:
        A tuple ``(text, class_cells, road_type)``:
        ``text`` has one line per polygon, ``"<class> <n1>-<n2>-...-"``
        (with a trailing dash); ``class_cells`` maps class name to the set of
        covered cell numbers; ``road_type`` is the detected pavement type.
        When the image cannot be read, ``text`` is empty and ``class_cells``
        is an empty dict.

    Raises:
        ValueError: If a label line contains a non-numeric class id or
            coordinate.
    """
    road_type = detect_road_type_from_content(label_file)
    if road_type == "asphalt":
        class_map = CLASS_MAP_ASPHALT
    elif road_type == "cement":
        class_map = CLASS_MAP_CEMENT
    else:
        class_map = CLASS_MAP_GRAVEL
    class_names = list(class_map)

    img = cv2.imread(image_path)
    if img is None:
        # Callers unpack three values, so the unreadable-image result must
        # have the same shape as the normal return.
        return "", {}, road_type

    h, w = img.shape[:2]
    cols = max(1, w // GRID_WIDTH)
    rows = max(1, h // GRID_HEIGHT)

    result_lines = []
    all_class_cells = {}
    # One mask buffer is reused for every polygon instead of reallocating it.
    mask = np.zeros((h, w), dtype=np.uint8)

    with open(label_file, 'r', encoding='utf-8') as f:
        for line in f:
            parts = line.split()
            if len(parts) < 5:
                continue
            cls_id = int(parts[0])
            coords = [float(x) for x in parts[1:]]
            if len(coords) % 2 != 0:
                # Drop a dangling coordinate so the values pair up as (x, y).
                coords = coords[:-1]
            if len(coords) < 6:
                # Fewer than three points cannot form a polygon.
                continue

            poly = np.array(coords, dtype=np.float32).reshape(-1, 2)
            poly[:, 0] *= w
            poly[:, 1] *= h
            mask.fill(0)
            cv2.fillPoly(mask, [poly.astype(np.int32)], 255)

            covered_cells = []
            for r in range(rows):
                y1 = r * GRID_HEIGHT
                y2 = min(h, y1 + GRID_HEIGHT)
                for c in range(cols):
                    x1 = c * GRID_WIDTH
                    x2 = min(w, x1 + GRID_WIDTH)
                    region = mask[y1:y2, x1:x2]
                    if np.count_nonzero(region) / region.size > cover_ratio:
                        covered_cells.append(r * cols + c + 1)
            if not covered_cells:
                continue

            # Unknown ids (including negative ones, which would otherwise
            # index from the end of the list) fall back to the raw id.
            if 0 <= cls_id < len(class_names):
                cname = class_names[cls_id]
            else:
                cname = str(cls_id)

            ids_str = '-'.join(map(str, sorted(covered_cells))) + '-'
            result_lines.append(f"{cname} {ids_str}")
            all_class_cells.setdefault(cname, set()).update(covered_cells)

    return '\n'.join(result_lines), all_class_cells, road_type
|
||
|
|
|
||
|
|
def generate_header(road_type):
    """Return the CSV header line of the summary file for a pavement type.

    Args:
        road_type: ``"asphalt"``, ``"cement"`` or ``"gravel"``.

    Returns:
        The comma-separated header string, or an empty string for any other
        value.
    """
    headers = (
        ("asphalt", "起点桩号(km),识别宽度(m),破损率DR(%),龟裂,块状裂缝,纵向裂缝,横向裂缝,沉陷,车辙,波浪拥包,坑槽,松散,泛油,修补"),
        ("cement", "起点桩号(km),识别宽度(m),破损率DR(%),破碎板,裂缝,板角断裂,错台,拱起,边角剥落,接缝料损坏,坑洞,唧泥,露骨,修补"),
        ("gravel", "起点桩号(km),识别宽度(m),破损率DR(%),坑槽,沉陷,车辙,波浪搓板"),
    )
    for name, header in headers:
        if road_type == name:
            return header
    return ""
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------- 主函数-共享目录 ----------------
|
||
|
|
def process_dir(pile_dict,dir="output",cell_area=CELL_AREA,grid_width=GRID_WIDTH,grid_height=GRID_HEIGHT):
    """Convert every labelled image under a directory and write a summary.

    For each image (``.jpg``/``.png``/``.jpeg``/``.bmp``) with a sibling
    ``.txt`` label file this writes ``<name>_grid.txt`` (cell numbers per
    polygon) and ``<name>_grid.jpg`` (grid overlay) next to the image, then
    writes ``桩号问题列表.txt`` in ``dir`` with one row per image: pile
    number, the fixed width ``3.6``, DR and the per-class areas.

    The header and column set of the summary come from the road type of the
    first processed image; classes not in that set are not written.

    Args:
        pile_dict: Mapping of image file name to pile number; missing names
            are reported as ``"未知"``.
        dir: Directory to walk recursively (created if missing).
        cell_area: Area represented by one grid cell.
        grid_width: Accepted for interface compatibility; not used here.
        grid_height: Accepted for interface compatibility; not used here.
    """
    os.makedirs(dir,exist_ok=True)
    image_exts = (".jpg",".png",".jpeg",".bmp")

    summary_data = []
    for root,_,files in os.walk(dir):
        for file in files:
            if not file.lower().endswith(image_exts):
                continue
            # Overlays written by a previous run sit next to a "_grid.txt"
            # that would be mistaken for their label file; never treat them
            # as inputs, otherwise every re-run adds bogus rows and files.
            if file.endswith("_grid.jpg"):
                continue

            image_path = os.path.join(root,file)
            stem = os.path.splitext(image_path)[0]
            label_file = stem+".txt"
            if not os.path.exists(label_file):
                print(f"⚠️ 找不到标签: {label_file}")
                continue

            out_txt, class_cells, road_type = yoloseg_to_grid(image_path,label_file)

            # Per-image cell listing.
            with open(stem+"_grid.txt",'w',encoding='utf-8') as f:
                f.write(out_txt)

            # Grid overlay for visual inspection.
            draw_grid_on_image(image_path,class_cells,save_path=stem+"_grid.jpg")

            # Area per class = number of covered cells * area of one cell.
            counts = {k:len(v)*cell_area for k,v in class_cells.items()}
            total_area = sum(counts.values())

            pile_no = pile_dict.get(file,"未知")

            # NOTE(review): placeholder kept from the original code: DR is
            # 100 when any damage was found and 0 otherwise. A real damage
            # rate needs the total surveyed area as the denominator.
            DR = 100.0 if total_area>0 else 0.0
            summary_data.append((pile_no, DR, counts, road_type))

    if not summary_data:
        return

    # The first image decides the header and the column set for all rows.
    road_type = summary_data[0][3]
    if road_type=="asphalt":
        keys = list(CLASS_MAP_ASPHALT)
    elif road_type=="cement":
        keys = list(CLASS_MAP_CEMENT)
    else:
        keys = list(CLASS_MAP_GRAVEL)

    out_file = os.path.join(dir,"桩号问题列表.txt")
    with open(out_file,'w',encoding='utf-8') as f:
        f.write(generate_header(road_type)+'\n')
        for pile_no,DR,counts,_rt in summary_data:
            # str() so non-string pile numbers do not break the join.
            row = [str(pile_no),"3.6",f"{DR:.2f}"]
            row.extend(f"{counts.get(k,0):.2f}" for k in keys)
            f.write(','.join(row)+'\n')
    print(f"✅ 输出完成: {out_file}")
|
||
|
|
|
||
|
|
# ---------------- 主函数 ----------------
|
||
|
|
def process_zip(zip_path,pile_map_file,output_dir="output",cell_area=CELL_AREA,grid_width=GRID_WIDTH,grid_height=GRID_HEIGHT):
    """Extract a ZIP of images and labels, then convert it like process_dir.

    The archive is extracted into ``output_dir``, the pile map file is parsed
    into a file-name-to-pile-number mapping, and the extracted tree is handed
    to ``process_dir``, which writes the per-image grid files and the summary
    file.

    Each pile map line is split on ``"->"``; lines with at least four fields
    contribute ``field[3] -> field[1]`` (file name -> pile number). Fields
    are stripped of surrounding whitespace.

    Args:
        zip_path: Path of the input ZIP archive.
        pile_map_file: Path of the UTF-8 pile map text file.
        output_dir: Extraction and output directory (created if missing).
        cell_area: Area represented by one grid cell.
        grid_width: Forwarded to ``process_dir``.
        grid_height: Forwarded to ``process_dir``.

    Raises:
        FileNotFoundError: If ``zip_path`` does not exist.
    """
    if not os.path.exists(zip_path):
        raise FileNotFoundError(f"{zip_path} 不存在")
    os.makedirs(output_dir,exist_ok=True)

    # Extract the archive.
    with zipfile.ZipFile(zip_path,'r') as zip_ref:
        zip_ref.extractall(output_dir)

    # Read the pile number mapping.
    pile_dict = {}
    with open(pile_map_file,'r',encoding='utf-8') as f:
        for line in f:
            parts = [p.strip() for p in line.split("->")]
            if len(parts)>=4:
                pile_dict[parts[3]]=parts[1] # file name -> pile number

    # The image walk and summary writing are the same as for a plain
    # directory, so reuse that code path instead of duplicating it.
    process_dir(pile_dict,output_dir,cell_area,grid_width,grid_height)
|
||
|
|
|
||
|
|
# ---------------- 示例调用 ----------------
|
||
|
|
if __name__=="__main__":
    # Example: process a ZIP of inference results with a pile map file.
    # zip_path = "D:/devForBdzlWork/ai-train_platform/predict/inferenceResult.zip" # input ZIP file
    # pile_map_file = "D:/devForBdzlWork/ai-train_platform/predict/pile_map.txt" # image name -> pile number
    # process_zip(zip_path=zip_path,pile_map_file=pile_map_file,output_dir="output")

    output_dir = "D:/devForBdzlWork/ai-train_platform/predictions/1"

    # SECURITY NOTE(review): the share address and credentials were
    # hard-coded here. They can now be overridden through environment
    # variables; the literal defaults are kept only so existing runs keep
    # working and should be removed (and the password rotated).
    smb_share = os.environ.get("SMB_SHARE", "192.168.110.114/share_File/西南计算机")
    smb_user = os.environ.get("SMB_USER", "administrator")
    smb_password = os.environ.get("SMB_PASSWORD", "abc@1234")

    pile_dict = smb.get_pile_dict(smb_share, smb_user, smb_password)
    process_dir(pile_dict, output_dir)
|