"""Road-surface distress post-processing.

Converts YOLO-Seg prediction labels into per-image grid-cell coverage,
aggregates damage ratios (DR) over pile-number intervals, and exports
TXT/XLSX reports.

NOTE(review): this module was recovered from a corrupted (whitespace-collapsed,
partially truncated) source.  Functions marked "reconstructed" were rebuilt
from their call sites and must be verified against the original.
"""

import os
import zipfile
import shutil
import cv2
import numpy as np
from collections import defaultdict
import smb
import pandas as pd
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment
from openpyxl.utils import get_column_letter
import glob
from datetime import datetime

# ---------------- Constants ----------------
CELL_AREA = 0.0036    # area of one grid cell (square metres)
GRID_WIDTH = 108      # grid cell width (pixels)
GRID_HEIGHT = 102     # grid cell height (pixels)
COVER_RATIO = 0.01    # minimum mask coverage ratio for a cell to count as damaged

# ---------------- Road-surface class maps ----------------
# Keys are the distress class names (runtime strings, do not translate);
# values are YOLO class ids.
CLASS_MAP_ASPHALT = {
    "龟裂": 0, "块状裂缝": 1, "纵向裂缝": 2, "横向裂缝": 3, "沉陷": 4,
    "车辙": 5, "波浪拥包": 6, "坑槽": 7, "松散": 8, "泛油": 9, "修补": 10,
}
CLASS_MAP_CEMENT = {
    "破碎板": 0, "裂缝": 1, "板角断裂": 2, "错台": 3, "拱起": 4,
    "边角剥落": 5, "接缝料损坏": 6, "坑洞": 7, "唧泥": 8, "露骨": 9, "修补": 10,
}
CLASS_MAP_GRAVEL = {
    "坑槽": 0, "沉陷": 1, "车辙": 2, "波浪搓板": 3,
}
ROAD_TYPE_EN_TO_CN = {
    "asphalt": "沥青",
    "cement": "水泥",
    "gravel": "砾石",
}


# ---------------- Utility functions ----------------
def _class_keys(road_type):
    """Return the ordered class-name list for a road type (default: gravel)."""
    if road_type == "asphalt":
        return list(CLASS_MAP_ASPHALT.keys())
    if road_type == "cement":
        return list(CLASS_MAP_CEMENT.keys())
    return list(CLASS_MAP_GRAVEL.keys())


def num_to_coord(num, cols, cell_w, cell_h):
    """Convert a 1-based cell number to (x1, y1, x2, y2) pixel coordinates.

    Cells are numbered row-major, left-to-right, top-to-bottom.
    """
    n = num - 1
    r, c = divmod(n, cols)
    x1, y1 = c * cell_w, r * cell_h
    x2, y2 = x1 + cell_w, y1 + cell_h
    return x1, y1, x2, y2


def draw_grid_on_image(image_path, grid_cells, cell_size=(GRID_WIDTH, GRID_HEIGHT),
                       save_path=None):
    """Overlay per-class covered cells (random colour, 40% alpha) and grid lines.

    grid_cells maps class name -> iterable of 1-based cell numbers.
    Returns the annotated image, or None if the image cannot be read.
    """
    image = cv2.imread(image_path)
    if image is None:
        return
    h, w = image.shape[:2]
    cell_w, cell_h = cell_size
    cols = w // cell_w
    overlay = image.copy()
    for cname, nums in grid_cells.items():
        # One random (bright-ish) colour per class.
        color = (np.random.randint(64, 255),
                 np.random.randint(64, 255),
                 np.random.randint(64, 255))
        for num in nums:
            x1, y1, x2, y2 = num_to_coord(num, cols, cell_w, cell_h)
            cv2.rectangle(overlay, (x1, y1), (x2, y2), color, -1)
    cv2.addWeighted(overlay, 0.4, image, 0.6, 0, image)
    # Thin grey grid lines on top.
    for i in range(0, w, cell_w):
        cv2.line(image, (i, 0), (i, h), (100, 100, 100), 1)
    for j in range(0, h, cell_h):
        cv2.line(image, (0, j), (w, j), (100, 100, 100), 1)
    if save_path:
        cv2.imwrite(save_path, image)
    return image


def detect_road_type_from_content(label_file):
    """Guess the road type from class-name keywords found in a label file.

    Falls back to "gravel" when the file is unreadable or no keyword matches.
    """
    try:
        with open(label_file, 'r', encoding='utf-8') as f:
            content = f.read()
    except (OSError, UnicodeDecodeError):  # narrowed from a bare except
        return "gravel"
    for kw in CLASS_MAP_ASPHALT:
        if kw in content:
            return "asphalt"
    for kw in CLASS_MAP_CEMENT:
        if kw in content:
            return "cement"
    for kw in CLASS_MAP_GRAVEL:
        if kw in content:
            return "gravel"
    return "gravel"


def _first_road_row(road_info):
    """Normalise a road_dict value to a single {'index','data'} row.

    get_road_dict() stores a *list* of rows per route key, while the SMB
    variant may return a single dict.  TODO(review): confirm smb shape.
    """
    if isinstance(road_info, list):
        return road_info[0]
    return road_info


def get_road_info(road_dict, pile_dict, img_file_name):
    """Return the Excel row dict for the route an image belongs to, or {}."""
    parts = pile_dict.get(img_file_name)
    if parts:
        road_info = road_dict.get(parts[0])
        if road_info:
            # BUGFIX: original did road_info['data'] directly, which raises
            # TypeError when road_dict values are lists (see get_road_dict).
            return _first_road_row(road_info)['data']
    return {}


def detect_road_type_from_road_dict(road_dict, pile_dict, img_file_name):
    """Resolve (road_code, pile_no, road_type) for an image from the Excel data.

    Returns placeholder code/pile and "asphalt" when the image or route is
    unknown.
    """
    road_code = 'xxxxxxxxxxx'
    pile_no = "xxxxx"
    road_type = "asphalt"
    parts = pile_dict.get(img_file_name)
    if parts:
        road_code = parts[0]
        pile_no = parts[1]
        road_info = road_dict.get(road_code)
        if road_info:
            data = _first_road_row(road_info)['data']
            road_type_cn = data['路面类型(沥青/水泥/砂石)']
            if road_type_cn == '沥青':
                road_type = "asphalt"
            elif road_type_cn == '水泥':
                road_type = "cement"
            elif road_type_cn == '砾石':
                # NOTE(review): the Excel column header says 砂石, but this
                # matches 砾石; a value of 砂石 would fall through to the
                # "asphalt" default — confirm intended.
                road_type = "gravel"
    return road_code, pile_no, road_type


def yoloseg_to_grid_share_dir(road_dict, pile_dict, image_path, label_file,
                              cover_ratio=COVER_RATIO):
    """Convert one image's YOLO-Seg labels into grid-cell coverage.

    Returns (grid_text, class_cells, road_type, total_cell_count) where
    class_cells maps class name -> (set of covered 1-based cell numbers,
    [bbox width in cells, bbox height in cells]).
    """
    img_file_name = os.path.basename(image_path)
    road_code, pile_no, road_type = detect_road_type_from_road_dict(
        road_dict, pile_dict, img_file_name)
    class_names = _class_keys(road_type)
    img = cv2.imread(image_path)
    if img is None:
        # BUGFIX: original returned only ("", {}) here while the caller
        # unpacks 4 values; return a neutral 4-tuple (cell count 1 avoids a
        # later division by zero).
        return "", {}, road_type, 1
    h, w = img.shape[:2]
    cols = max(1, w // GRID_WIDTH)
    rows = max(1, h // GRID_HEIGHT)
    result_lines = []
    all_class_cells = {}
    with open(label_file, 'r', encoding='utf-8') as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) < 5:
                continue
            cls_id = int(parts[0])
            coords = [float(x) for x in parts[1:]]
            if len(coords) % 2 != 0:   # drop a dangling half coordinate
                coords = coords[:-1]
            if len(coords) < 6:        # a polygon needs at least 3 points
                continue
            poly = np.array(coords, dtype=np.float32).reshape(-1, 2)
            poly[:, 0] *= w            # normalised -> pixel coordinates
            poly[:, 1] *= h
            mask = np.zeros((h, w), dtype=np.uint8)
            cv2.fillPoly(mask, [poly.astype(np.int32)], 255)
            covered_cells = []
            min_x, min_y, max_x, max_y = cols, rows, 0, 0
            for r in range(rows):
                for c in range(cols):
                    x1, y1 = c * GRID_WIDTH, r * GRID_HEIGHT
                    x2, y2 = min(w, x1 + GRID_WIDTH), min(h, y1 + GRID_HEIGHT)
                    region = mask[y1:y2, x1:x2]
                    if np.count_nonzero(region) / region.size > cover_ratio:
                        covered_cells.append(r * cols + c + 1)
                        # Track the covered bounding box in cell units.
                        min_x = min(min_x, c)
                        min_y = min(min_y, r)
                        max_x = max(max_x, c + 1)
                        max_y = max(max_y, r + 1)
            if not covered_cells:
                continue
            # NOTE(review): the tail of this loop was reconstructed from a
            # corrupted source — verify the grid-text line format and the
            # per-class accumulation against the original.
            cname = (class_names[cls_id] if cls_id < len(class_names)
                     else f"class_{cls_id}")
            result_lines.append(f"{cname}:" + ",".join(str(n) for n in covered_cells))
            cells, extent = all_class_cells.setdefault(cname, (set(), [0, 0]))
            cells.update(covered_cells)
            extent[0] = max(extent[0], max_x - min_x)   # bbox width (cells)
            extent[1] = max(extent[1], max_y - min_y)   # bbox height (cells)
    return "\n".join(result_lines), all_class_cells, road_type, rows * cols


def yoloseg_to_grid(image_path, label_file, cover_ratio=COVER_RATIO):
    """Simplified grid conversion used by process_zip().

    Road type comes from the label-file content; class_cells maps class
    name -> list of covered 1-based cell numbers.
    NOTE(review): reconstructed — the original definition was lost to source
    corruption; rebuilt from the process_zip() call site.
    """
    road_type = detect_road_type_from_content(label_file)
    class_names = _class_keys(road_type)
    img = cv2.imread(image_path)
    if img is None:
        return "", {}, road_type
    h, w = img.shape[:2]
    cols = max(1, w // GRID_WIDTH)
    rows = max(1, h // GRID_HEIGHT)
    result_lines = []
    all_class_cells = {}
    with open(label_file, 'r', encoding='utf-8') as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) < 5:
                continue
            cls_id = int(parts[0])
            coords = [float(x) for x in parts[1:]]
            if len(coords) % 2 != 0:
                coords = coords[:-1]
            if len(coords) < 6:
                continue
            poly = np.array(coords, dtype=np.float32).reshape(-1, 2)
            poly[:, 0] *= w
            poly[:, 1] *= h
            mask = np.zeros((h, w), dtype=np.uint8)
            cv2.fillPoly(mask, [poly.astype(np.int32)], 255)
            covered_cells = []
            for r in range(rows):
                for c in range(cols):
                    x1, y1 = c * GRID_WIDTH, r * GRID_HEIGHT
                    x2, y2 = min(w, x1 + GRID_WIDTH), min(h, y1 + GRID_HEIGHT)
                    region = mask[y1:y2, x1:x2]
                    if np.count_nonzero(region) / region.size > cover_ratio:
                        covered_cells.append(r * cols + c + 1)
            if not covered_cells:
                continue
            cname = (class_names[cls_id] if cls_id < len(class_names)
                     else f"class_{cls_id}")
            result_lines.append(f"{cname}:" + ",".join(str(n) for n in covered_cells))
            all_class_cells.setdefault(cname, []).extend(covered_cells)
    return "\n".join(result_lines), all_class_cells, road_type


def get_min_max_pile(summary_data):
    """Return (min, max) pile numbers (floats, km) over summary rows.

    Each row is (pile_no_str, DR, counts, road_type).
    NOTE(review): partially reconstructed from a corrupted source.
    """
    min_pile = float("inf")
    max_pile = float("-inf")
    for row in summary_data:
        tmp_pile = convert_special_format(row[0])
        if min_pile > tmp_pile:
            min_pile = tmp_pile
        if max_pile < tmp_pile:
            max_pile = tmp_pile
    return min_pile, max_pile


def generate_header(road_type):
    """Build the comma-separated header line for the per-pile report files.

    NOTE(review): reconstructed — the original definition was lost to source
    corruption.  Column order mirrors the data rows written in process_dir()
    and process_zip(): pile number, width, DR, then one column per class.
    """
    return ','.join(["桩号", "宽度", "DR(%)"] + _class_keys(road_type))


def convert_special_format(input_str):
    """Convert a pile-number string to a float (kilometres).

    Supported formats: "0+022" -> 0.022, "1+234" -> 1.234, or a plain number.

    Raises:
        ValueError: when more than one '+' is present or the parts are not
        numeric.
    """
    if '+' in input_str:
        parts = input_str.split('+')
        if len(parts) == 2:
            # "K+mmm" -> "K.mmm"
            return float(f"{parts[0]}.{parts[1]}")
        raise ValueError(f"无效的格式: {input_str}")
    return float(input_str)


def in_interval(increment, cur_pile_no, tmp_start, tmp_end):
    """Direction-aware half-open interval test.

    Ascending (increment > 0): start <= x < end.
    Descending: end < x <= start.
    """
    if increment > 0:
        return tmp_start <= cur_pile_no < tmp_end
    return tmp_end < cur_pile_no <= tmp_start


def process_info(road_dict, pile_dict, summary_data, image_path, current_time,
                 interval=10, dir="output", cell_area=CELL_AREA,
                 grid_width=GRID_WIDTH, grid_height=GRID_HEIGHT):
    """Build rows for one consolidated-detail sheet at a given interval (metres).

    Walks pile-number intervals of `interval` metres; intervals without
    imagery get a zero-DR row, intervals with imagery get the column-wise
    mean over the images that fall inside.  Returns the list of 25-column
    rows (matching the 综合明细表 sheet layout).
    """
    process_info_data = []
    if not summary_data:
        return process_info_data
    img_file_name = os.path.basename(image_path)
    road_data = get_road_info(road_dict, pile_dict, img_file_name)
    up_or_down = road_data.get('方向(上行/下行)', '上行')
    road_code = pile_dict.get(img_file_name)[0]
    road_level = road_data.get('技术等级', '')
    road_type_cn = road_data.get('路面类型(沥青/水泥/砂石)', '沥青')
    group_list = summary_data
    increment = float(0.001 * interval)   # interval expressed in km
    pile_no_start = float(0.000)
    pile_no_end = pile_no_start + increment
    if up_or_down == '下行':
        # Descending direction: walk the images in reverse, negative step.
        group_list = list(summary_data)[::-1]
        increment = float(-0.001 * interval)
        # BUGFIX: original indexed [1] (the DR float); the pile-number string
        # is element [0] of each summary row.
        tmp_pile_no = convert_special_format(group_list[-1][0])
        pile_no_start = tmp_pile_no
        if tmp_pile_no % increment != 0:
            # Align the first interval end to the step grid.
            pile_no_end = tmp_pile_no + (tmp_pile_no % increment)
        else:
            pile_no_end = tmp_pile_no + increment
    up_or_down_code = "B" if up_or_down == '下行' else "A"
    index = 0
    tmp_start, tmp_end = pile_no_start, pile_no_end
    while True:
        pile_no, DR, counts, road_type = group_list[index]
        cur_pile_no = convert_special_format(pile_no)
        pile_no_s = f"{tmp_start:0.3f}"
        pile_no_e = f"{tmp_end:0.3f}"
        if not in_interval(increment, cur_pile_no, tmp_start, tmp_end):
            # No image in this interval: zero DR, remaining indicators blank.
            row = [road_code, pile_no_s, pile_no_e, up_or_down_code, up_or_down,
                   road_level, f"{road_type_cn}路面", '', '', f"{0:.2f}"] + [''] * 15
            process_info_data.append(row)
        else:
            row = [road_code, pile_no_s, pile_no_e, up_or_down_code, up_or_down,
                   road_level, f"{road_type_cn}路面", '', '']
            subRows = []
            while index < len(group_list):
                pile_no, DR, counts, road_type = group_list[index]
                cur_pile_no = convert_special_format(pile_no)
                if not in_interval(increment, cur_pile_no, tmp_start, tmp_end):
                    break
                # Per-class counts were intentionally disabled upstream
                # (commented out in the original); only DR is aggregated.
                subRows.append([DR])
                index += 1
            # Column-wise mean over interval/2 expected images
            # (10 m interval -> 5 images at 2 m spacing).
            row += [f"{(sum(column) / (interval / 2)):0.2f}"
                    for column in zip(*subRows)]
            row += [''] * 15
            process_info_data.append(row)
        tmp_start = tmp_end
        tmp_end = tmp_start + increment
        print(f"tmp_start={tmp_start}, tmp_end={tmp_end}, index={index}, len(group_list)={len(group_list)}")
        if index >= len(group_list):
            break
    return process_info_data


def adjust_column_width(worksheet):
    """Auto-size each column to its longest (non-merged) cell value + padding."""
    for col_idx in range(1, worksheet.max_column + 1):
        max_length = 0
        column_letter = get_column_letter(col_idx)
        for row_idx in range(1, worksheet.max_row + 1):
            try:
                cell = worksheet.cell(row=row_idx, column=col_idx)
                # Skip cells inside merged ranges (their width is meaningless).
                if any(cell.coordinate in merged_range
                       for merged_range in worksheet.merged_cells.ranges):
                    continue
                if cell.value is not None:
                    max_length = max(max_length, len(str(cell.value)))
            except Exception:
                continue
        if max_length > 0:
            worksheet.column_dimensions[column_letter].width = max_length + 2


def create_multiple_sheets_with_multiple_headers(file_path, excel_data):
    """Write a multi-sheet workbook with merged multi-row headers.

    excel_data: list of {'name', 'headers', 'columns', 'data'} where 'headers'
    is a list of header rows, each a list of [title, start_col, end_col].
    """
    wb = Workbook()
    wb.remove(wb.active)  # drop the default sheet
    for sheet_data in excel_data:
        ws = wb.create_sheet(title=sheet_data['name'])
        current_row = 1
        # Merged multi-level header rows.
        for heads in sheet_data['headers']:
            for head in heads:
                ws.merge_cells(f'{head[1]}{current_row}:{head[2]}{current_row}')
                top_left = ws[f'{head[1]}{current_row}']
                top_left.value = head[0]
                top_left.font = Font(bold=True, size=14)
                top_left.alignment = Alignment(horizontal='center')
            current_row += 1
        # Column-title row.
        for col_idx, column_name in enumerate(sheet_data['columns'], 1):
            cell = ws.cell(row=current_row, column=col_idx, value=column_name)
            cell.font = Font(bold=True)
        current_row += 1
        # Data rows.
        for row_data in sheet_data['data']:
            for col_idx, value in enumerate(row_data, 1):
                ws.cell(row=current_row, column=col_idx, value=value)
            current_row += 1
        adjust_column_width(ws)
    wb.save(file_path)
    print(f"文件已保存为 {file_path}")


# ---------------- Main entry: shared directory ----------------
def process_dir(road_dict, pile_dict, dir="output", cell_area=CELL_AREA,
                grid_width=GRID_WIDTH, grid_height=GRID_HEIGHT):
    """Process a directory of images + YOLO-Seg labels into all report files.

    Produces, next to the images: per-image *_grid.txt dumps, a per-image
    detail TXT, per-road-type 10 m interval TXTs, 病害明细列表.xlsx and
    综合明细表.xlsx.
    """
    os.makedirs(dir, exist_ok=True)
    summary_data = []
    image_path = None  # last processed image; its directory hosts the outputs
    for root, _, files in os.walk(dir):
        for file in files:
            if not file.lower().endswith((".jpg", ".png", ".jpeg", ".bmp")):
                continue
            image_path = os.path.join(root, file)
            label_file = os.path.splitext(image_path)[0] + ".txt"
            if not os.path.exists(label_file):
                print(f"⚠️ 找不到标签: {label_file}")
                continue
            out_txt, class_cells, road_type, all_cell_num = \
                yoloseg_to_grid_share_dir(road_dict, pile_dict, image_path, label_file)
            # Per-image grid dump.
            grid_txt_path = os.path.splitext(image_path)[0] + "_grid.txt"
            with open(grid_txt_path, 'w', encoding='utf-8') as f:
                f.write(out_txt)
            # draw_grid_on_image(image_path, class_cells,
            #                    save_path=os.path.splitext(image_path)[0] + "_grid.jpg")
            # Per class: [area m², bbox width cells, bbox height cells].
            counts = {k: [len(v[0]) * cell_area, v[1][0], v[1][1]]
                      for k, v in class_cells.items()}
            # Union of damaged cells over all classes -> damaged-cell count.
            merged_set = set()
            for v in class_cells.values():
                merged_set |= set(v[0])
            total_area = len(merged_set)
            parts = pile_dict.get(file)
            pile_no = parts[1] if parts else "0+000"
            DR = total_area / all_cell_num * 100  # damage ratio, percent
            summary_data.append((pile_no, DR, counts, road_type))

    current_time = datetime.now().strftime("%Y%m%d%H%M%S")
    if not summary_data:
        return
    img_file_name = os.path.basename(image_path)
    img_file_path = os.path.dirname(image_path)
    road_data = get_road_info(road_dict, pile_dict, img_file_name)
    road_code = pile_dict.get(img_file_name)[0]
    identify_width = road_data.get('识别宽度(米)', '3.6')
    up_or_down = road_data.get('方向(上行/下行)', '上行')

    # ---- 1) Per-image detail file: <code>-DR-<min>-<max>-detail-<ts>.txt ----
    road_type = summary_data[0][3]
    min_pile, max_pile = get_min_max_pile(summary_data)
    out_file = os.path.join(
        dir, f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-detail-{current_time}.txt")
    header = generate_header(road_type)
    keys = _class_keys(road_type)  # keyed off the first image's road type
    with open(out_file, 'w', encoding='utf-8') as f:
        f.write(header + '\n')
        for pile_no, DR, counts, rt in summary_data:
            row = [pile_no, "3.6", f"{DR:.2f}"]
            row += [f"{counts.get(k, [0, 0, 0])[0]:.2f}" for k in keys]
            f.write(','.join(row) + '\n')
    print(f"输出完成: {out_file}")

    # ---- 2) 10 m interval aggregation, one file per road type ----
    group_by_road_type = {}
    for data in summary_data:
        group_by_road_type.setdefault(data[3], []).append(data)
    for road_type, group in group_by_road_type.items():
        min_pile, max_pile = get_min_max_pile(group)
        out_file = os.path.join(
            dir, f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-{current_time}.txt")
        header = generate_header(road_type)
        group_list = group
        increment = float(0.010)          # 10 m in km
        pile_no_start = float(0.000)
        pile_no_end = pile_no_start + increment
        if up_or_down == '下行':
            group_list = list(group)[::-1]
            increment = float(-0.010)
            # BUGFIX: original indexed [1] (DR float); pile string is at [0].
            tmp_pile_no = convert_special_format(group_list[-1][0])
            pile_no_start = tmp_pile_no
            if tmp_pile_no % increment != 0:
                pile_no_end = tmp_pile_no + (tmp_pile_no % increment)
            else:
                pile_no_end = tmp_pile_no + increment
        with open(out_file, 'w', encoding='utf-8') as f:
            f.write(header + '\n')
            index = 0
            tmp_start, tmp_end = pile_no_start, pile_no_end
            while True:
                # Walk 10 m intervals; aggregate all images inside each one.
                pile_no, DR, counts, road_type = group_list[index]
                cur_pile_no = convert_special_format(pile_no)
                if not in_interval(increment, cur_pile_no, tmp_start, tmp_end):
                    # No image in this interval: emit an all-zero row.
                    row = [f"{tmp_start:0.3f}", identify_width, f"{0:.2f}"]
                    row += [f"{0:.2f}" for _ in _class_keys(road_type)]
                    f.write(','.join(row) + '\n')
                else:
                    row = [f"{tmp_start:0.3f}", identify_width]
                    subRows = []
                    while index < len(group_list):
                        pile_no, DR, counts, road_type = group_list[index]
                        cur_pile_no = convert_special_format(pile_no)
                        if not in_interval(increment, cur_pile_no, tmp_start, tmp_end):
                            break
                        tmp_row = [DR]
                        tmp_row += [counts.get(k, [0, 0, 0])[0]
                                    for k in _class_keys(road_type)]
                        subRows.append(tmp_row)
                        index += 1
                    # Column-wise mean over the 5 images of a 10 m interval.
                    row += [f"{(sum(column) / 5):0.2f}" for column in zip(*subRows)]
                    f.write(','.join(row) + '\n')
                tmp_start = tmp_end
                tmp_end = tmp_start + increment
                print(f"tmp_start={tmp_start}, tmp_end={tmp_end}, index={index}, len(group_list)={len(group_list)}")
                if index >= len(group_list):
                    break
        print(f"输出完成: {out_file}")

    # ---- 3) 病害明细列表.xlsx: one row per (image, distress class) ----
    headers = ['序号','路线编码','方向','桩号','路面类型','病害名称','程度','长度(m)',' 宽度(m)',' 面积(㎡)',' 横向位置','备注']
    data_list = []
    road_code, pile_no, road_type = detect_road_type_from_road_dict(
        road_dict, pile_dict, img_file_name)
    excel_index = 1
    for data in summary_data:
        damage_data = data[2]
        for attr_name, attr_value in damage_data.items():
            # attr_value = [area m², bbox width cells, bbox height cells].
            # NOTE(review): width/height are scaled by CELL_AREA (an area),
            # not by a cell edge length — confirm intended units.
            data_list.append([
                excel_index, road_code, up_or_down, f"K000{data[0]}",
                ROAD_TYPE_EN_TO_CN.get(road_type), attr_name, '',
                attr_value[1] * cell_area, attr_value[2] * cell_area,
                attr_value[0], '', ''])
            excel_index += 1  # BUGFIX: original never advanced the 序号 counter
    all_data = [headers] + data_list
    smb.write_to_excel_pandas(all_data, img_file_path + '/病害明细列表.xlsx')

    # ---- 4) 综合明细表.xlsx: 10 m / 100 m / 1000 m aggregation sheets ----
    heads = ['路线编码','起点','终点','车道编码','上下行','公路等级','路面类型','PQI','PQI等级','DR(%)','PCI','PCI等级','IRI','RQI','RQI等级','RD','RDI','RDI等级','SMTD','PBI','PBI等级','WR','PWI','PWI等级','备注']
    excel_data = []
    for sheet_name, sheet_interval in (('综合明细十米', 10),
                                       ('综合明细百米', 100),
                                       ('综合明细千米', 1000)):
        excel_data.append({
            'name': sheet_name,
            'headers': [[[sheet_name, 'A', 'Y']]],
            'columns': heads,
            'data': process_info(road_dict, pile_dict, summary_data,
                                 image_path, current_time, sheet_interval),
        })
    create_multiple_sheets_with_multiple_headers(
        img_file_path + '/综合明细表.xlsx', excel_data)


# ---------------- Main entry: ZIP archive ----------------
def process_zip(zip_path, pile_map_file, output_dir="output", cell_area=CELL_AREA,
                grid_width=GRID_WIDTH, grid_height=GRID_HEIGHT):
    """Legacy flow: extract a result ZIP and write 桩号问题列表.txt.

    Raises:
        FileNotFoundError: when zip_path does not exist.
    """
    if not os.path.exists(zip_path):
        raise FileNotFoundError(f"{zip_path} 不存在")
    os.makedirs(output_dir, exist_ok=True)
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(output_dir)
    # filename -> "->"-split pile mapping record.
    pile_dict = {}
    with open(pile_map_file, 'r', encoding='utf-8') as f:
        for line in f:
            parts = line.strip().split("->")
            if len(parts) >= 4:
                pile_dict[parts[3]] = parts
    summary_data = []
    for root, _, files in os.walk(output_dir):
        for file in files:
            if not file.lower().endswith((".jpg", ".png", ".jpeg", ".bmp")):
                continue
            image_path = os.path.join(root, file)
            label_file = os.path.splitext(image_path)[0] + ".txt"
            if not os.path.exists(label_file):
                print(f"⚠️ 找不到标签: {label_file}")
                continue
            out_txt, class_cells, road_type = yoloseg_to_grid(image_path, label_file)
            grid_txt_path = os.path.splitext(image_path)[0] + "_grid.txt"
            with open(grid_txt_path, 'w', encoding='utf-8') as f:
                f.write(out_txt)
            draw_grid_on_image(
                image_path, class_cells,
                save_path=os.path.splitext(image_path)[0] + "_grid.jpg")
            counts = {k: len(v) * cell_area for k, v in class_cells.items()}
            total_area = sum(counts.values())
            # BUGFIX: pile_dict values are the full "->"-split lists, which
            # would break the CSV join below; take the pile-number field,
            # matching process_dir().
            parts = pile_dict.get(file)
            pile_no = parts[1] if parts else "0+000"
            # Degenerate DR: 100 if any damage, else 0 (original's stated
            # simplification).
            DR = total_area / (total_area if total_area > 0 else 1) * 100
            summary_data.append((pile_no, DR, counts, road_type))
    if summary_data:
        road_type = summary_data[0][3]
        out_file = os.path.join(output_dir, "桩号问题列表.txt")
        header = generate_header(road_type)
        keys = _class_keys(road_type)
        with open(out_file, 'w', encoding='utf-8') as f:
            f.write(header + '\n')
            for pile_no, DR, counts, rt in summary_data:
                row = [pile_no, "3.6", f"{DR:.2f}"]
                row += [f"{counts.get(k, 0):.2f}" for k in keys]
                f.write(','.join(row) + '\n')
        print(f"✅ 输出完成: {out_file}")


# route key -> route info rows
def get_road_dict(local_dir):
    """Read '每公里指标明细表*.xls' in local_dir into a route dictionary.

    Args:
        local_dir: local directory path.

    Returns:
        dict: "<线路编码><区划代码><A|B>" -> list of {'index', 'data'} rows.
    """
    pattern = os.path.join(local_dir, '每公里指标明细表*.xls')
    found_paths = glob.glob(pattern)
    print(f"\n找到 {len(found_paths)} 个 '每公里指标明细表*.xls' 文件:")
    for i, path in enumerate(found_paths, 1):
        print(f"{i}. {path}")
    road_dict = {}
    if found_paths:
        # Only the first matching workbook is used.
        df = pd.read_excel(found_paths[0])
        for index, row in df.iterrows():
            data = row.to_dict()
            if not pd.notna(data.get('线路编码', None)):
                continue
            up_or_down = 'B' if data.get('方向(上行/下行)', '') == '下行' else 'A'
            area_code = data.get('区划代码', '')
            if pd.notna(area_code):
                # Excel numeric cells read as floats (510100.0 -> "510100").
                area_code = (str(int(float(area_code)))
                             if str(area_code).replace('.', '').isdigit()
                             else str(area_code))
            else:
                area_code = ''
            key = f"{data['线路编码']}{area_code}{up_or_down}"
            road_dict.setdefault(key, []).append({'index': index, 'data': data})
    return road_dict


# filename -> pile-number record
def get_pile_dict(local_dir):
    """Read fileindex.txt in local_dir into a filename -> record mapping.

    Args:
        local_dir: local directory path.

    Returns:
        dict: filename -> full "->"-split record list
        (index 0: route code, 1: pile number, 3: filename).
    """
    pattern = os.path.join(local_dir, 'fileindex.txt')
    found_paths = glob.glob(pattern)
    print(f"\n找到 {len(found_paths)} 个 'fileindex.txt' 文件:")
    for i, path in enumerate(found_paths, 1):
        print(f"{i}. {path}")
    pile_dict = {}
    if found_paths:
        with open(found_paths[0], 'r', encoding='utf-8') as file:
            for line in file:
                parts = line.strip().split("->")
                if len(parts) >= 4:
                    pile_dict[parts[3]] = parts
    return pile_dict


# ---------------- Example invocation ----------------
if __name__ == "__main__":
    # Alternative flows:
    #   process_zip(zip_path=..., pile_map_file=..., output_dir="output")
    #   pile_dict/road_dict may also come from the smb module helpers.
    output_dir = "D:/devForBdzlWork/ai-train_platform/predictions/7"
    pile_dict = get_pile_dict(output_dir)
    road_dict = get_road_dict(output_dir)
    process_dir(road_dict, pile_dict, output_dir)