diff --git a/predict/predict_yolo11seg.py b/predict/predict_yolo11seg.py index 95ddbbd..6f3543e 100644 --- a/predict/predict_yolo11seg.py +++ b/predict/predict_yolo11seg.py @@ -692,10 +692,14 @@ class YOLOSegmentationInference: results.append(result) # 推送识别数据到共享目录 - pile_dict = get_pile_dict(image_path, user_name, pwd) - process_dir(pile_dict, output_dir) + tmpConfig = get_conf(input_dir, user_name, pwd) + pile_dict = get_pile_dict(input_dir, user_name, pwd) + road_dict = get_road_dict(f"{tmpConfig['ip']}/{tmpConfig['share']}", user_name, pwd) + process_dir(road_dict, pile_dict, output_dir) current_time = datetime.now().strftime("%Y%m%d%H%M%S") - scanner.upload_directory(output_dir, config['share'], remote_dir=input_dir+f"_识别/{task_id}/{current_time}") + + remote_dir = f"{tmpConfig['dir']}_识别/{task_id}/{current_time}" + # scanner.upload_directory(output_dir, config['share'], remote_dir=remote_dir) return results @@ -894,7 +898,8 @@ def predict_images_share_dir(task_id, pt_name, zip_url, user_name, pwd, output_d scanner = get_scanner(zip_url, user_name, pwd) found_paths = scanner.find_folders_by_name( share_path=config['share'], - folder_name='图像类' + folder_name='图像类', + start_dir=config['dir'] ) target_path = "" # 识别图片目录 diff --git a/util/smb.py b/util/smb.py index a47ce26..c4516e6 100644 --- a/util/smb.py +++ b/util/smb.py @@ -86,13 +86,13 @@ class SMBScanner: results = [] for row_number, (index, row) in enumerate(df.iterrows(), 1): - print(f"\n处理第 {row_number} 行:") - print("-" * 40) + # print(f"\n处理第 {row_number} 行:") + # print("-" * 40) # 显示行数据 for col_name in df.columns: value = row[col_name] - print(f" {col_name}: {value}") + # print(f" {col_name}: {value}") # 处理逻辑(根据实际需求修改) processed_row = { @@ -108,7 +108,7 @@ class SMBScanner: if row_number % 10 == 0 or row_number == len(df): print(f"\n 进度: {row_number}/{len(df)} ({row_number/len(df)*100:.1f}%)") - print("\n" + "=" * 60) + # print("\n" + "=" * 60) print(f"处理完成!共处理 {len(results)} 行数据") return results @@ -751,27 +751,76 @@ def get_scanner(zip_url, user_name, pwd) : ) return scanner +# 路线编码 -> 路线信息 +def get_road_dict(dir,user_name,pwd) : + config = get_conf(dir, user_name, pwd) + scanner = get_scanner(dir, user_name=user_name, pwd=pwd) + found_paths = scanner.find_files_by_name( + share_path=config['share'], + file_name='每公里指标明细表*.xls', + start_dir=config['dir'] + ) + print(f"\n找到 {len(found_paths)} 个 'fileindex.txt' 文件:") + for i, path in enumerate(found_paths, 1): + print(f"{i}. {path}") + + road_dict = {} + if len(found_paths) > 0 : + df = scanner.read_excel(found_paths[0]) + rows = scanner.process_all_rows(df) + for i, row in enumerate(rows, 1): + data = row['data'] + if pd.notna(data['线路编码']) : + up_or_down = 'A' + if data['方向(上行/下行)'] == '下行' : + up_or_down = 'B' + key = f"{data['线路编码']}{str(int(data['区划代码']))}{up_or_down}" + if road_dict.get(key) : + road_dict[key].append(row) + else : + road_dict[key] = [row] # 路线编码 -> 路线信息 + + return road_dict + # filename -> 桩号 def get_pile_dict(dir,user_name,pwd) : config = get_conf(dir, user_name, pwd) scanner = get_scanner(dir, user_name=user_name, pwd=pwd) found_paths = scanner.find_files_by_name( share_path=config['share'], - file_name='fileindex.txt' + file_name='fileindex.txt', + start_dir=config['dir'] ) print(f"\n找到 {len(found_paths)} 个 'fileindex.txt' 文件:") for i, path in enumerate(found_paths, 1): print(f"{i}. {path}") - - lines = scanner.read_txt_by_line(full_path=found_paths[0]) pile_dict = {} - for i, line in enumerate(lines, 1): - parts = line.strip().split("->") - if len(parts)>=4: - pile_dict[parts[3]]=parts[1] # filename -> 桩号 + if len(found_paths) > 0 : + lines = scanner.read_txt_by_line(full_path=found_paths[0]) + for i, line in enumerate(lines, 1): + parts = line.strip().split("->") + if len(parts)>=4: + pile_dict[parts[3]]=parts # filename -> 桩号 return pile_dict + + +def write_to_excel_pandas(data, filename, sheet_name='Sheet1'): + """ + 使用 pandas 将数据写入 Excel + + Args: + data: 数据列表,每个元素是一行 + filename: 输出文件名 + sheet_name: 工作表名称 + """ + # 创建 DataFrame + df = pd.DataFrame(data) + + # 写入 Excel + df.to_excel(filename, sheet_name=sheet_name, index=False, header=False) + print(f"数据已写入 {filename}") def main(): # 配置信息 @@ -825,6 +874,15 @@ def main(): # for i, path in enumerate(found_paths, 1): # print(f"{i}. {path}") + # # 查找指定文件 + # found_paths = scanner.find_files_by_name( + # share_path=config['share'], + # file_name='每公里指标明细表*.xls' + # ) + # print(f"\n找到 {len(found_paths)} 个") + # for i, path in enumerate(found_paths, 1): + # print(f"{i}. {path}") + # # 查找指定目录中的所有图片 # full_path = scanner.build_full_path(share_path=config['share'], file_path='西南计算机\\AA县\\报送数据') @@ -833,9 +891,21 @@ def main(): # print(f"{i}. {path}") # # 读取excel - # full_path = scanner.build_full_path(share_path=config['share'], file_path='西南计算机\\AA县\\24年年报.xlsx') + # full_path = scanner.build_full_path(share_path=config['share'], file_path='西南计算机\\AA县\\每公里指标明细表(北碚).xls') # df = scanner.read_excel(full_path) - # scanner.process_all_rows(df) + # rows = scanner.process_all_rows(df) + # road_dict = {} + # for i, row in enumerate(rows, 1): + # data = row['data'] + # if pd.notna(data['线路编码']) : + # up_or_down = 'A' + # if data['方向(上行/下行)'] == '下行' : + # up_or_down = 'B' + # key = f"{data['线路编码']}{str(int(data['区划代码']))}{up_or_down}" + # if road_dict.get(key) : + # road_dict[key].append(row) + # else : + # road_dict[key] = [row] # 路线编码 -> 路线信息 # 读取txt @@ -847,13 +917,19 @@ def main(): # for i, path in enumerate(found_paths, 1): # print(f"{i}. {path}") + # 读取txt # lines = scanner.read_txt_by_line(full_path=found_paths[0]) # for i, line in enumerate(lines, 1): # print(f"{i}. {line}") - output_dir = "D:/devForBdzlWork/ai-train_platform/predictions" - scanner.upload_directory(output_dir, config['share'], remote_dir="西南计算机/AA县/报送数据_识别") + # 上传目录 + # output_dir = "D:/devForBdzlWork/ai-train_platform/predictions" + # scanner.upload_directory(output_dir, config['share'], remote_dir="西南计算机/AA县/报送数据_识别") + # get_pile_dict + # input_dir = "192.168.110.114/share_File/西南计算机/AA县/报送数据/图像类/CD45500155A/Images" + # pile_dict = get_pile_dict(input_dir, config['username'], config['password']) + # print("-------------------------------------------") if __name__ == "__main__": main() \ No newline at end of file diff --git a/util/yolo2pix_new.py b/util/yolo2pix_new.py index 820b5e5..d73150c 100644 --- a/util/yolo2pix_new.py +++ b/util/yolo2pix_new.py @@ -5,6 +5,9 @@ import cv2 import numpy as np from collections import defaultdict import smb +import pandas as pd +import glob +from datetime import datetime # ---------------- 常量 ---------------- CELL_AREA = 0.0036 # 每格面积 (平方米) @@ -23,6 +26,12 @@ CLASS_MAP_GRAVEL = { "坑槽":0,"沉陷":1,"车辙":2,"波浪搓板":3 } +ROAD_TYPE_EN_TO_CN = { + "asphalt":"沥青", + "cement":"水泥", + "gravel":"砾石" +} + # ---------------- 工具函数 ---------------- def num_to_coord(num, cols, cell_w, cell_h): n = num - 1 @@ -66,6 +75,107 @@ def detect_road_type_from_content(label_file): if kw in content: return "gravel" return "gravel" +def get_road_info(road_dict, pile_dict, img_file_name): + """获取路线信息""" + parts = pile_dict.get(img_file_name) + if parts : + road_code = parts[0] + road_info = road_dict.get(road_code) + if road_info : + data = road_info['data'] + return data + return {} + +def detect_road_type_from_road_dict(road_dict, pile_dict, img_file_name): + """根据读取的excel内容内容判断路面类型""" + road_code = 'xxxxxxxxxxx' + pile_no = "xxxxx" + road_type = "asphalt" + parts = pile_dict.get(img_file_name) + if parts : + road_code = parts[0] + pile_no = parts[1] + road_info = road_dict.get(road_code) + if road_info : + data = road_info['data'] + pile_no = parts[1] + road_type_cn = data['路面类型(沥青/水泥/砂石)'] + identify_width = data['识别宽度(米)'] + if road_type_cn == '沥青' : + road_type = "asphalt" + elif road_type_cn == '水泥' : + road_type = "cement" + elif road_type_cn == '砾石' : + road_type = "gravel" + return road_code, pile_no, road_type + +def yoloseg_to_grid_share_dir(road_dict,pile_dict,image_path,label_file,cover_ratio=COVER_RATIO): + """将YOLO-Seg标签转换成格子编号和类别""" + img_file_name = os.path.basename(image_path) + road_code, pile_no, road_type = detect_road_type_from_road_dict(road_dict, pile_dict, img_file_name) + if road_type=="asphalt": class_map = CLASS_MAP_ASPHALT + elif road_type=="cement": class_map = CLASS_MAP_CEMENT + else: class_map = CLASS_MAP_GRAVEL + class_names = list(class_map.keys()) + + img = cv2.imread(image_path) + if img is None: return "", {} + h, w = img.shape[:2] + cols = max(1, w//GRID_WIDTH) + rows = max(1, h//GRID_HEIGHT) + + result_lines = [] + all_class_cells = {} + with open(label_file,'r',encoding='utf-8') as f: + for line in f: + parts = line.strip().split() + if len(parts)<5: continue + cls_id = int(parts[0]) + coords = [float(x) for x in parts[1:]] + if len(coords)%2!=0: coords=coords[:-1] + if len(coords)<6: continue + poly = np.array(coords,dtype=np.float32).reshape(-1,2) + poly[:,0]*=w + poly[:,1]*=h + mask = np.zeros((h,w),dtype=np.uint8) + cv2.fillPoly(mask,[poly.astype(np.int32)],255) + cell_info = [] + covered_cells=[] + min_x = cols + min_y = rows + max_x = 0 + max_y = 0 + for r in range(rows): + for c in range(cols): + x1,y1 = c*GRID_WIDTH, r*GRID_HEIGHT + x2,y2 = min(w,x1+GRID_WIDTH), min(h,y1+GRID_HEIGHT) + region = mask[y1:y2, x1:x2] + if np.count_nonzero(region)/region.size>cover_ratio: + covered_cells.append(r*cols+c+1) + # 最小x坐标,y坐标 + if min_x > c : + min_x = c + if min_y > r : + min_y = r + # 最大x坐标,y坐标 + if max_x < c + 1 : + max_x = c + 1 + if max_y < r + 1 : + max_y = r + 1 + + if not covered_cells: continue + # min_cell = covered_cells[0] + # max_cell = covered_cells[len(covered_cells)-1] + + cname = class_names[cls_id] if cls_id tmp_pile : + min_pile = tmp_pile + if max_pile < tmp_pile : + max_pile = tmp_pile + + return min_pile, max_pile + +def convert_special_format(input_str): + """ + 将特殊格式字符串转换为浮点数 + 支持格式: "0+022" -> 0.022, "1+234" -> 1.234 + """ + if '+' in input_str: + # 分割整数部分和小数部分 + parts = input_str.split('+') + if len(parts) == 2: + integer_part = parts[0] + decimal_part = parts[1] + + # 构建标准小数格式 + standard_format = f"{integer_part}.{decimal_part}" + return float(standard_format) + else: + raise ValueError(f"无效的格式: {input_str}") + else: + # 如果没有 '+',直接转换 + return float(input_str) + +# 是否在区间内 +def in_interval(increment, cur_pile_no, tmp_start, tmp_end) : + if increment > 0 : + if cur_pile_no >= tmp_start and cur_pile_no < tmp_end : + return True + else : + return False + else : + if cur_pile_no > tmp_end and cur_pile_no <= tmp_start : + return True + else : + return False + # ---------------- 主函数-共享目录 ---------------- -def process_dir(pile_dict,dir="output",cell_area=CELL_AREA,grid_width=GRID_WIDTH,grid_height=GRID_HEIGHT): +def process_dir(road_dict,pile_dict,dir="output",cell_area=CELL_AREA,grid_width=GRID_WIDTH,grid_height=GRID_HEIGHT): os.makedirs(dir,exist_ok=True) # 解压 # 读取桩号映射 @@ -133,26 +289,43 @@ def process_dir(pile_dict,dir="output",cell_area=CELL_AREA,grid_width=GRID_WIDTH if not os.path.exists(label_file): print(f"⚠️ 找不到标签: {label_file}") continue - out_txt, class_cells, road_type = yoloseg_to_grid(image_path,label_file) + out_txt, class_cells, road_type, all_cell_num = yoloseg_to_grid_share_dir(road_dict,pile_dict,image_path,label_file) # 写每张图独立 _grid.txt grid_txt_path = os.path.splitext(image_path)[0]+"_grid.txt" with open(grid_txt_path,'w',encoding='utf-8') as f: f.write(out_txt) # 生成网格可视化 - draw_grid_on_image(image_path,class_cells,save_path=os.path.splitext(image_path)[0]+"_grid.jpg") + # draw_grid_on_image(image_path,class_cells,save_path=os.path.splitext(image_path)[0]+"_grid.jpg") # 统计各类面积 - counts = {k:len(v)*cell_area for k,v in class_cells.items()} - total_area = sum(counts.values()) - # 桩号 - pile_no = pile_dict.get(file,"未知") + counts = {k:[len(v[0])*cell_area, v[1][0], v[1][1]] for k,v in class_cells.items()} + # total_area = sum(counts.values()) + # 灾害总面积比例 + merged_set = set([]) + for k,v in class_cells.items() : + merged_set = merged_set.union(v[0]) + total_area = len(merged_set) + + # 桩号 路线编号 + parts = pile_dict.get(file) + pile_no = "0+000" + if parts : + pile_no = parts[1] + # 破损率 DR (%) = total_area / 总面积 - DR = total_area/ (total_area if total_area>0 else 1) *100 # 简化为100%或者0 + # DR = total_area/ (total_area if total_area>0 else 1) *100 # 简化为100%或者0 + DR= total_area / all_cell_num * 100 summary_data.append((pile_no, DR, counts, road_type)) + current_time = datetime.now().strftime("%Y%m%d%H%M%S") # 写桩号问题列表.txt if summary_data: + img_file_name = os.path.basename(image_path) + road_data = get_road_info(road_dict, pile_dict, img_file_name) + road_code = pile_dict.get(img_file_name)[0] road_type = summary_data[0][3] - out_file = os.path.join(dir,"桩号问题列表.txt") + + min_pile, max_pile = get_min_max_pile(summary_data) + out_file = os.path.join(dir,f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-detail-{current_time}.txt") header = generate_header(road_type) with open(out_file,'w',encoding='utf-8') as f: f.write(header+'\n') @@ -165,9 +338,121 @@ def process_dir(pile_dict,dir="output",cell_area=CELL_AREA,grid_width=GRID_WIDTH else: keys = list(CLASS_MAP_GRAVEL.keys()) for k in keys: - row.append(f"{counts.get(k,0):.2f}") + row.append(f"{counts.get(k,[0,0,0])[0]:.2f}") f.write(','.join(row)+'\n') - print(f"✅ 输出完成: {out_file}") + print(f"输出完成: {out_file}") + + # 写灾害数据.txt + if summary_data: + img_file_name = os.path.basename(image_path) + road_data = get_road_info(road_dict, pile_dict, img_file_name) + identify_width = road_data.get('识别宽度(米)', '3.6') + up_or_down = road_data.get('方向(上行/下行)', '上行') + road_code = pile_dict.get(img_file_name)[0] + group_by_road_type = {} + for data in summary_data: + group_by_road_type.setdefault(data[3], []).append(data) + + + for road_type, group in group_by_road_type.items(): + min_pile, max_pile = get_min_max_pile(group) + out_file = os.path.join(dir,f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-{current_time}.txt") + header = generate_header(road_type) + + group_list = group + increment = float(0.010) + pile_no_start = float(0.000) + pile_no_end = float(0.000) + increment + # 上行/下行 + if up_or_down == '下行' : + group_list = list(group)[::-1] + increment = float(-0.010) + tmp_pile_no = convert_special_format(group[len(group)-1][1]) + pile_no_start = tmp_pile_no + if tmp_pile_no % increment != 0 : + pile_no_end = tmp_pile_no + (tmp_pile_no % increment) + else : + pile_no_end = tmp_pile_no + increment + + with open(out_file,'w',encoding='utf-8') as f: + f.write(header+'\n') + index = 0 + tmp_start = pile_no_start + tmp_end = pile_no_end + while True : + # 每10m一个区间,在区间内进行灾害计算 + pile_no, DR, counts, road_type = summary_data[index] + cur_pile_no = convert_special_format(pile_no) + if not in_interval(increment, cur_pile_no, tmp_start, tmp_end) : + # 没在刻度内直接输出无病害数据 + pile_no = f"{tmp_start:0.3f}" + row = [pile_no,identify_width,f"{0:.2f}"] + if road_type=="asphalt": + keys = list(CLASS_MAP_ASPHALT.keys()) + elif road_type=="cement": + keys = list(CLASS_MAP_CEMENT.keys()) + else: + keys = list(CLASS_MAP_GRAVEL.keys()) + for k in keys: + row.append(f"{0:.2f}") + f.write(','.join(row)+'\n') + else : + row = [f"{tmp_start:0.3f}", identify_width] + subRows = [] + while index < len(group_list): + pile_no, DR, counts, road_type = summary_data[index] + cur_pile_no = convert_special_format(pile_no) + + tmp_row = [] + if in_interval(increment, cur_pile_no, tmp_start, tmp_end) : + pile_no = f"{tmp_start:0.3f}" + tmp_row = [DR] + if road_type=="asphalt": + keys = list(CLASS_MAP_ASPHALT.keys()) + elif road_type=="cement": + keys = list(CLASS_MAP_CEMENT.keys()) + else: + keys = list(CLASS_MAP_GRAVEL.keys()) + for k in keys: + tmp_row.append(counts.get(k, [0,0,0])[0]) + subRows.append(tmp_row) + index = index + 1 + else : + break + + # 同列汇总 10m一个区间--对应5张图 + column_sums = [f"{(sum(column)/5):0.2f}" for column in zip(*subRows)] + row += column_sums + f.write(','.join(row)+'\n') + + tmp_start = tmp_end + tmp_end = tmp_start + increment + + print(f"tmp_start={tmp_start}, tmp_end={tmp_end}, index={index}, len(group_list)={len(group_list)}") + if index >= len(group_list) : + break + + print(f"输出完成: {out_file}") + + # 病害明显列表.xlsx + headers = ['序号','路线编码','方向','桩号','路面类型','病害名称','程度','长度(m)',' 宽度(m)',' 面积(㎡)',' 横向位置','备注'] + data_list = [] + if summary_data: + img_file_path = os.path.dirname(image_path) + img_file_name = os.path.basename(image_path) + road_data = get_road_info(road_dict, pile_dict, img_file_name) + road_code, pile_no, road_type = detect_road_type_from_road_dict(road_dict, pile_dict, img_file_name) + identify_width = road_data.get('识别宽度(米)', '3.6') + up_or_down = road_data.get('方向(上行/下行)', '上行') + excel_index = 1 + for data in summary_data: + damage_data = data[2] + for attr_name, attr_value in damage_data.items(): + excel_data = [excel_index, road_code, up_or_down, f"K000{data[0]}", ROAD_TYPE_EN_TO_CN.get(road_type), attr_name, '', attr_value[1]*cell_area, attr_value[2]*cell_area, attr_value[0], '', ''] + data_list.append(excel_data) + + all_data = [headers] + data_list + smb.write_to_excel_pandas(all_data, img_file_path + '/病害明显列表.xlsx') # ---------------- 主函数 ---------------- def process_zip(zip_path,pile_map_file,output_dir="output",cell_area=CELL_AREA,grid_width=GRID_WIDTH,grid_height=GRID_HEIGHT): @@ -184,7 +469,7 @@ def process_zip(zip_path,pile_map_file,output_dir="output",cell_area=CELL_AREA,g for line in f: parts = line.strip().split("->") if len(parts)>=4: - pile_dict[parts[3]]=parts[1] # filename -> 桩号 + pile_dict[parts[3]]=parts # filename -> 桩号 # 遍历图片 summary_data = [] @@ -196,7 +481,7 @@ def process_zip(zip_path,pile_map_file,output_dir="output",cell_area=CELL_AREA,g if not os.path.exists(label_file): print(f"⚠️ 找不到标签: {label_file}") continue - out_txt, class_cells, road_type = yoloseg_to_grid(image_path,label_file) + out_txt, class_cells, road_type, = yoloseg_to_grid(image_path,label_file) # 写每张图独立 _grid.txt grid_txt_path = os.path.splitext(image_path)[0]+"_grid.txt" with open(grid_txt_path,'w',encoding='utf-8') as f: @@ -207,9 +492,9 @@ def process_zip(zip_path,pile_map_file,output_dir="output",cell_area=CELL_AREA,g counts = {k:len(v)*cell_area for k,v in class_cells.items()} total_area = sum(counts.values()) # 桩号 - pile_no = pile_dict.get(file,"未知") + pile_no = pile_dict.get(file,"0+000") # 破损率 DR (%) = total_area / 总面积 - DR = total_area/ (total_area if total_area>0 else 1) *100 # 简化为100%或者0 + DR = total_area / (total_area if total_area > 0 else 1) * 100 # 简化为100%或者0 summary_data.append((pile_no, DR, counts, road_type)) # 写桩号问题列表.txt @@ -232,12 +517,99 @@ def process_zip(zip_path,pile_map_file,output_dir="output",cell_area=CELL_AREA,g f.write(','.join(row)+'\n') print(f"✅ 输出完成: {out_file}") + +# 路线编码 -> 路线信息 +def get_road_dict(local_dir): + """ + 从本地目录读取Excel文件,构建路线字典 + + Args: + local_dir: 本地目录路径 + + Returns: + dict: 路线编码到路线信息的映射字典 + """ + # 查找匹配的Excel文件 + pattern = os.path.join(local_dir, '每公里指标明细表*.xls') + found_paths = glob.glob(pattern) + + print(f"\n找到 {len(found_paths)} 个 '每公里指标明细表*.xls' 文件:") + for i, path in enumerate(found_paths, 1): + print(f"{i}. {path}") + + road_dict = {} + if len(found_paths) > 0: + # 读取第一个匹配的Excel文件 + df = pd.read_excel(found_paths[0]) + + # 处理所有行(这里需要根据实际情况调整处理逻辑) + for index, row in df.iterrows(): + data = row.to_dict() + if pd.notna(data.get('线路编码', None)): + up_or_down = 'A' + if data.get('方向(上行/下行)', '') == '下行': + up_or_down = 'B' + + # 构建key,确保区划代码为整数 + area_code = data.get('区划代码', '') + if pd.notna(area_code): + area_code = str(int(float(area_code))) if str(area_code).replace('.', '').isdigit() else str(area_code) + else: + area_code = '' + + key = f"{data['线路编码']}{area_code}{up_or_down}" + + if key in road_dict: + road_dict[key].append({'index': index, 'data': data}) + else: + road_dict[key] = [{'index': index, 'data': data}] + + return road_dict + +# filename -> 桩号 +def get_pile_dict(local_dir): + """ + 从本地目录读取fileindex.txt文件,构建桩号字典 + + Args: + local_dir: 本地目录路径 + + Returns: + dict: 文件名到桩号信息的映射字典 + """ + # 查找fileindex.txt文件 + pattern = os.path.join(local_dir, 'fileindex.txt') + found_paths = glob.glob(pattern) + + print(f"\n找到 {len(found_paths)} 个 'fileindex.txt' 文件:") + for i, path in enumerate(found_paths, 1): + print(f"{i}. {path}") + + pile_dict = {} + if len(found_paths) > 0: + # 读取第一个匹配的txt文件 + with open(found_paths[0], 'r', encoding='utf-8') as file: + lines = file.readlines() + + for i, line in enumerate(lines, 1): + parts = line.strip().split("->") + if len(parts) >= 4: + pile_dict[parts[3]] = parts # filename -> 桩号 + + return pile_dict + # ---------------- 示例调用 ---------------- if __name__=="__main__": # zip_path = "D:/devForBdzlWork/ai-train_platform/predict/inferenceResult.zip" # 输入 ZIP 文件 # pile_map_file = "D:/devForBdzlWork/ai-train_platform/predict/pile_map.txt" # 图片名 -> 桩号 # process_zip(zip_path=zip_path,pile_map_file=pile_map_file,output_dir="output") - output_dir = "D:/devForBdzlWork/ai-train_platform/predictions/1" - pile_dict = smb.get_pile_dict("192.168.110.114/share_File/西南计算机", "administrator", "abc@1234") - process_dir(pile_dict, output_dir) + # output_dir = "D:/devForBdzlWork/ai-train_platform/predictions/7" + # pile_dict = smb.get_pile_dict("192.168.110.114/share_File/西南计算机", "administrator", "abc@1234") + # road_dict = smb.get_road_dict("192.168.110.114/share_File/西南计算机", "administrator", "abc@1234") + # process_dir(road_dict, pile_dict, output_dir) + + output_dir = "D:/devForBdzlWork/ai-train_platform/predictions/7" + pile_dict = get_pile_dict(output_dir) + road_dict = get_road_dict(output_dir) + process_dir(road_dict, pile_dict, output_dir)