import os
import zipfile
import cv2
import numpy as np
import pandas as pd
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment
from openpyxl.utils import get_column_letter
import glob
import shutil
from datetime import datetime

try:
    # Relative import when used as part of the package.
    from . import smb_tool
except ImportError:
    # Fall back to an absolute import when the module is run directly.
    import smb_tool


# ---------------- image helpers ----------------
def read_image(image_path):
    """Read an image, tolerating non-ASCII (e.g. Chinese) paths.

    cv2.imread cannot open non-ASCII paths on Windows, so the file is
    loaded as raw bytes via numpy and decoded with cv2.imdecode.

    Returns the BGR image array, or None on any failure.
    """
    try:
        image_path = smb_tool.standardized_path(image_path)
        img_array = np.fromfile(image_path, dtype=np.uint8)
        img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
        return img
    except Exception as e:
        print(f"读取图片失败: {image_path}, 错误: {e}")
        return None


def save_image(image, save_path):
    """Save an image as JPEG, tolerating non-ASCII paths.

    Encodes in memory with cv2.imencode and writes the bytes with
    numpy's tofile, avoiding cv2.imwrite's path limitations.

    Returns True on success, False otherwise.
    """
    try:
        save_path = smb_tool.standardized_path(save_path)
        success, img_encoded = cv2.imencode('.jpg', image)
        if success:
            img_encoded.tofile(save_path)
            return True
        else:
            print(f"编码图片失败: {save_path}")
            return False
    except Exception as e:
        print(f"保存图片失败: {save_path}, 错误: {e}")
        return False


# ---------------- constants ----------------
CELL_WIDTH = 0.1           # width of one grid cell, meters
CELL_HEIGHT = 0.1          # height of one grid cell, meters
CELL_AREA = 0.01           # area of one grid cell, square meters
GRID_WIDTH = 108           # default cell width, pixels
GRID_HEIGHT = 102          # default cell height, pixels
COVER_RATIO = 0.01         # minimum mask coverage ratio for a cell to count
RESERVED_CELL_NUM = 20     # keep only the middle 20 cells of each row
ROAD_RECOGNIZE_HEIGHT = 2  # one photo every two meters of road

# ---------------- distress class maps per pavement type ----------------
CLASS_MAP_ASPHALT = {
    "龟裂": 0, "块状裂缝": 1, "纵向裂缝": 2, "横向裂缝": 3, "沉陷": 4,
    "车辙": 5, "波浪拥包": 6, "坑槽": 7, "松散": 8, "泛油": 9, "修补": 10
}
CLASS_MAP_CEMENT = {
    "破碎板": 0, "裂缝": 1, "板角断裂": 2, "错台": 3, "拱起": 4,
    "边角剥落": 5, "接缝料损坏": 6, "坑洞": 7, "唧泥": 8, "露骨": 9, "修补": 10
}
CLASS_MAP_GRAVEL = {
    "坑槽": 0, "沉陷": 1, "车辙": 2, "波浪搓板": 3
}
ROAD_TYPE_EN_TO_CN = {
    "asphalt": "沥青",
    "cement": "水泥",
    "gravel": "砾石"
}


# ---------------- utility functions ----------------
def num_to_coord(num, cols, cell_w, cell_h):
    """Convert a 1-based cell number to its pixel rectangle (x1, y1, x2, y2)."""
    n = num - 1
    r, c = divmod(n, cols)
    x1, y1 = c * cell_w, r * cell_h
    x2, y2 = x1 + cell_w, y1 + cell_h
    return x1, y1, x2, y2


def calc_grid_param(pic_width, pic_height, actual_width, actual_height,
                    grid_x_count=38, grid_y_count=20):
    """Compute the per-cell area and the cell size in pixels for a picture.

    The grid_x_count / grid_y_count parameters are kept for backward
    compatibility, but (as in the original code) they are recomputed from
    the real-world dimensions, so the passed values are never used.

    Returns (cell_area_m2, cell_width_px, cell_height_px).
    """
    calc_cell_area = CELL_WIDTH * CELL_HEIGHT  # cell area in square meters
    # number of cells across / down, derived from the real-world size
    # (dead initial assignments of 108/102 from the original were removed)
    grid_x_count = actual_width / CELL_WIDTH
    grid_y_count = actual_height / CELL_HEIGHT
    calc_grid_width = round(pic_width / grid_x_count)
    calc_grid_height = round(pic_height / grid_y_count)
    return calc_cell_area, calc_grid_width, calc_grid_height
def draw_grid_on_image(image_path, grid_cells, cell_size=None, save_path=None):
    """Overlay detected distress cells and the measuring grid on an image.

    grid_cells maps class name -> list of dicts carrying a
    "covered_cells" list of 1-based cell numbers.  Each class is tinted
    with a random color, then light gray grid lines are drawn on top.

    cell_size defaults to (GRID_WIDTH, GRID_HEIGHT); it is late-bound so
    the module constants are resolved at call time.

    Returns the annotated image (also written to save_path if given),
    or None when the image cannot be read.
    """
    if cell_size is None:
        cell_size = (GRID_WIDTH, GRID_HEIGHT)
    image = read_image(image_path)
    if image is None:
        return None
    h, w = image.shape[:2]
    cell_w, cell_h = cell_size
    cols = w // cell_w
    overlay = image.copy()
    for cname, cells in grid_cells.items():
        color = (np.random.randint(64, 255),
                 np.random.randint(64, 255),
                 np.random.randint(64, 255))
        for cell in cells:
            for num in cell.get("covered_cells"):
                x1, y1, x2, y2 = num_to_coord(num, cols, cell_w, cell_h)
                cv2.rectangle(overlay, (x1, y1), (x2, y2), color, -1)
    # blend the filled cells onto the source image (40% overlay)
    cv2.addWeighted(overlay, 0.4, image, 0.6, 0, image)
    # grid lines
    for i in range(0, w, cell_w):
        cv2.line(image, (i, 0), (i, h), (100, 100, 100), 1)
    for j in range(0, h, cell_h):
        cv2.line(image, (0, j), (w, j), (100, 100, 100), 1)
    if save_path:
        save_image(image, save_path)
    return image


def detect_road_type_from_content(label_file):
    """Guess the pavement type from the text of a label file.

    Checks asphalt keywords first, then cement, then gravel; defaults to
    "gravel" when nothing matches or the file cannot be read.
    """
    try:
        with open(label_file, 'r', encoding='utf-8') as f:
            content = f.read()
    except (OSError, UnicodeDecodeError):  # was a bare except; narrowed
        return "gravel"
    for kw in CLASS_MAP_ASPHALT.keys():
        if kw in content:
            return "asphalt"
    for kw in CLASS_MAP_CEMENT.keys():
        if kw in content:
            return "cement"
    for kw in CLASS_MAP_GRAVEL.keys():
        if kw in content:
            return "gravel"
    return "gravel"


def get_road_code(pile_dict):
    """Return the road code (parts[0]) of the first entry in pile_dict.

    Returns None when the first entry is falsy; raises IndexError if
    pile_dict is empty (callers only invoke this with data present).
    """
    img_file_name = list(pile_dict.keys())[0]
    parts = pile_dict.get(img_file_name)
    if parts:
        return parts[0]
    return None


def get_road_info(road_dict, pile_dict, img_file_name):
    """Find the Excel row describing the road the image belongs to.

    Looks up the road code for img_file_name, then scans that road's
    rows for one whose start/end pile numbers (meters) enclose the pile
    range observed in pile_dict.  When no row encloses the range, the
    first row is returned with its start/end pile numbers widened to the
    union of all Excel rows and the observed range (this mutates the
    stored row, as the original code did).

    Returns {} when the image or road code is unknown.
    """
    parts = pile_dict.get(img_file_name)
    if parts:
        road_code = parts[0]
        road_info = road_dict.get(road_code)
        if road_info:
            pile_dict_vals = list(pile_dict.values())
            # observed pile range, converted from "k+mmm" (km) to meters
            pile_no_min = convert_special_format(pile_dict_vals[0][1]) * 1000
            pile_no_max = convert_special_format(pile_dict_vals[-1][1]) * 1000
            if pile_no_max < pile_no_min:  # downstream direction: swap
                pile_no_min, pile_no_max = pile_no_max, pile_no_min
            final_excel_start_pile_no = pile_no_min
            final_excel_end_pile_no = pile_no_max
            for road in road_info:
                data = road.get("data")
                excel_start_pile_no = data.get('起桩号(米)', 0)
                excel_end_pile_no = data.get('止桩号(米)', 0)
                if excel_start_pile_no < final_excel_start_pile_no:
                    final_excel_start_pile_no = excel_start_pile_no
                if excel_end_pile_no > final_excel_end_pile_no:
                    final_excel_end_pile_no = excel_end_pile_no
                if excel_start_pile_no <= pile_no_min and excel_end_pile_no >= pile_no_max:
                    return data
            # no row encloses the observed range: widen the first row
            data = road_info[0].get("data")
            data['起桩号(米)'] = final_excel_start_pile_no
            data['止桩号(米)'] = final_excel_end_pile_no
            return data
    return {}
def detect_road_type_from_road_dict(road_dict, pile_dict, img_file_name):
    """Resolve (road_code, pile_no, road_type) for an image from Excel data.

    Falls back to placeholder code/pile and "asphalt" when the image or
    its road code is unknown.
    """
    road_code = 'xxxxxxxxxxx'
    pile_no = "xxxxx"
    road_type = "asphalt"
    parts = pile_dict.get(img_file_name)
    if parts:
        road_code = parts[0]
        pile_no = parts[1]
        road_info = road_dict.get(road_code)
        if road_info:
            data = road_info[0]['data']
            road_type_cn = data['路面类型(沥青/水泥/砂石)']
            if road_type_cn == '沥青':
                road_type = "asphalt"
            elif road_type_cn == '水泥':
                road_type = "cement"
            elif road_type_cn == '砾石':
                road_type = "gravel"
    return road_code, pile_no, road_type


def yoloseg_to_grid_share_dir(road_dict, pile_dict, image_path, label_file,
                              cell_index_start, cell_index_end,
                              grid_width=None, grid_height=None, cover_ratio=None):
    """Convert YOLO-Seg polygon labels into grid cell numbers per class.

    Each label line is "cls_id x1 y1 x2 y2 ..." with normalized polygon
    coordinates.  Every polygon is rasterized to a mask, and each grid
    cell whose coverage exceeds cover_ratio — and whose column index lies
    in [cell_index_start, cell_index_end) — is recorded.

    Defaults are late-bound to GRID_WIDTH / GRID_HEIGHT / COVER_RATIO.

    Returns (out_txt, class_cells, road_type, all_cell_num) where
    class_cells maps class name -> list of
    {"covered_cells": [...], "width": cells, "height": cells}.

    NOTE(review): the tail of this function was garbled in the source;
    the aggregation below is reconstructed from its caller (process_dir)
    and must be confirmed against the original.
    """
    if grid_width is None:
        grid_width = GRID_WIDTH
    if grid_height is None:
        grid_height = GRID_HEIGHT
    if cover_ratio is None:
        cover_ratio = COVER_RATIO
    img_file_name = os.path.basename(image_path)
    road_code, pile_no, road_type = detect_road_type_from_road_dict(
        road_dict, pile_dict, img_file_name)
    if road_type == "asphalt":
        class_map = CLASS_MAP_ASPHALT
    elif road_type == "cement":
        class_map = CLASS_MAP_CEMENT
    else:
        class_map = CLASS_MAP_GRAVEL
    class_names = list(class_map.keys())

    img = read_image(image_path)
    if img is None:
        return "", {}, road_type, 20 * RESERVED_CELL_NUM
    h, w = img.shape[:2]
    cols = max(1, w // grid_width)
    rows = max(1, h // grid_height)

    result_lines = []
    all_class_cells = {}
    with open(label_file, 'r', encoding='utf-8') as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) < 5:
                continue
            cls_id = int(parts[0])
            coords = [float(x) for x in parts[1:]]
            if len(coords) % 2 != 0:  # drop a trailing odd coordinate
                coords = coords[:-1]
            if len(coords) < 6:  # need at least 3 points for a polygon
                continue
            poly = np.array(coords, dtype=np.float32).reshape(-1, 2)
            poly[:, 0] *= w
            poly[:, 1] *= h
            mask = np.zeros((h, w), dtype=np.uint8)
            cv2.fillPoly(mask, [poly.astype(np.int32)], 255)

            covered_cells = []
            # bounding box of the covered cells, in cell units
            min_x, min_y, max_x, max_y = cols, rows, 0, 0
            for r in range(rows):
                for c in range(cols):
                    x1, y1 = c * grid_width, r * grid_height
                    x2, y2 = min(w, x1 + grid_width), min(h, y1 + grid_height)
                    region = mask[y1:y2, x1:x2]
                    if (cell_index_start <= c < cell_index_end
                            and np.count_nonzero(region) / region.size > cover_ratio):
                        covered_cells.append(r * cols + c + 1)
                        min_x = min(min_x, c)
                        min_y = min(min_y, r)
                        max_x = max(max_x, c + 1)
                        max_y = max(max_y, r + 1)
            if not covered_cells:
                continue
            cname = class_names[cls_id] if cls_id < len(class_names) else str(cls_id)
            cell_info = {
                "covered_cells": covered_cells,
                "width": max_x - min_x,
                "height": max_y - min_y,
            }
            all_class_cells.setdefault(cname, []).append(cell_info)
            result_lines.append(f"{cname} {covered_cells}")

    # total cells considered: middle RESERVED_CELL_NUM columns of each row
    all_cell_num = rows * RESERVED_CELL_NUM
    return "\n".join(result_lines), all_class_cells, road_type, all_cell_num


# NOTE(review): the definitions of generate_classes_txt_content,
# generate_header and yoloseg_to_grid appear to have been lost in the
# garbled region of this source; they are still referenced elsewhere in
# the file and must be restored from version control.


def get_min_max_pile(summary_data):
    """Return the (min, max) pile numbers (floats) found in summary_data.

    Each entry is a (pile_no, DR, counts, road_type) tuple.

    NOTE(review): the head of this function was garbled in the source;
    the loop is reconstructed from the surviving tail and its callers.
    """
    min_pile = convert_special_format(summary_data[0][0])
    max_pile = min_pile
    for entry in summary_data:
        tmp_pile = convert_special_format(entry[0])
        if min_pile > tmp_pile:
            min_pile = tmp_pile
        if max_pile < tmp_pile:
            max_pile = tmp_pile
    return min_pile, max_pile


def convert_special_format(input_str):
    """Convert a pile-number string to a float.

    Supports the "k+mmm" stake format: "0+022" -> 0.022, "1+234" -> 1.234.
    Plain numeric strings are converted directly.

    Raises ValueError for malformed "+" formats.
    """
    if '+' in input_str:
        parts = input_str.split('+')
        if len(parts) == 2:
            # rebuild as a standard decimal: "<int>.<frac>"
            return float(f"{parts[0]}.{parts[1]}")
        raise ValueError(f"无效的格式: {input_str}")
    return float(input_str)
# Return 1 when cur_pile_no lies inside the current interval, -1 when it is
# behind the interval (which would otherwise cause an endless loop in the
# callers), and 0 when it is ahead of the interval.
def in_interval(increment, cur_pile_no, tmp_start, tmp_end):
    if increment > 0:
        if cur_pile_no >= tmp_start and cur_pile_no < tmp_end:
            return 1
        elif cur_pile_no < tmp_start:
            return -1  # behind the scan: would cause an infinite loop
        else:
            return 0
    else:
        if cur_pile_no > tmp_end and cur_pile_no <= tmp_start:
            return 1
        elif cur_pile_no > tmp_start:
            return -1  # behind the scan: would cause an infinite loop
        else:
            return 0


def process_info(road_dict, pile_dict, summary_data, current_time, interval=10,
                 dir="output", cell_area=None, grid_width=None, grid_height=None):
    """Aggregate summary_data into rows of the composite detail sheet.

    Walks the road in steps of `interval` meters (direction-aware) and
    emits one 25-column row per step: road info, pile range, and the
    averaged DR of the images falling into the step.

    cell_area / grid_width / grid_height are unused by this function and
    kept only for interface compatibility (late-bound to the module
    constants).

    Returns the list of rows.
    """
    if cell_area is None:
        cell_area = CELL_AREA
    if grid_width is None:
        grid_width = GRID_WIDTH
    if grid_height is None:
        grid_height = GRID_HEIGHT
    process_info_data = []
    if summary_data:
        img_file_name = list(pile_dict.keys())[0]
        road_data = get_road_info(road_dict, pile_dict, img_file_name)
        identify_width = road_data.get('识别宽度(米)', '3.6')
        up_or_down = road_data.get('方向(上行/下行)', '上行')
        road_code = pile_dict.get(img_file_name)[0]
        road_level = road_data.get('技术等级', '')
        road_type_cn = road_data.get('路面类型(沥青/水泥/砂石)', '沥青')
        first_pile_no = round(road_data.get('起桩号(米)', 0) / 1000, 3)
        last_pile_no = round(road_data.get('止桩号(米)', 0) / 1000, 3)
        if up_or_down == '下行':
            # BUG FIX: the original called list(reversed(...)) and discarded
            # the result; downstream scanning needs the reversed order.
            group_list = list(reversed(summary_data))
            increment = float(-0.001) * interval
            tmp_pile_no = last_pile_no
            pile_no_start = tmp_pile_no
            if tmp_pile_no % increment != 0:
                # snap the first interval end onto the step grid
                pile_no_end = round(tmp_pile_no - (tmp_pile_no % (-increment)), 3)
            else:
                pile_no_end = round(tmp_pile_no + increment, 3)
        else:
            group_list = summary_data
            increment = float(0.001) * interval
            pile_no_start = first_pile_no
            pile_no_end = pile_no_start + increment
        index = 0
        tmp_start = pile_no_start
        tmp_end = pile_no_end
        up_or_down_code = "B" if up_or_down == '下行' else "A"
        while True:
            # upstream: clamp to the end of the road, then terminate
            if tmp_start < last_pile_no and tmp_end > last_pile_no:
                tmp_end = last_pile_no
            if tmp_start >= last_pile_no and tmp_end >= last_pile_no:
                break
            # downstream: clamp to the start of the road, then terminate
            if tmp_start > first_pile_no and tmp_end < first_pile_no:
                tmp_end = first_pile_no
            if tmp_start <= first_pile_no and tmp_end <= first_pile_no:
                break
            print(f"process_info:tmp_start={tmp_start}, tmp_end={tmp_end}, index={index}, len(group_list)={len(group_list)}")
            pile_no_s = format_number_to_k_code(tmp_start)
            pile_no_e = format_number_to_k_code(tmp_end)
            if index >= len(group_list):
                # no (more) distress data: emit an empty row
                # (the original dead per-class `keys` code here referenced a
                # possibly-undefined road_type — removed)
                row = [road_code, pile_no_s, pile_no_e, up_or_down_code, up_or_down,
                       road_level, f"{road_type_cn}路面", '', '', f"{0:.2f}",
                       '', '', '', '', '', '', '', '', '', '', '', '', '', '', '']
                process_info_data.append(row)
            else:
                pile_no, DR, counts, road_type = group_list[index]
                cur_pile_no = convert_special_format(pile_no)
                in_interval_val = in_interval(increment, cur_pile_no, tmp_start, tmp_end)
                if in_interval_val == 0:
                    # image is ahead of this interval: emit an empty row
                    row = [road_code, pile_no_s, pile_no_e, up_or_down_code, up_or_down,
                           road_level, f"{road_type_cn}路面", '', '', f"{0:.2f}",
                           '', '', '', '', '', '', '', '', '', '', '', '', '', '', '']
                    process_info_data.append(row)
                elif in_interval_val == 1:
                    row = [road_code, pile_no_s, pile_no_e, up_or_down_code, up_or_down,
                           road_level, f"{road_type_cn}路面", '', '']
                    subRows = []
                    # consume every image inside the interval
                    while index < len(group_list):
                        pile_no, DR, counts, road_type = group_list[index]
                        cur_pile_no = convert_special_format(pile_no)
                        in_interval_val = in_interval(increment, cur_pile_no, tmp_start, tmp_end)
                        if in_interval_val == 1:
                            subRows.append([DR])
                            index = index + 1
                        else:
                            break
                    # column-wise average; one image covers 2 m of road
                    tmp_interval = round((tmp_end - tmp_start) * 1000)
                    column_sums = [f"{(sum(column) / (tmp_interval / 2)):0.2f}"
                                   for column in zip(*subRows)]
                    row += column_sums
                    row += ['', '', '', '', '', '', '', '', '', '', '', '', '', '', '']
                    process_info_data.append(row)
                else:
                    break
            tmp_start = tmp_end
            tmp_end = round(tmp_start + increment, 3)  # avoid float drift
    return process_info_data


def adjust_column_width(worksheet):
    """Auto-fit every column of `worksheet` to its longest cell value."""
    for col_idx in range(1, worksheet.max_column + 1):
        max_length = 0
        column_letter = get_column_letter(col_idx)
        for row_idx in range(1, worksheet.max_row + 1):
            try:
                cell = worksheet.cell(row=row_idx, column=col_idx)
                # skip cells that are part of a merged range
                if any(cell.coordinate in merged_range
                       for merged_range in worksheet.merged_cells.ranges):
                    continue
                if cell.value is not None:
                    cell_length = len(str(cell.value))
                    if cell_length > max_length:
                        max_length = cell_length
            except Exception:  # was a bare except; narrowed
                continue
        if max_length > 0:
            worksheet.column_dimensions[column_letter].width = max_length + 2


def create_multiple_sheets_with_multiple_headers(file_path, excel_data):
    """Write a workbook with one sheet per entry of excel_data.

    Each entry is {'name', 'headers', 'columns', 'data'}; 'headers' is a
    list of header rows, each a list of [title, first_col, last_col]
    merged spans.
    """
    wb = Workbook()
    wb.remove(wb.active)  # drop the default sheet
    for sheet_data in excel_data:
        ws = wb.create_sheet(title=sheet_data['name'])
        current_row = 1
        # merged multi-level header rows
        for heads in sheet_data['headers']:
            for head in heads:
                ws.merge_cells(f'{head[1]}{current_row}:{head[2]}{current_row}')
                ws[f'{head[1]}{current_row}'] = head[0]
                ws[f'{head[1]}{current_row}'].font = Font(bold=True, size=14)
                ws[f'{head[1]}{current_row}'].alignment = Alignment(horizontal='center')
            current_row += 1
        # column titles
        for col_idx, column_name in enumerate(sheet_data['columns'], 1):
            cell = ws.cell(row=current_row, column=col_idx, value=column_name)
            cell.font = Font(bold=True)
        current_row += 1
        # data rows
        for row_data in sheet_data['data']:
            for col_idx, value in enumerate(row_data, 1):
                ws.cell(row=current_row, column=col_idx, value=value)
            current_row += 1
        adjust_column_width(ws)
    wb.save(file_path)
    print(f"文件已保存为 {file_path}")
def format_number_three_decimal_places(number):
    """Format a number with exactly three decimal places, e.g. 1.2 -> "1.200"."""
    return f"{number:0.3f}"


def format_number_to_k_code(number):
    """Format a kilometer value as a stake code: 1.234 -> "K0001+234"."""
    num_str = str(number)
    if '.' not in num_str:
        # integer input (e.g. 1 -> K0001+000)
        integer_part = int(num_str)
        decimal_part = "000"
    else:
        int_str, decimal_part = num_str.split('.')
        integer_part = int(int_str)
        # pad/truncate the fraction to exactly 3 digits
        decimal_part = decimal_part.ljust(3, '0')[:3]
    return f"K{integer_part:04d}+{decimal_part}"


# ---------------- main entry - shared directory ----------------
def process_dir(road_dict, pile_dict, current_time, dir="output",
                cell_area=None, cell_width=None, cell_height=None,
                grid_width=None, grid_height=None):
    """Process every labeled image under `dir` and emit all reports.

    For each image with a YOLO-Seg label, computes the covered grid
    cells, a per-image DR, writes the *_grid.txt / *_grid.jpg artifacts,
    then produces the txt and xlsx summary reports.

    cell_area/cell_width/cell_height default (late-bound) to the module
    constants; the grid_width/grid_height parameters are ignored — they
    are always derived from the first readable image (as in the
    original implementation).
    """
    if cell_area is None:
        cell_area = CELL_AREA
    if cell_width is None:
        cell_width = CELL_WIDTH
    if cell_height is None:
        cell_height = CELL_HEIGHT
    os.makedirs(dir, exist_ok=True)
    os.makedirs(f"{dir}/DR", exist_ok=True)
    os.makedirs(f"{dir}/excel", exist_ok=True)
    os.makedirs(f"{dir}/sum", exist_ok=True)
    # NOTE(review): road_dict maps road codes to row lists, so this lookup
    # always falls back to 3.6 — probably meant to read the Excel row;
    # behavior preserved.
    road_recognize_width = road_dict.get('识别宽度(米)', 3.6)
    road_recognize_height = ROAD_RECOGNIZE_HEIGHT
    grid_width = None
    grid_height = None
    cell_index_start = None
    cell_index_end = None
    image_path = None
    summary_data = []
    for root, _, files in os.walk(dir):
        for file in files:
            # consistency fix: skip previously generated grid images,
            # exactly like process_zip does, so re-runs don't reprocess output
            if file.lower().endswith(("_grid.jpg", "_grid.png", "_grid.jpeg", "_grid.bmp")):
                continue
            if file.lower().endswith((".jpg", ".png", ".jpeg", ".bmp")):
                image_path = os.path.join(root, file)
                label_file = os.path.splitext(image_path)[0] + ".txt"
                if not os.path.exists(label_file):
                    print(f"⚠️ 找不到标签: {label_file}")
                    continue
                # derive grid geometry once, from the first readable image
                if grid_width is None or grid_height is None:
                    img = read_image(image_path)
                    if img is not None:
                        h, w = img.shape[:2]
                        # cell size in pixels (CELL_WIDTH used for both axes
                        # in the original — values are equal; presumably
                        # CELL_HEIGHT was intended for the vertical axis)
                        grid_width = round(w * CELL_WIDTH / road_recognize_width)
                        grid_height = round(h * CELL_WIDTH / road_recognize_height)
                        print(f"通过图片获取到grid_width = {grid_width}, grid_height = {grid_height}")
                        # keep only the middle RESERVED_CELL_NUM cells per row
                        cell_index_start = round((road_recognize_width / CELL_WIDTH - RESERVED_CELL_NUM) / 2)
                        cell_index_end = cell_index_start + RESERVED_CELL_NUM
                out_txt, class_cells, road_type, all_cell_num = yoloseg_to_grid_share_dir(
                    road_dict, pile_dict, image_path, label_file,
                    cell_index_start, cell_index_end,
                    grid_width=grid_width, grid_height=grid_height)
                # per-image _grid.txt
                grid_txt_path = os.path.splitext(image_path)[0] + "_grid.txt"
                with open(grid_txt_path, 'w', encoding='utf-8') as f:
                    f.write(out_txt)
                # grid visualization
                draw_grid_on_image(image_path, class_cells,
                                   cell_size=(grid_width, grid_height),
                                   save_path=os.path.splitext(image_path)[0] + "_grid.jpg")
                # per-class [area_m2, width_cells, height_cells] entries
                counts = {k: [[len(v_child.get("covered_cells")) * cell_area,
                               v_child.get("width"), v_child.get("height")]
                              for v_child in v]
                          for k, v in class_cells.items()}
                # union of covered cells across classes -> distressed cell count
                merged_set = set()
                for k, v in class_cells.items():
                    for v_child in v:
                        merged_set = merged_set.union(v_child.get("covered_cells"))
                total_area = len(merged_set)
                # pile number for this image
                parts = pile_dict.get(file)
                pile_no = parts[1] if parts else "0+000"
                # damage ratio DR (%) = distressed cells / total cells
                DR = total_area / all_cell_num * 100
                summary_data.append((pile_no, DR, counts, road_type))
    # reports
    process_classes_txt(road_dict, pile_dict, dir, image_path)
    process_damage_detail_txt(road_dict, pile_dict, dir, summary_data, current_time)
    process_damage_txt(road_dict, pile_dict, dir, summary_data, current_time)
    process_damage_detail_excel(road_dict, pile_dict, dir, cell_area, cell_width, cell_height, summary_data)
    process_damage_composite_excel(road_dict, pile_dict, dir, summary_data, current_time)
def process_classes_txt(road_dict, pile_dict, dir, image_path):
    """Write classes.txt listing the class names for the image's pavement type."""
    img_file_name = os.path.basename(image_path)
    _, _, road_type = detect_road_type_from_road_dict(road_dict, pile_dict, img_file_name)
    classes_txt_content = generate_classes_txt_content(road_type)
    # explicit utf-8: content holds Chinese class names, and the platform
    # default encoding (e.g. gbk on Windows) is not reliable
    with open(f"{dir}/classes.txt", "w", encoding="utf-8") as file:
        file.write(classes_txt_content)


def process_damage_composite_excel(road_dict, pile_dict, dir, summary_data, current_time):
    """Write the composite detail workbook (10 m / 100 m / 1000 m sheets)."""
    print("输出:综合明细表.xlsx")
    heads = ['路线编码', '起点', '终点', '车道编码', '上下行', '公路等级', '路面类型',
             'PQI', 'PQI等级', 'DR(%)', 'PCI', 'PCI等级', 'IRI', 'RQI', 'RQI等级',
             'RD', 'RDI', 'RDI等级', 'SMTD', 'PBI', 'PBI等级', 'WR', 'PWI', 'PWI等级', '备注']
    data1 = process_info(road_dict, pile_dict, summary_data, current_time, 10)
    data2 = process_info(road_dict, pile_dict, summary_data, current_time, 100)
    data3 = process_info(road_dict, pile_dict, summary_data, current_time, 1000)
    road_code = get_road_code(pile_dict)
    excel_data = [
        {'name': '综合明细十米', 'headers': [[['综合明细十米', 'A', 'Y']]],
         'columns': heads, 'data': data1},
        {'name': '综合明细百米', 'headers': [[['综合明细百米', 'A', 'Y']]],
         'columns': heads, 'data': data2},
        {'name': '综合明细千米', 'headers': [[['综合明细千米', 'A', 'Y']]],
         'columns': heads, 'data': data3},
    ]
    out_file = f"{dir}/excel/{road_code}-综合明细表.xlsx"
    out_file_sum = f"{dir}/sum/{road_code}-综合明细表.xlsx"
    create_multiple_sheets_with_multiple_headers(out_file, excel_data)
    # copy into the aggregation directory
    shutil.copy(out_file, out_file_sum)


def process_damage_detail_txt(road_dict, pile_dict, dir, summary_data, current_time):
    """Write the per-image DR detail txt (one row per photographed pile)."""
    if summary_data:
        img_file_name = list(pile_dict.keys())[0]
        # return value unused, but kept: get_road_info may widen the pile
        # range stored in road_dict as a side effect
        road_data = get_road_info(road_dict, pile_dict, img_file_name)
        road_code = pile_dict.get(img_file_name)[0]
        road_type = summary_data[0][3]
        min_pile, max_pile = get_min_max_pile(summary_data)
        print(f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-detail-{current_time}.txt")
        os.makedirs(f"{dir}/DR", exist_ok=True)
        out_file = os.path.join(
            f"{dir}/DR",
            f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-detail-{current_time}.txt")
        header = generate_header(road_type)
        # class keys depend only on road_type — hoisted out of the row loop
        if road_type == "asphalt":
            keys = list(CLASS_MAP_ASPHALT.keys())
        elif road_type == "cement":
            keys = list(CLASS_MAP_CEMENT.keys())
        else:
            keys = list(CLASS_MAP_GRAVEL.keys())
        with open(out_file, 'w', encoding='utf-8') as f:
            f.write(header + '\n')
            for pile_no, DR, counts, rt in summary_data:
                row = [pile_no, "3.6", f"{DR:.2f}"]
                for k in keys:
                    # total area of this class across all its instances
                    sum_count = sum(count[0] for count in counts.get(k, [[0, 0, 0]]))
                    row.append(f"{sum_count:.2f}")
                f.write(','.join(row) + '\n')
        print(f"输出完成: {out_file}")
def process_damage_detail_excel(road_dict, pile_dict, dir, cell_area, cell_width, cell_height, summary_data):
    """Write the distress detail workbook: one row per distress instance."""
    print("输出:病害明细列表.xlsx")
    os.makedirs(f"{dir}/excel", exist_ok=True)
    headers = ['序号', '路线编码', '方向', '桩号', '路面类型', '病害名称', '程度',
               '长度(m)', ' 宽度(m)', ' 面积(㎡)', ' 横向位置', '备注']
    data_list = []
    if summary_data:
        img_file_name = list(pile_dict.keys())[0]
        road_data = get_road_info(road_dict, pile_dict, img_file_name)
        road_code, pile_no, road_type = detect_road_type_from_road_dict(
            road_dict, pile_dict, img_file_name)
        identify_width = road_data.get('识别宽度(米)', '3.6')
        up_or_down = road_data.get('方向(上行/下行)', '上行')
        excel_index = 1
        for data in summary_data:
            damage_data = data[2]  # {class_name: [[area, w_cells, h_cells], ...]}
            for attr_name, attr_value in damage_data.items():
                for attr_value_child in attr_value:
                    excel_data = [excel_index, road_code, up_or_down,
                                  f"K000{data[0]}",
                                  ROAD_TYPE_EN_TO_CN.get(road_type), attr_name, '',
                                  attr_value_child[1] * cell_width,   # length, m
                                  attr_value_child[2] * cell_height,  # width, m
                                  attr_value_child[0],                # area, m2
                                  '', '']
                    data_list.append(excel_data)
                    # BUG FIX: the original never incremented the serial
                    # number, so every row was numbered 1
                    excel_index += 1
    all_data = [headers] + data_list
    road_code = get_road_code(pile_dict)
    out_file = f"{dir}/excel/{road_code}-病害明细列表.xlsx"
    out_file_sum = f"{dir}/sum/{road_code}-病害明细列表.xlsx"
    smb_tool.write_to_excel_pandas(all_data, out_file)
    # copy into the aggregation directory
    shutil.copy(out_file, out_file_sum)


def process_damage_txt(road_dict, pile_dict, dir, summary_data, current_time):
    """Write the 10 m-interval DR txt, one file per pavement type group."""
    if summary_data:
        img_file_name = list(pile_dict.keys())[0]
        road_data = get_road_info(road_dict, pile_dict, img_file_name)
        identify_width = road_data.get('识别宽度(米)', '3.6')
        up_or_down = road_data.get('方向(上行/下行)', '上行')
        road_code = pile_dict.get(img_file_name)[0]
        first_pile_no = round(road_data.get('起桩号(米)', 0) / 1000, 3)
        last_pile_no = round(road_data.get('止桩号(米)', 0) / 1000, 3)
        # group images by pavement type
        group_by_road_type = {}
        for data in summary_data:
            group_by_road_type.setdefault(data[3], []).append(data)
        for road_type, group in group_by_road_type.items():
            min_pile, max_pile = get_min_max_pile(group)
            print(f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-{current_time}.txt")
            out_file = os.path.join(
                f"{dir}/DR", f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-{current_time}.txt")
            out_file_sum = os.path.join(
                f"{dir}/sum", f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-{current_time}.txt")
            header = generate_header(road_type)
            if up_or_down == '下行':
                # BUG FIX: the original called list(reversed(...)) and
                # discarded the result; the downstream scan needs it reversed
                group_list = list(reversed(group))
                increment = float(-0.010)
                tmp_pile_no = last_pile_no
                pile_no_start = tmp_pile_no
                if tmp_pile_no % increment != 0:
                    # snap the first interval end onto the 10 m grid
                    pile_no_end = round(tmp_pile_no - (tmp_pile_no % (-increment)), 3)
                else:
                    pile_no_end = round(tmp_pile_no + increment, 3)
            else:
                group_list = group
                increment = float(0.010)
                pile_no_start = first_pile_no
                pile_no_end = pile_no_start + increment
            # class keys depend only on road_type — hoisted out of the loops
            if road_type == "asphalt":
                keys = list(CLASS_MAP_ASPHALT.keys())
            elif road_type == "cement":
                keys = list(CLASS_MAP_CEMENT.keys())
            else:
                keys = list(CLASS_MAP_GRAVEL.keys())
            with open(out_file, 'w', encoding='utf-8') as f:
                f.write(header + '\n')
                index = 0
                tmp_start = pile_no_start
                tmp_end = pile_no_end
                while True:
                    # upstream: clamp to the road end, then terminate
                    if tmp_start < last_pile_no and tmp_end > last_pile_no:
                        tmp_end = last_pile_no
                    if tmp_start >= last_pile_no and tmp_end >= last_pile_no:
                        break
                    # downstream: clamp to the road start, then terminate
                    if tmp_start > first_pile_no and tmp_end < first_pile_no:
                        tmp_end = first_pile_no
                    if tmp_start <= first_pile_no and tmp_end <= first_pile_no:
                        break
                    print(f"process_dir:tmp_start={tmp_start}, tmp_end={tmp_end}, index={index}, len(group_list)={len(group_list)}")
                    if index >= len(group_list):
                        # no (more) distress data: emit a zero row
                        row = [format_number_three_decimal_places(tmp_start),
                               f"{identify_width}", f"{0:.2f}"]
                        for k in keys:
                            row.append(f"{0:.2f}")
                        f.write(','.join(row) + '\n')
                    else:
                        pile_no, DR, counts, road_type = group_list[index]
                        cur_pile_no = convert_special_format(pile_no)
                        in_interval_val = in_interval(increment, cur_pile_no, tmp_start, tmp_end)
                        if in_interval_val == 0:
                            # image ahead of this interval: emit a zero row
                            row = [format_number_three_decimal_places(tmp_start),
                                   f"{identify_width}", f"{0:.2f}"]
                            for k in keys:
                                row.append(f"{0:.2f}")
                            f.write(','.join(row) + '\n')
                        elif in_interval_val == 1:
                            row = [format_number_three_decimal_places(tmp_start),
                                   f"{identify_width}"]
                            subRows = []
                            # consume every image inside the interval
                            while index < len(group_list):
                                pile_no, DR, counts, road_type = group_list[index]
                                cur_pile_no = convert_special_format(pile_no)
                                in_interval_val = in_interval(increment, cur_pile_no, tmp_start, tmp_end)
                                if in_interval_val == 1:
                                    tmp_row = [DR]
                                    for k in keys:
                                        # total area of this class for the image
                                        sum_count = sum(count[0] for count in
                                                        counts.get(k, [[0, 0, 0]]))
                                        tmp_row.append(sum_count)
                                    subRows.append(tmp_row)
                                    index = index + 1
                                else:
                                    break
                            # column-wise average over the interval length
                            tmp_interval = round((tmp_end - tmp_start) * 1000)
                            column_sums = [f"{(sum(column) / tmp_interval):0.2f}"
                                           for column in zip(*subRows)]
                            row += column_sums
                            f.write(','.join(row) + '\n')
                        else:
                            break
                    tmp_start = tmp_end
                    tmp_end = round(tmp_start + increment, 3)  # avoid float drift
            # copy into the aggregation directory
            shutil.copy(out_file, out_file_sum)
            print(f"输出完成: {out_file}")
# ---------------- main entry - zip archive ----------------
def process_zip(zip_path, pile_map_file, output_dir="output",
                cell_area=None, grid_width=None, grid_height=None):
    """Extract a prediction zip and write the pile-number problem list.

    pile_map_file maps image names to pile info, one "a->b->c->filename"
    line per image.  grid_width/grid_height are unused here and kept for
    interface compatibility; cell_area is late-bound to CELL_AREA.

    Raises FileNotFoundError when zip_path does not exist.
    """
    if cell_area is None:
        cell_area = CELL_AREA
    if not os.path.exists(zip_path):
        raise FileNotFoundError(f"{zip_path} 不存在")
    os.makedirs(output_dir, exist_ok=True)
    # extract
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(output_dir)
    # read the pile-number mapping: filename -> parts
    pile_dict = {}
    with open(pile_map_file, 'r', encoding='utf-8') as f:
        for line in f:
            parts = line.strip().split("->")
            if len(parts) >= 4:
                pile_dict[parts[3]] = parts
    # walk the extracted images
    summary_data = []
    for root, _, files in os.walk(output_dir):
        for file in files:
            # skip previously generated grid visualizations
            if file.lower().endswith(("_grid.jpg", "_grid.png", "_grid.jpeg", "_grid.bmp")):
                continue
            if file.lower().endswith((".jpg", ".png", ".jpeg", ".bmp")):
                image_path = os.path.join(root, file)
                label_file = os.path.splitext(image_path)[0] + ".txt"
                if not os.path.exists(label_file):
                    print(f"⚠️ 找不到标签: {label_file}")
                    continue
                out_txt, class_cells, road_type = yoloseg_to_grid(image_path, label_file)
                # per-image _grid.txt
                grid_txt_path = os.path.splitext(image_path)[0] + "_grid.txt"
                with open(grid_txt_path, 'w', encoding='utf-8') as f:
                    f.write(out_txt)
                # grid visualization
                draw_grid_on_image(image_path, class_cells,
                                   save_path=os.path.splitext(image_path)[0] + "_grid.jpg")
                # per-class area totals
                counts = {k: len(v) * cell_area for k, v in class_cells.items()}
                total_area = sum(counts.values())
                # BUG FIX: pile_dict values are lists; the original stored the
                # whole list as pile_no, which broke ','.join downstream.
                parts = pile_dict.get(file)
                pile_no = parts[1] if parts else "0+000"
                # simplified DR: 100% when any distress exists, else 0
                DR = total_area / (total_area if total_area > 0 else 1) * 100
                summary_data.append((pile_no, DR, counts, road_type))
    # write the pile-number problem list
    if summary_data:
        road_type = summary_data[0][3]
        out_file = os.path.join(output_dir, "桩号问题列表.txt")
        header = generate_header(road_type)
        # class keys depend only on road_type — hoisted out of the row loop
        if road_type == "asphalt":
            keys = list(CLASS_MAP_ASPHALT.keys())
        elif road_type == "cement":
            keys = list(CLASS_MAP_CEMENT.keys())
        else:
            keys = list(CLASS_MAP_GRAVEL.keys())
        with open(out_file, 'w', encoding='utf-8') as f:
            f.write(header + '\n')
            for pile_no, DR, counts, rt in summary_data:
                row = [pile_no, "3.6", f"{DR:.2f}"]
                for k in keys:
                    row.append(f"{counts.get(k, 0):.2f}")
                f.write(','.join(row) + '\n')
        print(f"✅ 输出完成: {out_file}")


# road code -> road info rows
def get_road_dict(local_dir):
    """Read the '每公里指标明细表*.xls*' workbook in local_dir into a dict.

    Returns {route_code + area_code + direction: [{'index': i, 'data': row}, ...]};
    empty when no matching workbook is found.
    """
    pattern = os.path.join(local_dir, '每公里指标明细表*.xls*')
    found_paths = glob.glob(pattern)
    print(f"\n找到 {len(found_paths)} 个 '每公里指标明细表*.xls*' 文件:")
    for i, path in enumerate(found_paths, 1):
        print(f"{i}. {path}")
    road_dict = {}
    if len(found_paths) > 0:
        # only the first matching workbook is read
        df = pd.read_excel(found_paths[0])
        for index, row in df.iterrows():
            data = row.to_dict()
            if pd.notna(data.get('线路编码', None)):
                up_or_down = 'A'
                if data.get('方向(上行/下行)', '') == '下行':
                    up_or_down = 'B'
                # normalize the area code to an integer string when numeric
                area_code = data.get('区划代码', '')
                if pd.notna(area_code):
                    area_code = (str(int(float(area_code)))
                                 if str(area_code).replace('.', '').isdigit()
                                 else str(area_code))
                else:
                    area_code = ''
                key = f"{data['线路编码']}{area_code}{up_or_down}"
                road_dict.setdefault(key, []).append({'index': index, 'data': data})
    return road_dict


# filename -> pile info
def get_pile_dict(local_dir):
    """Read fileindex.txt in local_dir into {filename: parts} (parts[1] = pile)."""
    pattern = os.path.join(local_dir, 'fileindex.txt')
    found_paths = glob.glob(pattern)
    print(f"\n找到 {len(found_paths)} 个 'fileindex.txt' 文件:")
    for i, path in enumerate(found_paths, 1):
        print(f"{i}. {path}")
    pile_dict = {}
    if len(found_paths) > 0:
        with open(found_paths[0], 'r', encoding='utf-8') as file:
            lines = file.readlines()
        for i, line in enumerate(lines, 1):
            parts = line.strip().split("->")
            if len(parts) >= 4:
                pile_dict[parts[3]] = parts
    return pile_dict
# ---------------- example invocation ----------------
if __name__ == "__main__":
    # Demo: derive the grid geometry from one sample image.  process_dir
    # repeats this computation internally, so the values below are only
    # printed for inspection.
    image_path = r"D:\devForBdzlWork\ai-train_platform\predictions\C005500155A\20250515-075419-696.jpg"
    road_recognize_width = 3.6
    road_recognize_height = 2
    grid_width = None
    grid_height = None
    cell_index_start = None
    cell_index_end = None
    if grid_width is None or grid_height is None:
        img = cv2.imread(image_path)
        if img is not None:
            h, w = img.shape[:2]
            # cell size in pixels for this image
            grid_width = round(w * CELL_WIDTH / road_recognize_width)
            grid_height = round(h * CELL_WIDTH / road_recognize_height)
            print(f"通过图片获取到grid_width = {grid_width}, grid_height = {grid_height}")
            # keep only the middle RESERVED_CELL_NUM cells of each row
            cell_index_start = round((road_recognize_width / CELL_WIDTH - RESERVED_CELL_NUM) / 2)
            cell_index_end = cell_index_start + RESERVED_CELL_NUM

    # Run the shared-directory pipeline on a prediction folder.
    output_dir = "D:/devForBdzlWork/ai-train_platform/predictions/jlp/C006500107A"
    pile_dict = get_pile_dict(output_dir)
    road_dict = get_road_dict(output_dir)
    current_time = datetime.now().strftime("%Y%m%d%H%M%S")
    process_dir(road_dict, pile_dict, current_time, output_dir)