ai-train_platform/util/yolo2pix_new.py

807 lines
34 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import zipfile
import shutil
import cv2
import numpy as np
from collections import defaultdict
import smb
import pandas as pd
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment
from openpyxl.utils import get_column_letter
import glob
from datetime import datetime
# ---------------- 常量 ----------------
CELL_AREA = 0.0036 # 每格面积 (平方米)
GRID_WIDTH = 108 # 网格像素宽
GRID_HEIGHT = 102 # 网格像素高
COVER_RATIO = 0.01 # mask 覆盖比例阈值
# ---------------- 路面类别映射 ----------------
CLASS_MAP_ASPHALT = {
"龟裂":0,"块状裂缝":1,"纵向裂缝":2,"横向裂缝":3,"沉陷":4,"车辙":5,"波浪拥包":6,"坑槽":7,"松散":8,"泛油":9,"修补":10
}
CLASS_MAP_CEMENT = {
"破碎板":0,"裂缝":1,"板角断裂":2,"错台":3,"拱起":4,"边角剥落":5,"接缝料损坏":6,"坑洞":7,"唧泥":8,"露骨":9,"修补":10
}
CLASS_MAP_GRAVEL = {
"坑槽":0,"沉陷":1,"车辙":2,"波浪搓板":3
}
ROAD_TYPE_EN_TO_CN = {
"asphalt":"沥青",
"cement":"水泥",
"gravel":"砾石"
}
# ---------------- 工具函数 ----------------
def num_to_coord(num, cols, cell_w, cell_h):
n = num - 1
r, c = divmod(n, cols)
x1, y1 = c * cell_w, r * cell_h
x2, y2 = x1 + cell_w, y1 + cell_h
return x1, y1, x2, y2
def draw_grid_on_image(image_path, grid_cells, cell_size=(GRID_WIDTH, GRID_HEIGHT), save_path=None):
image = cv2.imread(image_path)
if image is None: return
h, w = image.shape[:2]
cell_w, cell_h = cell_size
cols = w // cell_w
overlay = image.copy()
for cname, nums in grid_cells.items():
color = (np.random.randint(64,255),np.random.randint(64,255),np.random.randint(64,255))
for num in nums:
x1,y1,x2,y2 = num_to_coord(num, cols, cell_w, cell_h)
cv2.rectangle(overlay,(x1,y1),(x2,y2),color,-1)
cv2.addWeighted(overlay,0.4,image,0.6,0,image)
for i in range(0, w, cell_w):
cv2.line(image,(i,0),(i,h),(100,100,100),1)
for j in range(0, h, cell_h):
cv2.line(image,(0,j),(w,j),(100,100,100),1)
if save_path: cv2.imwrite(save_path,image)
return image
def detect_road_type_from_content(label_file):
"""根据标签内容判断路面类型"""
try:
with open(label_file,'r',encoding='utf-8') as f:
content = f.read()
except:
return "gravel"
for kw in CLASS_MAP_ASPHALT.keys():
if kw in content: return "asphalt"
for kw in CLASS_MAP_CEMENT.keys():
if kw in content: return "cement"
for kw in CLASS_MAP_GRAVEL.keys():
if kw in content: return "gravel"
return "gravel"
def get_road_info(road_dict, pile_dict, img_file_name):
"""获取路线信息"""
parts = pile_dict.get(img_file_name)
if parts :
road_code = parts[0]
road_info = road_dict.get(road_code)
if road_info :
data = road_info['data']
return data
return {}
def detect_road_type_from_road_dict(road_dict, pile_dict, img_file_name):
"""根据读取的excel内容内容判断路面类型"""
road_code = 'xxxxxxxxxxx'
pile_no = "xxxxx"
road_type = "asphalt"
parts = pile_dict.get(img_file_name)
if parts :
road_code = parts[0]
pile_no = parts[1]
road_info = road_dict.get(road_code)
if road_info :
data = road_info['data']
pile_no = parts[1]
road_type_cn = data['路面类型(沥青/水泥/砂石)']
identify_width = data['识别宽度(米)']
if road_type_cn == '沥青' :
road_type = "asphalt"
elif road_type_cn == '水泥' :
road_type = "cement"
elif road_type_cn == '砾石' :
road_type = "gravel"
return road_code, pile_no, road_type
def yoloseg_to_grid_share_dir(road_dict,pile_dict,image_path,label_file,cover_ratio=COVER_RATIO):
"""将YOLO-Seg标签转换成格子编号和类别"""
img_file_name = os.path.basename(image_path)
road_code, pile_no, road_type = detect_road_type_from_road_dict(road_dict, pile_dict, img_file_name)
if road_type=="asphalt": class_map = CLASS_MAP_ASPHALT
elif road_type=="cement": class_map = CLASS_MAP_CEMENT
else: class_map = CLASS_MAP_GRAVEL
class_names = list(class_map.keys())
img = cv2.imread(image_path)
if img is None: return "", {}
h, w = img.shape[:2]
cols = max(1, w//GRID_WIDTH)
rows = max(1, h//GRID_HEIGHT)
result_lines = []
all_class_cells = {}
with open(label_file,'r',encoding='utf-8') as f:
for line in f:
parts = line.strip().split()
if len(parts)<5: continue
cls_id = int(parts[0])
coords = [float(x) for x in parts[1:]]
if len(coords)%2!=0: coords=coords[:-1]
if len(coords)<6: continue
poly = np.array(coords,dtype=np.float32).reshape(-1,2)
poly[:,0]*=w
poly[:,1]*=h
mask = np.zeros((h,w),dtype=np.uint8)
cv2.fillPoly(mask,[poly.astype(np.int32)],255)
cell_info = []
covered_cells=[]
min_x = cols
min_y = rows
max_x = 0
max_y = 0
for r in range(rows):
for c in range(cols):
x1,y1 = c*GRID_WIDTH, r*GRID_HEIGHT
x2,y2 = min(w,x1+GRID_WIDTH), min(h,y1+GRID_HEIGHT)
region = mask[y1:y2, x1:x2]
if np.count_nonzero(region)/region.size>cover_ratio:
covered_cells.append(r*cols+c+1)
# 最小x坐标y坐标
if min_x > c :
min_x = c
if min_y > r :
min_y = r
# 最大x坐标y坐标
if max_x < c + 1 :
max_x = c + 1
if max_y < r + 1 :
max_y = r + 1
if not covered_cells: continue
# min_cell = covered_cells[0]
# max_cell = covered_cells[len(covered_cells)-1]
cname = class_names[cls_id] if cls_id<len(class_names) else str(cls_id)
ids_str = '-'.join(map(str,sorted(covered_cells)))+'-'
pile_no_tmp = f"桩号:K000{pile_no}"
result_lines.append(f"{cname} {pile_no_tmp} {ROAD_TYPE_EN_TO_CN.get(road_type, 'xx')} {ids_str}")
if cname not in all_class_cells: all_class_cells[cname]=set()
cell_info.append(covered_cells)
cell_info.append([max_x - min_x, max_y - min_y])
all_class_cells[cname] = cell_info
return '\n'.join(result_lines), all_class_cells, road_type, rows * cols
def yoloseg_to_grid(image_path,label_file,cover_ratio=COVER_RATIO):
"""将YOLO-Seg标签转换成格子编号和类别"""
road_type = detect_road_type_from_content(label_file)
if road_type=="asphalt": class_map = CLASS_MAP_ASPHALT
elif road_type=="cement": class_map = CLASS_MAP_CEMENT
else: class_map = CLASS_MAP_GRAVEL
class_names = list(class_map.keys())
img = cv2.imread(image_path)
if img is None: return "", {}
h, w = img.shape[:2]
cols = max(1, w//GRID_WIDTH)
rows = max(1, h//GRID_HEIGHT)
result_lines = []
all_class_cells = {}
with open(label_file,'r',encoding='utf-8') as f:
for line in f:
parts = line.strip().split()
if len(parts)<5: continue
cls_id = int(parts[0])
coords = [float(x) for x in parts[1:]]
if len(coords)%2!=0: coords=coords[:-1]
if len(coords)<6: continue
poly = np.array(coords,dtype=np.float32).reshape(-1,2)
poly[:,0]*=w
poly[:,1]*=h
mask = np.zeros((h,w),dtype=np.uint8)
cv2.fillPoly(mask,[poly.astype(np.int32)],255)
covered_cells=[]
for r in range(rows):
for c in range(cols):
x1,y1 = c*GRID_WIDTH, r*GRID_HEIGHT
x2,y2 = min(w,x1+GRID_WIDTH), min(h,y1+GRID_HEIGHT)
region = mask[y1:y2, x1:x2]
if np.count_nonzero(region)/region.size>cover_ratio:
covered_cells.append(r*cols+c+1)
if not covered_cells: continue
cname = class_names[cls_id] if cls_id<len(class_names) else str(cls_id)
ids_str = '-'.join(map(str,sorted(covered_cells)))+'-'
result_lines.append(f"{cname} {ids_str}")
if cname not in all_class_cells: all_class_cells[cname]=set()
all_class_cells[cname].update(covered_cells)
return '\n'.join(result_lines), all_class_cells, road_type
def generate_header(road_type):
if road_type=="asphalt": return "起点桩号(km),识别宽度(m),破损率DR(%),龟裂,块状裂缝,纵向裂缝,横向裂缝,沉陷,车辙,波浪拥包,坑槽,松散,泛油,修补"
if road_type=="cement": return "起点桩号(km),识别宽度(m),破损率DR(%),破碎板,裂缝,板角断裂,错台,拱起,边角剥落,接缝料损坏,坑洞,唧泥,露骨,修补"
if road_type=="gravel": return "起点桩号(km),识别宽度(m),破损率DR(%),坑槽,沉陷,车辙,波浪搓板"
return ""
def get_min_max_pile(group) :
min_pile = float(99.000)
max_pile = float(0.000)
for g in group :
tmp_pile = convert_special_format(g[0])
if min_pile > tmp_pile :
min_pile = tmp_pile
if max_pile < tmp_pile :
max_pile = tmp_pile
return min_pile, max_pile
def convert_special_format(input_str):
"""
将特殊格式字符串转换为浮点数
支持格式: "0+022" -> 0.022, "1+234" -> 1.234
"""
if '+' in input_str:
# 分割整数部分和小数部分
parts = input_str.split('+')
if len(parts) == 2:
integer_part = parts[0]
decimal_part = parts[1]
# 构建标准小数格式
standard_format = f"{integer_part}.{decimal_part}"
return float(standard_format)
else:
raise ValueError(f"无效的格式: {input_str}")
else:
# 如果没有 '+',直接转换
return float(input_str)
# 是否在区间内
def in_interval(increment, cur_pile_no, tmp_start, tmp_end) :
if increment > 0 :
if cur_pile_no >= tmp_start and cur_pile_no < tmp_end :
return True
else :
return False
else :
if cur_pile_no > tmp_end and cur_pile_no <= tmp_start :
return True
else :
return False
# 指定间隔区间,输出基本信息+灾害数据
def process_info(road_dict,pile_dict,summary_data,image_path,current_time,interval=10,dir="output",cell_area=CELL_AREA,grid_width=GRID_WIDTH,grid_height=GRID_HEIGHT) :
process_info_data = []
if summary_data:
img_file_name = os.path.basename(image_path)
road_data = get_road_info(road_dict, pile_dict, img_file_name)
identify_width = road_data.get('识别宽度(米)', '3.6')
up_or_down = road_data.get('方向(上行/下行)', '上行')
road_code = pile_dict.get(img_file_name)[0]
road_level = road_data.get('技术等级', '')
road_type_cn = road_data.get('路面类型(沥青/水泥/砂石)', '沥青')
group_list = summary_data
increment = float(0.001 * interval)
pile_no_start = float(0.000)
pile_no_end = float(0.000) + increment
# 上行/下行
if up_or_down == '下行' :
group_list = list(summary_data)[::-1]
increment = float(-0.001 * interval)
tmp_pile_no = convert_special_format(group_list[len(group_list)-1][1])
pile_no_start = tmp_pile_no
if tmp_pile_no % increment != 0 :
pile_no_end = tmp_pile_no + (tmp_pile_no % increment)
else :
pile_no_end = tmp_pile_no + increment
index = 0
tmp_start = pile_no_start
tmp_end = pile_no_end
while True :
# 每10m一个区间在区间内进行灾害计算
pile_no, DR, counts, road_type = group_list[index]
cur_pile_no = convert_special_format(pile_no)
pile_no_s = f"{tmp_start:0.3f}"
pile_no_e = f"{tmp_end:0.3f}"
up_or_down_code = "B" if up_or_down == '下行' else "A"
if not in_interval(increment, cur_pile_no, tmp_start, tmp_end) :
# 没在刻度内直接输出无病害数据
row = [road_code, pile_no_s,pile_no_e,up_or_down_code,up_or_down,road_level,f"{road_type_cn}路面",'','',f"{0:.2f}",'','','','','','','','','','','','','','','']
if road_type=="asphalt":
keys = list(CLASS_MAP_ASPHALT.keys())
elif road_type=="cement":
keys = list(CLASS_MAP_CEMENT.keys())
else:
keys = list(CLASS_MAP_GRAVEL.keys())
# for k in keys:
# row.append(f"{0:.2f}")
process_info_data.append(row)
# f.write(','.join(row)+'\n')
else :
row = [road_code, pile_no_s,pile_no_e,up_or_down_code,up_or_down,road_level,f"{road_type_cn}路面",'','']
subRows = []
while index < len(group_list):
pile_no, DR, counts, road_type = group_list[index]
cur_pile_no = convert_special_format(pile_no)
tmp_row = []
if in_interval(increment, cur_pile_no, tmp_start, tmp_end) :
pile_no = f"{tmp_start:0.3f}"
tmp_row = [DR]
if road_type=="asphalt":
keys = list(CLASS_MAP_ASPHALT.keys())
elif road_type=="cement":
keys = list(CLASS_MAP_CEMENT.keys())
else:
keys = list(CLASS_MAP_GRAVEL.keys())
# for k in keys:
# tmp_row.append(counts.get(k, [0,0,0])[0])
subRows.append(tmp_row)
index = index + 1
else :
break
# 同列汇总 10m一个区间--对应5张图
column_sums = [f"{(sum(column)/(interval / 2)):0.2f}" for column in zip(*subRows)]
row += column_sums
# f.write(','.join(row)+'\n')
row += ['','','','','','','','','','','','','','','']
process_info_data.append(row)
tmp_start = tmp_end
tmp_end = tmp_start + increment
print(f"tmp_start={tmp_start}, tmp_end={tmp_end}, index={index}, len(group_list)={len(group_list)}")
if index >= len(group_list) :
break
return process_info_data
def adjust_column_width(worksheet):
"""
调整列宽函数
"""
for col_idx in range(1, worksheet.max_column + 1):
max_length = 0
column_letter = get_column_letter(col_idx)
for row_idx in range(1, worksheet.max_row + 1):
try:
cell = worksheet.cell(row=row_idx, column=col_idx)
# 检查单元格是否在合并单元格范围内
if any(cell.coordinate in merged_range for merged_range in worksheet.merged_cells.ranges):
continue # 跳过合并单元格
if cell.value is not None:
cell_length = len(str(cell.value))
if cell_length > max_length:
max_length = cell_length
except:
continue
if max_length > 0:
worksheet.column_dimensions[column_letter].width = max_length + 2
def create_multiple_sheets_with_multiple_headers(file_path, excel_data):
# 创建工作簿
wb = Workbook()
# 删除默认创建的sheet
wb.remove(wb.active)
for sheet_data in excel_data:
# 创建工作表
ws = wb.create_sheet(title=sheet_data['name'])
# 设置多表头
current_row = 1
# 多级表头
for heads in sheet_data['headers'] :
for head in heads :
ws.merge_cells(f'{head[1]}{current_row}:{head[2]}{current_row}')
ws[f'{head[1]}{current_row}'] = head[0]
ws[f'{head[1]}{current_row}'].font = Font(bold=True, size=14)
ws[f'{head[1]}{current_row}'].alignment = Alignment(horizontal='center')
current_row += 1
# 列标题
for col_idx, column_name in enumerate(sheet_data['columns'], 1):
cell = ws.cell(row=current_row, column=col_idx, value=column_name)
cell.font = Font(bold=True)
current_row += 1
# 写入数据
for row_data in sheet_data['data']:
for col_idx, value in enumerate(row_data, 1):
ws.cell(row=current_row, column=col_idx, value=value)
current_row += 1
# 自动调整列宽
adjust_column_width(ws)
# 保存文件
wb.save(file_path)
print(f"文件已保存为 {file_path}")
# ---------------- 主函数-共享目录 ----------------
def process_dir(road_dict,pile_dict,dir="output",cell_area=CELL_AREA,grid_width=GRID_WIDTH,grid_height=GRID_HEIGHT):
os.makedirs(dir,exist_ok=True)
# 解压
# 读取桩号映射
# 遍历图片
summary_data = []
for root,_,files in os.walk(dir):
for file in files:
if file.lower().endswith((".jpg",".png",".jpeg",".bmp")) :
image_path = os.path.join(root,file)
label_file = os.path.splitext(image_path)[0]+".txt"
if not os.path.exists(label_file):
print(f"⚠️ 找不到标签: {label_file}")
continue
out_txt, class_cells, road_type, all_cell_num = yoloseg_to_grid_share_dir(road_dict,pile_dict,image_path,label_file)
# 写每张图独立 _grid.txt
grid_txt_path = os.path.splitext(image_path)[0]+"_grid.txt"
with open(grid_txt_path,'w',encoding='utf-8') as f:
f.write(out_txt)
# 生成网格可视化
# draw_grid_on_image(image_path,class_cells,save_path=os.path.splitext(image_path)[0]+"_grid.jpg")
# 统计各类面积
counts = {k:[len(v[0])*cell_area, v[1][0], v[1][1]] for k,v in class_cells.items()}
# total_area = sum(counts.values())
# 灾害总面积比例
merged_set = set([])
for k,v in class_cells.items() :
merged_set = merged_set.union(v[0])
total_area = len(merged_set)
# 桩号 路线编号
parts = pile_dict.get(file)
pile_no = "0+000"
if parts :
pile_no = parts[1]
# 破损率 DR (%) = total_area / 总面积
# DR = total_area/ (total_area if total_area>0 else 1) *100 # 简化为100%或者0
DR= total_area / all_cell_num * 100
summary_data.append((pile_no, DR, counts, road_type))
current_time = datetime.now().strftime("%Y%m%d%H%M%S")
# 写桩号问题列表.txt
if summary_data:
img_file_name = os.path.basename(image_path)
road_data = get_road_info(road_dict, pile_dict, img_file_name)
road_code = pile_dict.get(img_file_name)[0]
road_type = summary_data[0][3]
min_pile, max_pile = get_min_max_pile(summary_data)
out_file = os.path.join(dir,f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-detail-{current_time}.txt")
header = generate_header(road_type)
with open(out_file,'w',encoding='utf-8') as f:
f.write(header+'\n')
for pile_no,DR,counts,rt in summary_data:
row = [pile_no,"3.6",f"{DR:.2f}"]
if road_type=="asphalt":
keys = list(CLASS_MAP_ASPHALT.keys())
elif road_type=="cement":
keys = list(CLASS_MAP_CEMENT.keys())
else:
keys = list(CLASS_MAP_GRAVEL.keys())
for k in keys:
row.append(f"{counts.get(k,[0,0,0])[0]:.2f}")
f.write(','.join(row)+'\n')
print(f"输出完成: {out_file}")
# 写灾害数据.txt
if summary_data:
img_file_name = os.path.basename(image_path)
road_data = get_road_info(road_dict, pile_dict, img_file_name)
identify_width = road_data.get('识别宽度(米)', '3.6')
up_or_down = road_data.get('方向(上行/下行)', '上行')
road_code = pile_dict.get(img_file_name)[0]
group_by_road_type = {}
for data in summary_data:
group_by_road_type.setdefault(data[3], []).append(data)
for road_type, group in group_by_road_type.items():
min_pile, max_pile = get_min_max_pile(group)
out_file = os.path.join(dir,f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-{current_time}.txt")
header = generate_header(road_type)
group_list = group
increment = float(0.010)
pile_no_start = float(0.000)
pile_no_end = float(0.000) + increment
# 上行/下行
if up_or_down == '下行' :
group_list = list(group)[::-1]
increment = float(-0.010)
tmp_pile_no = convert_special_format(group_list[len(group_list)-1][1])
pile_no_start = tmp_pile_no
if tmp_pile_no % increment != 0 :
pile_no_end = tmp_pile_no + (tmp_pile_no % increment)
else :
pile_no_end = tmp_pile_no + increment
with open(out_file,'w',encoding='utf-8') as f:
f.write(header+'\n')
index = 0
tmp_start = pile_no_start
tmp_end = pile_no_end
while True :
# 每10m一个区间在区间内进行灾害计算
pile_no, DR, counts, road_type = group_list[index]
cur_pile_no = convert_special_format(pile_no)
if not in_interval(increment, cur_pile_no, tmp_start, tmp_end) :
# 没在刻度内直接输出无病害数据
pile_no = f"{tmp_start:0.3f}"
row = [pile_no,identify_width,f"{0:.2f}"]
if road_type=="asphalt":
keys = list(CLASS_MAP_ASPHALT.keys())
elif road_type=="cement":
keys = list(CLASS_MAP_CEMENT.keys())
else:
keys = list(CLASS_MAP_GRAVEL.keys())
for k in keys:
row.append(f"{0:.2f}")
f.write(','.join(row)+'\n')
else :
row = [f"{tmp_start:0.3f}", identify_width]
subRows = []
while index < len(group_list):
pile_no, DR, counts, road_type = group_list[index]
cur_pile_no = convert_special_format(pile_no)
tmp_row = []
if in_interval(increment, cur_pile_no, tmp_start, tmp_end) :
pile_no = f"{tmp_start:0.3f}"
tmp_row = [DR]
if road_type=="asphalt":
keys = list(CLASS_MAP_ASPHALT.keys())
elif road_type=="cement":
keys = list(CLASS_MAP_CEMENT.keys())
else:
keys = list(CLASS_MAP_GRAVEL.keys())
for k in keys:
tmp_row.append(counts.get(k, [0,0,0])[0])
subRows.append(tmp_row)
index = index + 1
else :
break
# 同列汇总 10m一个区间--对应5张图
column_sums = [f"{(sum(column)/5):0.2f}" for column in zip(*subRows)]
row += column_sums
f.write(','.join(row)+'\n')
tmp_start = tmp_end
tmp_end = tmp_start + increment
print(f"tmp_start={tmp_start}, tmp_end={tmp_end}, index={index}, len(group_list)={len(group_list)}")
if index >= len(group_list) :
break
print(f"输出完成: {out_file}")
# 病害明显列表.xlsx
headers = ['序号','路线编码','方向','桩号','路面类型','病害名称','程度','长度(m)',' 宽度(m)',' 面积(㎡)',' 横向位置','备注']
data_list = []
if summary_data:
img_file_path = os.path.dirname(image_path)
img_file_name = os.path.basename(image_path)
road_data = get_road_info(road_dict, pile_dict, img_file_name)
road_code, pile_no, road_type = detect_road_type_from_road_dict(road_dict, pile_dict, img_file_name)
identify_width = road_data.get('识别宽度(米)', '3.6')
up_or_down = road_data.get('方向(上行/下行)', '上行')
excel_index = 1
for data in summary_data:
damage_data = data[2]
for attr_name, attr_value in damage_data.items():
excel_data = [excel_index, road_code, up_or_down, f"K000{data[0]}", ROAD_TYPE_EN_TO_CN.get(road_type), attr_name, '', attr_value[1]*cell_area, attr_value[2]*cell_area, attr_value[0], '', '']
data_list.append(excel_data)
all_data = [headers] + data_list
smb.write_to_excel_pandas(all_data, img_file_path + '/病害明细列表.xlsx')
# 综合明细表.xlsx
heads = ['路线编码','起点','终点','车道编码','上下行','公路等级','路面类型','PQI','PQI等级','DR(%)','PCI','PCI等级','IRI','RQI','RQI等级','RD','RDI','RDI等级','SMTD','PBI','PBI等级','WR','PWI','PWI等级','备注']
data1 = process_info(road_dict,pile_dict,summary_data,image_path,current_time,10)
data2 = process_info(road_dict,pile_dict,summary_data,image_path,current_time,100)
data3 = process_info(road_dict,pile_dict,summary_data,image_path,current_time,1000)
excel_data = [
{
'name': '综合明细十米',
'headers': [[['综合明细十米','A','Y']]],
'columns': heads,
'data': data1
},
{
'name': '综合明细百米',
'headers': [[['综合明细百米','A','Y']]],
'columns': heads,
'data': data2
},
{
'name': '综合明细千米',
'headers': [[['综合明细千米','A','Y']]],
'columns': heads,
'data': data3
}
]
create_multiple_sheets_with_multiple_headers(img_file_path + '/综合明细表.xlsx', excel_data)
# ---------------- 主函数 ----------------
def process_zip(zip_path,pile_map_file,output_dir="output",cell_area=CELL_AREA,grid_width=GRID_WIDTH,grid_height=GRID_HEIGHT):
if not os.path.exists(zip_path):
raise FileNotFoundError(f"{zip_path} 不存在")
os.makedirs(output_dir,exist_ok=True)
# 解压
with zipfile.ZipFile(zip_path,'r') as zip_ref:
zip_ref.extractall(output_dir)
# 读取桩号映射
pile_dict = {}
with open(pile_map_file,'r',encoding='utf-8') as f:
for line in f:
parts = line.strip().split("->")
if len(parts)>=4:
pile_dict[parts[3]]=parts # filename -> 桩号
# 遍历图片
summary_data = []
for root,_,files in os.walk(output_dir):
for file in files:
if file.lower().endswith((".jpg",".png",".jpeg",".bmp")) :
image_path = os.path.join(root,file)
label_file = os.path.splitext(image_path)[0]+".txt"
if not os.path.exists(label_file):
print(f"⚠️ 找不到标签: {label_file}")
continue
out_txt, class_cells, road_type, = yoloseg_to_grid(image_path,label_file)
# 写每张图独立 _grid.txt
grid_txt_path = os.path.splitext(image_path)[0]+"_grid.txt"
with open(grid_txt_path,'w',encoding='utf-8') as f:
f.write(out_txt)
# 生成网格可视化
draw_grid_on_image(image_path,class_cells,save_path=os.path.splitext(image_path)[0]+"_grid.jpg")
# 统计各类面积
counts = {k:len(v)*cell_area for k,v in class_cells.items()}
total_area = sum(counts.values())
# 桩号
pile_no = pile_dict.get(file,"0+000")
# 破损率 DR (%) = total_area / 总面积
DR = total_area / (total_area if total_area > 0 else 1) * 100 # 简化为100%或者0
summary_data.append((pile_no, DR, counts, road_type))
# 写桩号问题列表.txt
if summary_data:
road_type = summary_data[0][3]
out_file = os.path.join(output_dir,"桩号问题列表.txt")
header = generate_header(road_type)
with open(out_file,'w',encoding='utf-8') as f:
f.write(header+'\n')
for pile_no,DR,counts,rt in summary_data:
row = [pile_no,"3.6",f"{DR:.2f}"]
if road_type=="asphalt":
keys = list(CLASS_MAP_ASPHALT.keys())
elif road_type=="cement":
keys = list(CLASS_MAP_CEMENT.keys())
else:
keys = list(CLASS_MAP_GRAVEL.keys())
for k in keys:
row.append(f"{counts.get(k,0):.2f}")
f.write(','.join(row)+'\n')
print(f"✅ 输出完成: {out_file}")
# 路线编码 -> 路线信息
def get_road_dict(local_dir):
"""
从本地目录读取Excel文件构建路线字典
Args:
local_dir: 本地目录路径
Returns:
dict: 路线编码到路线信息的映射字典
"""
# 查找匹配的Excel文件
pattern = os.path.join(local_dir, '每公里指标明细表*.xls')
found_paths = glob.glob(pattern)
print(f"\n找到 {len(found_paths)}'每公里指标明细表*.xls' 文件:")
for i, path in enumerate(found_paths, 1):
print(f"{i}. {path}")
road_dict = {}
if len(found_paths) > 0:
# 读取第一个匹配的Excel文件
df = pd.read_excel(found_paths[0])
# 处理所有行(这里需要根据实际情况调整处理逻辑)
for index, row in df.iterrows():
data = row.to_dict()
if pd.notna(data.get('线路编码', None)):
up_or_down = 'A'
if data.get('方向(上行/下行)', '') == '下行':
up_or_down = 'B'
# 构建key确保区划代码为整数
area_code = data.get('区划代码', '')
if pd.notna(area_code):
area_code = str(int(float(area_code))) if str(area_code).replace('.', '').isdigit() else str(area_code)
else:
area_code = ''
key = f"{data['线路编码']}{area_code}{up_or_down}"
if key in road_dict:
road_dict[key].append({'index': index, 'data': data})
else:
road_dict[key] = [{'index': index, 'data': data}]
return road_dict
# filename -> 桩号
def get_pile_dict(local_dir):
"""
从本地目录读取fileindex.txt文件构建桩号字典
Args:
local_dir: 本地目录路径
Returns:
dict: 文件名到桩号信息的映射字典
"""
# 查找fileindex.txt文件
pattern = os.path.join(local_dir, 'fileindex.txt')
found_paths = glob.glob(pattern)
print(f"\n找到 {len(found_paths)}'fileindex.txt' 文件:")
for i, path in enumerate(found_paths, 1):
print(f"{i}. {path}")
pile_dict = {}
if len(found_paths) > 0:
# 读取第一个匹配的txt文件
with open(found_paths[0], 'r', encoding='utf-8') as file:
lines = file.readlines()
for i, line in enumerate(lines, 1):
parts = line.strip().split("->")
if len(parts) >= 4:
pile_dict[parts[3]] = parts # filename -> 桩号
return pile_dict
# ---------------- 示例调用 ----------------
if __name__=="__main__":
# zip_path = "D:/devForBdzlWork/ai-train_platform/predict/inferenceResult.zip" # 输入 ZIP 文件
# pile_map_file = "D:/devForBdzlWork/ai-train_platform/predict/pile_map.txt" # 图片名 -> 桩号
# process_zip(zip_path=zip_path,pile_map_file=pile_map_file,output_dir="output")
# output_dir = "D:/devForBdzlWork/ai-train_platform/predictions/7"
# pile_dict = smb.get_pile_dict("192.168.110.114/share_File/西南计算机", "administrator", "abc@1234")
# road_dict = smb.get_road_dict("192.168.110.114/share_File/西南计算机", "administrator", "abc@1234")
# process_dir(road_dict, pile_dict, output_dir)
output_dir = "D:/devForBdzlWork/ai-train_platform/predictions/7"
pile_dict = get_pile_dict(output_dir)
road_dict = get_road_dict(output_dir)
process_dir(road_dict, pile_dict, output_dir)