"""Convert YOLO-Seg distress labels into grid-cell statistics per road image.

For every image with a sibling ``.txt`` label file the polygons are rasterised,
mapped onto a fixed pixel grid, and summarised into per-image and per-10 m
text reports plus an Excel distress list.

NOTE(review): the stored copy of this file had its newlines collapsed and lost
part of its text (the tail of ``yoloseg_to_grid_share_dir``, most of
``yoloseg_to_grid``, all of ``generate_header`` and the head of
``get_min_max_pile``).  Those pieces were reconstructed from their call sites
and are individually marked below - confirm them against a pristine copy.
"""
import glob
import math
import os
import shutil
import zipfile
from collections import defaultdict
from datetime import datetime

import cv2
import numpy as np
import pandas as pd

import smb

# ---------------- Constants ----------------
CELL_AREA = 0.0036   # area of one grid cell (square metres)
GRID_WIDTH = 108     # grid cell width in pixels
GRID_HEIGHT = 102    # grid cell height in pixels
COVER_RATIO = 0.01   # a cell counts as damaged when the mask covers more than this fraction

# ---------------- Distress class maps per pavement type ----------------
CLASS_MAP_ASPHALT = {
    "龟裂": 0, "块状裂缝": 1, "纵向裂缝": 2, "横向裂缝": 3, "沉陷": 4, "车辙": 5,
    "波浪拥包": 6, "坑槽": 7, "松散": 8, "泛油": 9, "修补": 10
}
CLASS_MAP_CEMENT = {
    "破碎板": 0, "裂缝": 1, "板角断裂": 2, "错台": 3, "拱起": 4, "边角剥落": 5,
    "接缝料损坏": 6, "坑洞": 7, "唧泥": 8, "露骨": 9, "修补": 10
}
CLASS_MAP_GRAVEL = {
    "坑槽": 0, "沉陷": 1, "车辙": 2, "波浪搓板": 3
}
ROAD_TYPE_EN_TO_CN = {
    "asphalt": "沥青",
    "cement": "水泥",
    "gravel": "砾石"
}

# Pavement-type cell values accepted from the Excel sheet.  The column header
# spells the third option "砂石" while the rest of this file uses "砾石", so
# both are mapped to gravel.
_ROAD_TYPE_CN_TO_EN = {
    "沥青": "asphalt",
    "水泥": "cement",
    "砾石": "gravel",
    "砂石": "gravel",
}

_IMAGE_EXTENSIONS = (".jpg", ".png", ".jpeg", ".bmp")


# ---------------- Helpers ----------------
def _class_map_for(road_type):
    """Return the class map for *road_type*; anything unknown is treated as gravel."""
    if road_type == "asphalt":
        return CLASS_MAP_ASPHALT
    if road_type == "cement":
        return CLASS_MAP_CEMENT
    return CLASS_MAP_GRAVEL


def _road_record_data(road_info):
    """Return the ``'data'`` mapping of one ``road_dict`` entry, or ``{}``.

    ``get_road_dict`` in this file stores a *list* of ``{'index', 'data'}``
    records per key, whereas the lookups were written for a single such dict.
    Both shapes are accepted; for a list the first record is used.
    NOTE(review): confirm that the first row is the intended one when a key
    has several rows.
    """
    if not road_info:
        return {}
    if isinstance(road_info, dict):
        return road_info.get('data') or {}
    return road_info[0].get('data') or {}


def _read_pile_index(path):
    """Parse a ``a->b->c->filename`` index file into ``{filename: parts}``."""
    pile_dict = {}
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            parts = line.strip().split("->")
            if len(parts) >= 4:
                pile_dict[parts[3]] = parts
    return pile_dict


def _parse_polygon(line, w, h):
    """Parse one YOLO-Seg label line.

    Returns ``(class_id, polygon)`` with the polygon as an int32 ``(N, 2)``
    array in pixel coordinates, or ``None`` when the line has fewer than five
    tokens or fewer than three complete points.
    """
    parts = line.strip().split()
    if len(parts) < 5:
        return None
    cls_id = int(parts[0])
    coords = [float(x) for x in parts[1:]]
    if len(coords) % 2 != 0:
        coords = coords[:-1]  # drop a dangling coordinate
    if len(coords) < 6:
        return None
    poly = np.array(coords, dtype=np.float32).reshape(-1, 2)
    poly[:, 0] *= w  # label coordinates are scaled by the image size
    poly[:, 1] *= h
    return cls_id, poly.astype(np.int32)


def _covered_grid_cells(mask, rows, cols, cover_ratio):
    """Return ``[(row, col), ...]`` of grid cells whose masked fraction exceeds *cover_ratio*."""
    h, w = mask.shape[:2]
    hits = []
    for r in range(rows):
        for c in range(cols):
            x1, y1 = c * GRID_WIDTH, r * GRID_HEIGHT
            x2, y2 = min(w, x1 + GRID_WIDTH), min(h, y1 + GRID_HEIGHT)
            region = mask[y1:y2, x1:x2]
            if np.count_nonzero(region) / region.size > cover_ratio:
                hits.append((r, c))
    return hits


def num_to_coord(num, cols, cell_w, cell_h):
    """Convert a 1-based, row-major cell number into its pixel box ``(x1, y1, x2, y2)``."""
    n = num - 1
    r, c = divmod(n, cols)
    x1, y1 = c * cell_w, r * cell_h
    x2, y2 = x1 + cell_w, y1 + cell_h
    return x1, y1, x2, y2


def draw_grid_on_image(image_path, grid_cells, cell_size=(GRID_WIDTH, GRID_HEIGHT), save_path=None):
    """Overlay the damaged cells and the grid lines on an image.

    Args:
        image_path: image to read with ``cv2.imread``.
        grid_cells: mapping ``class name -> iterable of 1-based cell numbers``.
        cell_size: ``(cell_width, cell_height)`` in pixels.
        save_path: when given, the annotated image is written there.

    Returns:
        The annotated image array, or ``None`` when the image cannot be read.
    """
    image = cv2.imread(image_path)
    if image is None:
        return None
    h, w = image.shape[:2]
    cell_w, cell_h = cell_size
    # At least one column, so num_to_coord never divides by zero on images
    # narrower than one cell (matches the grid used by the yoloseg_* functions).
    cols = max(1, w // cell_w)
    overlay = image.copy()
    for cname, nums in grid_cells.items():
        # One random colour per class.
        color = tuple(int(np.random.randint(64, 255)) for _ in range(3))
        for num in nums:
            x1, y1, x2, y2 = num_to_coord(num, cols, cell_w, cell_h)
            cv2.rectangle(overlay, (x1, y1), (x2, y2), color, -1)
    # Blend the filled cells over the original (40 % overlay, 60 % image).
    cv2.addWeighted(overlay, 0.4, image, 0.6, 0, image)
    for i in range(0, w, cell_w):
        cv2.line(image, (i, 0), (i, h), (100, 100, 100), 1)
    for j in range(0, h, cell_h):
        cv2.line(image, (0, j), (w, j), (100, 100, 100), 1)
    if save_path:
        cv2.imwrite(save_path, image)
    return image


def detect_road_type_from_content(label_file):
    """Guess the pavement type from class names found in the label file text.

    The maps are checked in the order asphalt, cement, gravel and the first
    keyword hit wins.  Returns ``"gravel"`` when the file cannot be read or
    when no keyword matches.
    """
    try:
        with open(label_file, 'r', encoding='utf-8') as f:
            content = f.read()
    except (OSError, UnicodeDecodeError):
        return "gravel"
    candidates = (
        ("asphalt", CLASS_MAP_ASPHALT),
        ("cement", CLASS_MAP_CEMENT),
        ("gravel", CLASS_MAP_GRAVEL),
    )
    for road_type, class_map in candidates:
        if any(kw in content for kw in class_map):
            return road_type
    return "gravel"


def get_road_info(road_dict, pile_dict, img_file_name):
    """Return the route record (``'data'`` mapping) for an image, or ``{}``.

    The image's index entry is looked up in *pile_dict*; its first field is
    used as the key into *road_dict*.
    """
    parts = pile_dict.get(img_file_name)
    if parts:
        return _road_record_data(road_dict.get(parts[0]))
    return {}


def detect_road_type_from_road_dict(road_dict, pile_dict, img_file_name):
    """Determine route code, pile number and pavement type from the Excel data.

    Returns:
        ``(road_code, pile_no, road_type)``.  Placeholders
        ``'xxxxxxxxxxx'`` / ``"xxxxx"`` are returned when the image is not in
        *pile_dict*; ``road_type`` defaults to ``"asphalt"`` when the route or
        its pavement-type cell is missing or unrecognised.
    """
    road_code = 'xxxxxxxxxxx'
    pile_no = "xxxxx"
    road_type = "asphalt"
    parts = pile_dict.get(img_file_name)
    if parts:
        road_code = parts[0]
        pile_no = parts[1]
        data = _road_record_data(road_dict.get(road_code))
        road_type_cn = data.get('路面类型(沥青/水泥/砂石)')
        if isinstance(road_type_cn, str):
            road_type_cn = road_type_cn.strip()
        road_type = _ROAD_TYPE_CN_TO_EN.get(road_type_cn, "asphalt")
    return road_code, pile_no, road_type


def yoloseg_to_grid_share_dir(road_dict, pile_dict, image_path, label_file, cover_ratio=COVER_RATIO):
    """Convert a YOLO-Seg label file into grid-cell numbers per distress class.

    The pavement type (and therefore the class-name list) comes from the
    Excel-derived *road_dict*.

    Returns:
        ``(out_txt, class_cells, road_type, all_cell_num)`` where
        ``class_cells[name] == [cell_numbers, [x_extent, y_extent]]`` (extents
        in grid cells) and ``all_cell_num`` is ``rows * cols``.  When the
        image cannot be read the result is ``("", {}, road_type, 0)`` - the
        same arity, so callers that unpack four values keep working.

    NOTE(review): everything after the class-name lookup was lost from the
    stored source.  The text line format, the merge rule for repeated classes
    and the meaning/order of the two extents were reconstructed from how
    ``process_dir`` consumes the result - confirm against the original.
    """
    img_file_name = os.path.basename(image_path)
    _road_code, _pile_no, road_type = detect_road_type_from_road_dict(road_dict, pile_dict, img_file_name)
    class_names = list(_class_map_for(road_type))

    img = cv2.imread(image_path)
    if img is None:
        return "", {}, road_type, 0
    h, w = img.shape[:2]
    cols = max(1, w // GRID_WIDTH)
    rows = max(1, h // GRID_HEIGHT)

    result_lines = []
    all_class_cells = {}
    with open(label_file, 'r', encoding='utf-8') as f:
        for line in f:
            parsed = _parse_polygon(line, w, h)
            if parsed is None:
                continue
            cls_id, poly = parsed
            mask = np.zeros((h, w), dtype=np.uint8)
            cv2.fillPoly(mask, [poly], 255)

            hits = _covered_grid_cells(mask, rows, cols, cover_ratio)
            if not hits:
                continue
            covered_cells = [r * cols + c + 1 for r, c in hits]
            # Bounding box of the covered cells, in grid units (max is exclusive).
            min_x = min(c for _, c in hits)
            max_x = max(c for _, c in hits) + 1
            min_y = min(r for r, _ in hits)
            max_y = max(r for r, _ in hits) + 1

            cname = class_names[cls_id] if 0 <= cls_id < len(class_names) else str(cls_id)

            # --- reconstructed from call sites, see NOTE(review) above ---
            result_lines.append(f"{cname} {'-'.join(str(n) for n in covered_cells)}")
            entry = all_class_cells.setdefault(cname, [[], [0, 0]])
            entry[0] = sorted(set(entry[0]).union(covered_cells))
            entry[1][0] = max(entry[1][0], max_x - min_x)
            entry[1][1] = max(entry[1][1], max_y - min_y)

    return '\n'.join(result_lines), all_class_cells, road_type, rows * cols


def yoloseg_to_grid(image_path, label_file, cover_ratio=COVER_RATIO):
    """Convert a YOLO-Seg label file into grid-cell numbers per distress class.

    Returns:
        ``(out_txt, class_cells, road_type)`` with
        ``class_cells[name] == [cell_numbers]``.

    NOTE(review): only the coverage test and the class-name lookup of this
    function survived in the stored source.  The signature, the use of
    ``detect_road_type_from_content`` and the output text format were
    reconstructed from ``process_zip`` - confirm against the original.
    """
    road_type = detect_road_type_from_content(label_file)
    class_names = list(_class_map_for(road_type))

    img = cv2.imread(image_path)
    if img is None:
        return "", {}, road_type
    h, w = img.shape[:2]
    cols = max(1, w // GRID_WIDTH)
    rows = max(1, h // GRID_HEIGHT)

    result_lines = []
    all_class_cells = {}
    with open(label_file, 'r', encoding='utf-8') as f:
        for line in f:
            parsed = _parse_polygon(line, w, h)
            if parsed is None:
                continue
            cls_id, poly = parsed
            mask = np.zeros((h, w), dtype=np.uint8)
            cv2.fillPoly(mask, [poly], 255)
            covered_cells = [r * cols + c + 1 for r, c in _covered_grid_cells(mask, rows, cols, cover_ratio)]
            if not covered_cells:
                continue
            cname = class_names[cls_id] if 0 <= cls_id < len(class_names) else str(cls_id)
            result_lines.append(f"{cname} {'-'.join(str(n) for n in covered_cells)}")
            merged = set(all_class_cells.get(cname, [])).union(covered_cells)
            all_class_cells[cname] = sorted(merged)

    return '\n'.join(result_lines), all_class_cells, road_type


def generate_header(road_type):
    """Return the CSV header line for the report of *road_type*.

    NOTE(review): this function was entirely lost from the stored source.  The
    columns mirror the rows written by ``process_dir`` / ``process_zip`` (pile
    number, identified width, DR, then one column per class); the exact header
    wording is a guess - confirm against the original.
    """
    return ','.join(['桩号', '识别宽度(m)', '破损率DR(%)'] + list(_class_map_for(road_type)))


def get_min_max_pile(summary_data):
    """Return ``(min_pile, max_pile)`` over the pile numbers of *summary_data*.

    Each item's first field is parsed with ``convert_special_format``.
    NOTE(review): the head of this function was lost; the data source and the
    ``(0.0, 0.0)`` result for empty input are reconstructed.
    """
    piles = [convert_special_format(data[0]) for data in summary_data]
    if not piles:
        return 0.0, 0.0
    min_pile = max_pile = piles[0]
    for tmp_pile in piles[1:]:
        if min_pile > tmp_pile:
            min_pile = tmp_pile
        if max_pile < tmp_pile:
            max_pile = tmp_pile
    return min_pile, max_pile


def convert_special_format(input_str):
    """Convert a pile-number string to a float.

    ``"0+022" -> 0.022`` and ``"1+234" -> 1.234``; a value without ``'+'`` is
    passed to ``float`` directly.  Non-string input is converted with ``str``
    first.

    Raises:
        ValueError: when the value contains more than one ``'+'`` or is not
            numeric.
    """
    text = str(input_str)
    if '+' not in text:
        return float(text)
    parts = text.split('+')
    if len(parts) != 2:
        raise ValueError(f"无效的格式: {input_str}")
    integer_part, decimal_part = parts
    return float(f"{integer_part}.{decimal_part}")


def in_interval(increment, cur_pile_no, tmp_start, tmp_end):
    """Tell whether *cur_pile_no* lies in the current reporting interval.

    Ascending (``increment > 0``): ``tmp_start <= cur < tmp_end``.
    Descending: ``tmp_end < cur <= tmp_start``.
    """
    if increment > 0:
        return tmp_start <= cur_pile_no < tmp_end
    return tmp_end < cur_pile_no <= tmp_start


def _write_detail_file(out_file, summary_data, road_type):
    """Write one CSV row per image: pile number, fixed width "3.6", DR, area per class."""
    keys = list(_class_map_for(road_type))
    with open(out_file, 'w', encoding='utf-8') as f:
        f.write(generate_header(road_type) + '\n')
        for pile_no, DR, counts, _rt in summary_data:
            row = [pile_no, "3.6", f"{DR:.2f}"]
            row.extend(f"{counts.get(k, [0, 0, 0])[0]:.2f}" for k in keys)
            f.write(','.join(row) + '\n')


def _write_interval_file(out_file, group, road_type, identify_width, up_or_down):
    """Write one CSV row per 10 m interval for the images of one pavement type.

    Images are ordered by pile number (descending for '下行') so the interval
    cursor only ever moves forward and the loop is guaranteed to terminate.
    """
    keys = list(_class_map_for(road_type))
    width_text = str(identify_width)  # may be numeric when it comes from Excel
    descending = up_or_down == '下行'
    group_list = sorted(group, key=lambda item: convert_special_format(item[0]), reverse=descending)

    if descending:
        increment = -0.010
        tmp_start = convert_special_format(group_list[0][0])
        # First boundary: the next 10 m mark strictly below the start.
        tmp_end = (math.ceil(round(tmp_start * 100, 6)) - 1) / 100
    else:
        increment = 0.010
        tmp_start = 0.000
        tmp_end = 0.010

    with open(out_file, 'w', encoding='utf-8') as f:
        f.write(generate_header(road_type) + '\n')
        index = 0
        while index < len(group_list):
            cur_pile_no = convert_special_format(group_list[index][0])
            behind = cur_pile_no > tmp_start if descending else cur_pile_no < tmp_start
            if behind:
                # The cursor can never reach this image again; skip it instead
                # of emitting empty intervals forever.
                index += 1
                continue

            row = [f"{tmp_start:0.3f}", width_text]
            if not in_interval(increment, cur_pile_no, tmp_start, tmp_end):
                # No image in this interval: emit an all-zero row.
                row.append(f"{0:.2f}")
                row.extend(f"{0:.2f}" for _ in keys)
            else:
                sub_rows = []
                while index < len(group_list):
                    pile_no, DR, counts, _rt = group_list[index]
                    if not in_interval(increment, convert_special_format(pile_no), tmp_start, tmp_end):
                        break
                    sub_rows.append([DR] + [counts.get(k, [0, 0, 0])[0] for k in keys])
                    index += 1
                # Column-wise average; a 10 m interval is taken to hold 5 images.
                row += [f"{(sum(column) / 5):0.2f}" for column in zip(*sub_rows)]
            f.write(','.join(row) + '\n')

            tmp_start = tmp_end
            # Rounded so boundaries stay on exact millimetre values instead of
            # accumulating float drift.
            tmp_end = round(tmp_start + increment, 6)
            print(f"tmp_start={tmp_start}, tmp_end={tmp_end}, index={index}, len(group_list)={len(group_list)}")


# ---------------- Main entry point: shared directory ----------------
def process_dir(road_dict, pile_dict, dir="output", cell_area=CELL_AREA, grid_width=GRID_WIDTH, grid_height=GRID_HEIGHT):
    """Process every labelled image below *dir* and write the reports.

    Outputs, all inside *dir* unless noted:
      * ``<image>_grid.txt`` next to each image,
      * ``<road>-DR-<min>-<max>-detail-<timestamp>.txt`` (one row per image),
      * ``<road>-DR-<min>-<max>-<timestamp>.txt`` per pavement type (one row
        per 10 m interval),
      * ``病害明显列表.xlsx`` in the directory of the last processed image,
        written through ``smb.write_to_excel_pandas``.

    Args:
        road_dict: route key -> route record(s), e.g. from ``get_road_dict``.
        pile_dict: image file name -> index-file parts, e.g. from ``get_pile_dict``.
        dir: directory to scan (name kept for backward compatibility even
            though it shadows the builtin).
        cell_area: area of one grid cell in square metres.
        grid_width, grid_height: accepted for interface compatibility; the
            grid size actually used is the module-level constant.
    """
    os.makedirs(dir, exist_ok=True)

    summary_data = []
    last_image_path = None
    for root, _, files in os.walk(dir):
        for file in files:
            if not file.lower().endswith(_IMAGE_EXTENSIONS):
                continue
            image_path = os.path.join(root, file)
            stem = os.path.splitext(image_path)[0]
            label_file = stem + ".txt"
            if not os.path.exists(label_file):
                print(f"⚠️ 找不到标签: {label_file}")
                continue

            out_txt, class_cells, road_type, all_cell_num = yoloseg_to_grid_share_dir(
                road_dict, pile_dict, image_path, label_file)
            if all_cell_num == 0:
                # Image could not be decoded; there is nothing to report for it.
                print(f"⚠️ 无法读取图片: {image_path}")
                continue

            # Per-image grid listing.
            with open(stem + "_grid.txt", 'w', encoding='utf-8') as f:
                f.write(out_txt)

            # Per class: [area, extent 0, extent 1].
            counts = {k: [len(v[0]) * cell_area, v[1][0], v[1][1]] for k, v in class_cells.items()}

            # Cells shared by several classes are counted once.
            damaged_cells = set()
            for v in class_cells.values():
                damaged_cells.update(v[0])

            parts = pile_dict.get(file)
            pile_no = parts[1] if parts else "0+000"

            # Damage ratio DR (%) = damaged cells / all cells.
            DR = len(damaged_cells) / all_cell_num * 100
            summary_data.append((pile_no, DR, counts, road_type))
            last_image_path = image_path

    if not summary_data:
        return

    # Route-level values are taken from the last image that was actually processed.
    current_time = datetime.now().strftime("%Y%m%d%H%M%S")
    img_file_name = os.path.basename(last_image_path)
    road_data = get_road_info(road_dict, pile_dict, img_file_name)
    road_code = detect_road_type_from_road_dict(road_dict, pile_dict, img_file_name)[0]
    identify_width = road_data.get('识别宽度(米)', '3.6')
    up_or_down = road_data.get('方向(上行/下行)', '上行')

    # Per-image detail file.
    min_pile, max_pile = get_min_max_pile(summary_data)
    out_file = os.path.join(dir, f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-detail-{current_time}.txt")
    _write_detail_file(out_file, summary_data, summary_data[0][3])
    print(f"输出完成: {out_file}")

    # Per-10 m file, one per pavement type.
    group_by_road_type = {}
    for data in summary_data:
        group_by_road_type.setdefault(data[3], []).append(data)
    for road_type, group in group_by_road_type.items():
        min_pile, max_pile = get_min_max_pile(group)
        out_file = os.path.join(dir, f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-{current_time}.txt")
        _write_interval_file(out_file, group, road_type, identify_width, up_or_down)
        print(f"输出完成: {out_file}")

    # Excel distress list.
    headers = ['序号', '路线编码', '方向', '桩号', '路面类型', '病害名称', '程度', '长度(m)',
               ' 宽度(m)', ' 面积(㎡)', ' 横向位置', '备注']
    data_list = []
    excel_index = 1
    for data in summary_data:
        for attr_name, attr_value in data[2].items():
            # NOTE(review): the two extents are cell counts but are scaled by
            # the cell *area* here, as in the original - confirm the unit.
            data_list.append([
                excel_index, road_code, up_or_down, f"K000{data[0]}",
                ROAD_TYPE_EN_TO_CN.get(data[3]), attr_name, '',
                attr_value[1] * cell_area, attr_value[2] * cell_area, attr_value[0], '', '',
            ])
            excel_index += 1
    img_file_path = os.path.dirname(last_image_path)
    smb.write_to_excel_pandas([headers] + data_list, img_file_path + '/病害明显列表.xlsx')


# ---------------- Main entry point: ZIP archive ----------------
def process_zip(zip_path, pile_map_file, output_dir="output", cell_area=CELL_AREA, grid_width=GRID_WIDTH, grid_height=GRID_HEIGHT):
    """Extract a ZIP of images and labels and write ``桩号问题列表.txt``.

    For every labelled image a ``_grid.txt`` listing and a ``_grid.jpg``
    visualisation are written next to it.

    Args:
        zip_path: archive to extract into *output_dir*.
        pile_map_file: ``a->b->c->filename`` index file.
        output_dir: extraction and output directory.
        cell_area: area of one grid cell in square metres.
        grid_width, grid_height: accepted for interface compatibility; unused.

    Raises:
        FileNotFoundError: when *zip_path* does not exist.
    """
    if not os.path.exists(zip_path):
        raise FileNotFoundError(f"{zip_path} 不存在")
    os.makedirs(output_dir, exist_ok=True)

    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(output_dir)

    pile_dict = _read_pile_index(pile_map_file)

    summary_data = []
    for root, _, files in os.walk(output_dir):
        for file in files:
            if not file.lower().endswith(_IMAGE_EXTENSIONS):
                continue
            image_path = os.path.join(root, file)
            stem = os.path.splitext(image_path)[0]
            label_file = stem + ".txt"
            if not os.path.exists(label_file):
                print(f"⚠️ 找不到标签: {label_file}")
                continue

            out_txt, class_cells, road_type = yoloseg_to_grid(image_path, label_file)

            with open(stem + "_grid.txt", 'w', encoding='utf-8') as f:
                f.write(out_txt)
            draw_grid_on_image(image_path, class_cells, save_path=stem + "_grid.jpg")

            counts = {k: len(v) * cell_area for k, v in class_cells.items()}
            total_area = sum(counts.values())

            # The index stores the whole split line; the pile number is its
            # second field (same convention as process_dir).
            parts = pile_dict.get(file)
            pile_no = parts[1] if parts else "0+000"

            # Simplified damage ratio: 100 when anything is damaged, else 0.
            DR = total_area / (total_area if total_area > 0 else 1) * 100
            summary_data.append((pile_no, DR, counts, road_type))

    if summary_data:
        road_type = summary_data[0][3]
        keys = list(_class_map_for(road_type))
        out_file = os.path.join(output_dir, "桩号问题列表.txt")
        with open(out_file, 'w', encoding='utf-8') as f:
            f.write(generate_header(road_type) + '\n')
            for pile_no, DR, counts, _rt in summary_data:
                row = [pile_no, "3.6", f"{DR:.2f}"]
                row.extend(f"{counts.get(k, 0):.2f}" for k in keys)
                f.write(','.join(row) + '\n')
        print(f"✅ 输出完成: {out_file}")


# route key -> route records
def get_road_dict(local_dir):
    """Build the route dictionary from the first matching Excel file in *local_dir*.

    Files are matched with ``每公里指标明细表*.xls``.  Rows without a
    ``线路编码`` value are ignored.  The key is the route code followed by the
    district code and ``'A'`` (or ``'B'`` for the '下行' direction).

    Args:
        local_dir: directory to search.

    Returns:
        dict: key -> list of ``{'index': row index, 'data': row as dict}``.
    """
    pattern = os.path.join(local_dir, '每公里指标明细表*.xls')
    found_paths = sorted(glob.glob(pattern))  # sorted so "first" is deterministic
    print(f"\n找到 {len(found_paths)} 个 '每公里指标明细表*.xls' 文件:")
    for i, path in enumerate(found_paths, 1):
        print(f"{i}. {path}")

    road_dict = {}
    if not found_paths:
        return road_dict

    df = pd.read_excel(found_paths[0])
    for index, row in df.iterrows():
        data = row.to_dict()
        if not pd.notna(data.get('线路编码', None)):
            continue
        up_or_down = 'B' if data.get('方向(上行/下行)', '') == '下行' else 'A'

        # Normalise the district code to an integer string when it is numeric.
        area_code = data.get('区划代码', '')
        if pd.notna(area_code):
            if str(area_code).replace('.', '').isdigit():
                area_code = str(int(float(area_code)))
            else:
                area_code = str(area_code)
        else:
            area_code = ''

        key = f"{data['线路编码']}{area_code}{up_or_down}"
        road_dict.setdefault(key, []).append({'index': index, 'data': data})
    return road_dict


# filename -> index-file parts
def get_pile_dict(local_dir):
    """Build the pile-number dictionary from ``fileindex.txt`` in *local_dir*.

    Args:
        local_dir: directory to search.

    Returns:
        dict: image file name -> the ``'->'``-split fields of its index line
        (lines with fewer than four fields are ignored).
    """
    pattern = os.path.join(local_dir, 'fileindex.txt')
    found_paths = glob.glob(pattern)
    print(f"\n找到 {len(found_paths)} 个 'fileindex.txt' 文件:")
    for i, path in enumerate(found_paths, 1):
        print(f"{i}. {path}")

    if not found_paths:
        return {}
    return _read_pile_index(found_paths[0])


# ---------------- Example invocation ----------------
if __name__ == "__main__":
    # The dictionaries can alternatively be loaded from a network share with
    # smb.get_pile_dict / smb.get_road_dict, or a ZIP archive can be processed
    # with process_zip; credentials must not be kept in this file.
    output_dir = "D:/devForBdzlWork/ai-train_platform/predictions/7"
    pile_dict = get_pile_dict(output_dir)
    road_dict = get_road_dict(output_dir)
    process_dir(road_dict, pile_dict, output_dir)