灾害数据输出到DR目录;excel数据输出到excel目录

This commit is contained in:
liyubo 2025-11-24 15:38:45 +08:00
parent 333238c7b1
commit e128908d67
3 changed files with 244 additions and 83 deletions

View File

@ -18,7 +18,7 @@ from ultralytics import YOLO
from middleware.minio_util import upload_file, downFile, check_zip_size, upload_folder
from util.yolo2pix_new import *
from util.smb import *
from util.smb_tool import *
import threading
# 定义红白蓝颜色 (BGR格式)
@ -635,8 +635,17 @@ class YOLOSegmentationInference:
except Exception as e:
print(f"处理目录失败: {e}")
return results
def process_image_directory_share_dir_circle(self, task_id, current_time, input_dir_list, user_name, pwd, output_dir: Optional[str] = None,
conf_threshold: float = 0.25, iou_threshold: float = 0.5,
save_mask: bool = False, save_label: bool = False, show: bool = False,
result_save: [] = None) -> List[
InferenceResult]:
for input_dir in input_dir_list :
self.process_image_directory_share_dir(task_id,current_time,input_dir,user_name,pwd,output_dir,conf_threshold,iou_threshold,save_mask,save_label,show,result_save)
def process_image_directory_share_dir(self, task_id, input_dir, user_name, pwd, output_dir: Optional[str] = None,
def process_image_directory_share_dir(self, task_id, current_time, input_dir, user_name, pwd, output_dir: Optional[str] = None,
conf_threshold: float = 0.25, iou_threshold: float = 0.5,
save_mask: bool = False, save_label: bool = False, show: bool = False,
result_save: [] = None) -> List[
@ -694,12 +703,13 @@ class YOLOSegmentationInference:
# 推送识别数据到共享目录
tmpConfig = get_conf(input_dir, user_name, pwd)
pile_dict = get_pile_dict(input_dir, user_name, pwd)
road_dict = get_road_dict(f"{tmpConfig['ip']}/{tmpConfig['share']}", user_name, pwd)
road_dict = get_road_dict(f"{tmpConfig['ip']}/{tmpConfig['share']}/{tmpConfig['excel_dir']}", user_name, pwd)
process_dir(road_dict, pile_dict, output_dir)
current_time = datetime.now().strftime("%Y%m%d%H%M%S")
# 识别生成的文件压缩成zip
zip_folder_shutil(output_dir)
move_file_shutil(output_dir+".zip", output_dir)
remote_dir = f"{tmpConfig['dir']}_识别/{task_id}/{current_time}"
# scanner.upload_directory(output_dir, config['share'], remote_dir=remote_dir)
scanner.upload_directory(output_dir, config['share'], remote_dir=remote_dir)
return results
@ -827,7 +837,7 @@ def predict_images(pt_name, zip_url, output_dir="predictions", conf_threshold=0.
new_mask_label = os.path.join(pix_result_dir, label_name)
shutil.rmtree(result_dir) # 删除所有源文件
output_str, class_cells = yoloseg_to_grid_cells_fixed_v5(
output_str, class_cells = yoloseg_to_grid(
new_origin_img, new_mask_label, class_names=classes
)
@ -900,9 +910,10 @@ def predict_images_share_dir(task_id, pt_name, zip_url, user_name, pwd, output_d
share_path=config['share'],
folder_name='图像类',
start_dir=config['dir'],
max_depth=2
max_depth=5
)
current_time = datetime.now().strftime("%Y%m%d%H%M%S")
target_path = "" # 识别图片目录
flag_dir_path = "" # 标识目录
if len(found_paths) > 0:
@ -931,11 +942,11 @@ def predict_images_share_dir(task_id, pt_name, zip_url, user_name, pwd, output_d
# )
# 创建并启动线程
thread1 = threading.Thread(target=inference.process_image_directory_share_dir, args=(task_id, target_path,user_name,pwd,output_dir,conf_threshold,iou_threshold,save_mask,save_label,show,result_save))
thread1 = threading.Thread(target=inference.process_image_directory_share_dir_circle, args=(task_id,current_time,found_paths,user_name,pwd,output_dir,conf_threshold,iou_threshold,save_mask,save_label,show,result_save))
# 启动线程
thread1.start()
else:
print(f"错误: 输入 {zip_url} 不是有效的文件或目录")
return f"{target_path}_识别/{task_id}", "success"
return standardized_path(f"{target_path}_识别/{task_id}/{current_time}/{task_id}.zip"), "success"

View File

@ -12,6 +12,8 @@ import numpy as np
import cv2
import pandas as pd
import io
import shutil
class SMBScanner:
def __init__(self, ip, username, password, domain=''):
@ -614,13 +616,13 @@ class SMBScanner:
# print(f"{indent}文件夹:{entry.name}/")
result.append(item)
if recursive and current_depth < max_depth:
sub_items = self._walk_directory(
self._walk_directory(
entry.path,
recursive,
max_depth,
current_depth + 1
current_depth + 1,
result
)
result.extend(sub_items)
else:
file_size = self._format_size(file_stat.st_size)
mod_time = datetime.fromtimestamp(
@ -635,8 +637,6 @@ class SMBScanner:
except Exception as e:
print(f"无法读取目录 {path}: {e}")
return result
def _format_size(self, size_bytes):
"""格式化文件大小"""
@ -709,12 +709,17 @@ class SMBScanner:
cv2.destroyAllWindows()
print("窗口已关闭")
def standardized_path(path) :
path = path.replace('\\\\', '/')
path = path.replace('\\', '/')
if path.startswith("/"):
path = path.replace('/', '', 1)
return path
# 从传入的路径中提取ip共享目录目标访问目录
def get_conf(zip_url, user_name, pwd) :
zip_url = zip_url.replace('\\\\', '/')
zip_url = zip_url.replace('\\', '/')
if zip_url.startswith("/"):
zip_url = zip_url.replace('/', '', 1)
zip_url = standardized_path(zip_url)
parts = zip_url.split('/')
if len(parts) < 2 :
@ -725,6 +730,12 @@ def get_conf(zip_url, user_name, pwd) :
if len(parts) > 2:
new_parts = parts[2:]
dir = '/'.join(new_parts)
excel_dir = ''
for index, part in enumerate(parts, 0) :
if part == 'Images' and index - 3 > 2 :
excel_dir = '/'.join(parts[2:index-3])
# 配置信息
config = {
@ -733,7 +744,8 @@ def get_conf(zip_url, user_name, pwd) :
'password': pwd,
'domain': '', # 工作组留空
'share': parts[1],
'dir': dir
'dir': dir,
'excel_dir': excel_dir
}
return config
@ -759,9 +771,9 @@ def get_road_dict(dir,user_name,pwd) :
share_path=config['share'],
file_name='每公里指标明细表*.xls',
start_dir=config['dir'],
max_depth=1
max_depth=4
)
print(f"\n找到 {len(found_paths)}'fileindex.txt' 文件:")
print(f"\n找到 {len(found_paths)}'每公里指标明细表*.xls' 文件:")
for i, path in enumerate(found_paths, 1):
print(f"{i}. {path}")
@ -791,7 +803,7 @@ def get_pile_dict(dir,user_name,pwd) :
share_path=config['share'],
file_name='fileindex.txt',
start_dir=config['dir'],
max_depth=8
max_depth=2
)
print(f"\n找到 {len(found_paths)}'fileindex.txt' 文件:")
for i, path in enumerate(found_paths, 1):
@ -824,6 +836,61 @@ def write_to_excel_pandas(data, filename, sheet_name='Sheet1'):
df.to_excel(filename, sheet_name=sheet_name, index=False, header=False)
print(f"数据已写入 {filename}")
def zip_folder_shutil(folder_path, output_path=None):
"""
使用shutil.make_archive压缩文件夹
Args:
folder_path: 要压缩的文件夹路径
output_path: 输出的zip文件路径可选
"""
# 如果未指定输出路径使用文件夹名作为zip文件名
if output_path is None:
output_path = folder_path
# 确保文件夹路径是绝对路径
folder_path = os.path.abspath(folder_path)
# 获取文件夹的父目录和文件夹名
parent_dir, folder_name = os.path.split(folder_path)
try:
# 创建zip文件
shutil.make_archive(
base_name=output_path, # 输出文件的基本名(不包含扩展名)
format='zip', # 压缩格式
root_dir=parent_dir, # 根目录
base_dir=folder_name # 要压缩的目录(相对于根目录)
)
print(f"成功压缩文件夹: {folder_path} -> {output_path}.zip")
return f"{output_path}.zip"
except Exception as e:
print(f"压缩失败: {e}")
return None
def move_file_shutil(source_path, destination_path):
"""
使用shutil.move移动文件或文件夹
Args:
source_path: 源文件/文件夹路径
destination_path: 目标路径
"""
try:
# 检查源文件是否存在
if not os.path.exists(source_path):
print(f"错误: 源文件不存在 - {source_path}")
return False
# 执行移动操作
shutil.move(source_path, destination_path)
print(f"成功移动: {source_path} -> {destination_path}")
return True
except Exception as e:
print(f"移动失败: {e}")
return False
def main():
# 配置信息
config = {
@ -834,6 +901,9 @@ def main():
'share': 'share_File',
'dir': '西南计算机'
}
# 配置数据提取
tmp_conf = get_conf("192.168.110.114/share_File/北碚报送数据/3.25/图像类/C071500109B/Images",'administrator','abc@1234')
# 创建扫描器
scanner = SMBScanner(
@ -844,9 +914,9 @@ def main():
)
# 遍历共享目录
# scanner.list_directory(
# result = scanner.list_directory(
# share_path=config['share'],
# dir=config['dir'],
# dir= '/西南计算机/北碚报送数据/3.25/图像类/C071500109B/Images_识别/77',
# recursive=True, # 递归遍历
# max_depth=9 # 最大深度
# )
@ -933,5 +1003,13 @@ def main():
# pile_dict = get_pile_dict(input_dir, config['username'], config['password'])
# print("-------------------------------------------")
# 检查文件是否存在
# path = '192.168.110.114/share_File/西南计算机/北碚报送数据/3.25/图像类/C071500109B/Images_识别/77/20251120115639/77.zip'
# flag = scanner.directory_exists(path)
# print(f"flag={flag}")
str = '\\\\192.168.110.114\\share_File\\西南计算机/北碚报送数据/3.25/图像类\\C071500109B\\Images_识别/77/20251120172316/77.zip'
print(f"standardized_path(str)={standardized_path(str)}")
if __name__ == "__main__":
main()

View File

@ -1,19 +1,19 @@
import os
import zipfile
import shutil
import cv2
import numpy as np
from collections import defaultdict
import smb
import pandas as pd
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment
from openpyxl.utils import get_column_letter
import glob
from datetime import datetime
from .smb_tool import *
# ---------------- 常量 ----------------
CELL_AREA = 0.0036 # 每格面积 (平方米)
CELL_WIDTH = 0.1 # 每格宽 (米)
CELL_HEIGHT = 0.1 # 每格高 (米)
CELL_AREA = 0.01 # 每格面积 (平方米)
GRID_WIDTH = 108 # 网格像素宽
GRID_HEIGHT = 102 # 网格像素高
COVER_RATIO = 0.01 # mask 覆盖比例阈值
@ -43,6 +43,18 @@ def num_to_coord(num, cols, cell_w, cell_h):
x2, y2 = x1 + cell_w, y1 + cell_h
return x1, y1, x2, y2
def calc_grid_param(pic_width, pic_height, actual_width, actual_height, grid_x_count=38, grid_y_count=20) :
calc_cell_area = CELL_WIDTH * CELL_HEIGHT # 每格面积 (平方米)
calc_grid_width = 108 # 网格像素宽
calc_grid_height = 102 # 网格像素高
grid_x_count = actual_width / CELL_WIDTH
grid_y_count = actual_height / CELL_HEIGHT
calc_grid_width = round(pic_width / grid_x_count)
calc_grid_height = round(pic_height / grid_y_count)
return calc_cell_area, calc_grid_width, calc_grid_height
def draw_grid_on_image(image_path, grid_cells, cell_size=(GRID_WIDTH, GRID_HEIGHT), save_path=None):
image = cv2.imread(image_path)
if image is None: return
@ -52,7 +64,7 @@ def draw_grid_on_image(image_path, grid_cells, cell_size=(GRID_WIDTH, GRID_HEIGH
overlay = image.copy()
for cname, nums in grid_cells.items():
color = (np.random.randint(64,255),np.random.randint(64,255),np.random.randint(64,255))
for num in nums:
for num in nums[0]:
x1,y1,x2,y2 = num_to_coord(num, cols, cell_w, cell_h)
cv2.rectangle(overlay,(x1,y1),(x2,y2),color,-1)
cv2.addWeighted(overlay,0.4,image,0.6,0,image)
@ -85,8 +97,12 @@ def get_road_info(road_dict, pile_dict, img_file_name):
road_code = parts[0]
road_info = road_dict.get(road_code)
if road_info :
data = road_info['data']
return data
pile_no = convert_special_format(parts[1]) * 1000
for road in road_info :
data = road['data']
if data.get('起桩号(米)',0) > pile_no and pile_no > data.get('止桩号(米)', 0) or data.get('起桩号(米)', 0) < pile_no and pile_no < data.get('止桩号(米)', 0) :
return data
return {}
def detect_road_type_from_road_dict(road_dict, pile_dict, img_file_name):
@ -100,7 +116,7 @@ def detect_road_type_from_road_dict(road_dict, pile_dict, img_file_name):
pile_no = parts[1]
road_info = road_dict.get(road_code)
if road_info :
data = road_info['data']
data = road_info[0]['data']
pile_no = parts[1]
road_type_cn = data['路面类型(沥青/水泥/砂石)']
identify_width = data['识别宽度(米)']
@ -112,7 +128,7 @@ def detect_road_type_from_road_dict(road_dict, pile_dict, img_file_name):
road_type = "gravel"
return road_code, pile_no, road_type
def yoloseg_to_grid_share_dir(road_dict,pile_dict,image_path,label_file,cover_ratio=COVER_RATIO):
def yoloseg_to_grid_share_dir(road_dict,pile_dict,image_path,label_file,grid_width=GRID_WIDTH,grid_height=GRID_HEIGHT,cover_ratio=COVER_RATIO):
"""将YOLO-Seg标签转换成格子编号和类别"""
img_file_name = os.path.basename(image_path)
road_code, pile_no, road_type = detect_road_type_from_road_dict(road_dict, pile_dict, img_file_name)
@ -268,14 +284,18 @@ def convert_special_format(input_str):
def in_interval(increment, cur_pile_no, tmp_start, tmp_end) :
if increment > 0 :
if cur_pile_no >= tmp_start and cur_pile_no < tmp_end :
return True
return 1
elif cur_pile_no < tmp_start :
return -1 # 会导致死循环
else :
return False
return 0
else :
if cur_pile_no > tmp_end and cur_pile_no <= tmp_start :
return True
return 1
elif cur_pile_no > tmp_start :
return -1 # 会导致死循环
else :
return False
return 0
# 指定间隔区间,输出基本信息+灾害数据
def process_info(road_dict,pile_dict,summary_data,image_path,current_time,interval=10,dir="output",cell_area=CELL_AREA,grid_width=GRID_WIDTH,grid_height=GRID_HEIGHT) :
@ -288,34 +308,46 @@ def process_info(road_dict,pile_dict,summary_data,image_path,current_time,interv
road_code = pile_dict.get(img_file_name)[0]
road_level = road_data.get('技术等级', '')
road_type_cn = road_data.get('路面类型(沥青/水泥/砂石)', '沥青')
first_pile_no = round(road_data.get('起桩号(米)',0) / 1000, 3)
last_pile_no = round(road_data.get('止桩号(米)', 0) / 1000, 3)
group_list = summary_data
increment = float(0.001 * interval)
pile_no_start = float(0.000)
pile_no_end = float(0.000) + increment
group_list = []
# 上行/下行
if up_or_down == '下行' :
group_list = list(summary_data)[::-1]
increment = float(-0.001 * interval)
tmp_pile_no = convert_special_format(group_list[len(group_list)-1][1])
group_list = summary_data
list(reversed(group_list))
increment = float(-0.001) * interval
tmp_pile_no = first_pile_no
pile_no_start = tmp_pile_no
if tmp_pile_no % increment != 0 :
pile_no_end = tmp_pile_no + (tmp_pile_no % increment)
pile_no_end = tmp_pile_no - (tmp_pile_no % (-increment))
pile_no_end = round(pile_no_end, 3) #处理精度问题
else :
pile_no_end = tmp_pile_no + increment
pile_no_end = round(pile_no_end, 3) #处理精度问题
else :
group_list = summary_data
increment = float(0.001) * interval
pile_no_start = first_pile_no
pile_no_end = pile_no_start + increment
index = 0
tmp_start = pile_no_start
tmp_end = pile_no_end
while True :
if tmp_end < last_pile_no :
tmp_end = last_pile_no
print(f"process_info:tmp_start={tmp_start}, tmp_end={tmp_end}, index={index}, len(group_list)={len(group_list)}")
if index >= len(group_list) :
break
# 每10m一个区间在区间内进行灾害计算
pile_no, DR, counts, road_type = group_list[index]
cur_pile_no = convert_special_format(pile_no)
pile_no_s = f"{tmp_start:0.3f}"
pile_no_e = f"{tmp_end:0.3f}"
pile_no_s = format_number_to_k_code(tmp_start)
pile_no_e = format_number_to_k_code(tmp_end)
up_or_down_code = "B" if up_or_down == '下行' else "A"
if not in_interval(increment, cur_pile_no, tmp_start, tmp_end) :
in_interval_val = in_interval(increment, cur_pile_no, tmp_start, tmp_end)
if in_interval_val == 0 :
# 没在刻度内直接输出无病害数据
row = [road_code, pile_no_s,pile_no_e,up_or_down_code,up_or_down,road_level,f"{road_type_cn}路面",'','',f"{0:.2f}",'','','','','','','','','','','','','','','']
if road_type=="asphalt":
@ -328,7 +360,7 @@ def process_info(road_dict,pile_dict,summary_data,image_path,current_time,interv
# row.append(f"{0:.2f}")
process_info_data.append(row)
# f.write(','.join(row)+'\n')
else :
elif in_interval_val == 1 :
row = [road_code, pile_no_s,pile_no_e,up_or_down_code,up_or_down,road_level,f"{road_type_cn}路面",'','']
subRows = []
while index < len(group_list):
@ -336,8 +368,8 @@ def process_info(road_dict,pile_dict,summary_data,image_path,current_time,interv
cur_pile_no = convert_special_format(pile_no)
tmp_row = []
if in_interval(increment, cur_pile_no, tmp_start, tmp_end) :
pile_no = f"{tmp_start:0.3f}"
in_interval_val = in_interval(increment, cur_pile_no, tmp_start, tmp_end)
if in_interval_val == 1 :
tmp_row = [DR]
if road_type=="asphalt":
keys = list(CLASS_MAP_ASPHALT.keys())
@ -358,13 +390,12 @@ def process_info(road_dict,pile_dict,summary_data,image_path,current_time,interv
# f.write(','.join(row)+'\n')
row += ['','','','','','','','','','','','','','','']
process_info_data.append(row)
else :
break;
tmp_start = tmp_end
tmp_end = tmp_start + increment
tmp_end = round(tmp_end, 3)
print(f"tmp_start={tmp_start}, tmp_end={tmp_end}, index={index}, len(group_list)={len(group_list)}")
if index >= len(group_list) :
break
return process_info_data
def adjust_column_width(worksheet):
@ -435,6 +466,23 @@ def create_multiple_sheets_with_multiple_headers(file_path, excel_data):
wb.save(file_path)
print(f"文件已保存为 {file_path}")
def format_number_to_k_code(number):
"""
将数字转换为 K0000+000 格式
"""
num_str = str(number)
# 处理整数情况(如 1.0 -> 1
if '.' not in num_str:
integer_part = int(num_str)
decimal_part = "000"
else:
integer_part, decimal_part = num_str.split('.')
integer_part = int(integer_part)
# 确保小数部分有3位不足补0
decimal_part = decimal_part.ljust(3, '0')[:3]
return f"K{integer_part:04d}+{decimal_part}"
# ---------------- 主函数-共享目录 ----------------
def process_dir(road_dict,pile_dict,dir="output",cell_area=CELL_AREA,grid_width=GRID_WIDTH,grid_height=GRID_HEIGHT):
os.makedirs(dir,exist_ok=True)
@ -456,7 +504,7 @@ def process_dir(road_dict,pile_dict,dir="output",cell_area=CELL_AREA,grid_width=
with open(grid_txt_path,'w',encoding='utf-8') as f:
f.write(out_txt)
# 生成网格可视化
# draw_grid_on_image(image_path,class_cells,save_path=os.path.splitext(image_path)[0]+"_grid.jpg")
draw_grid_on_image(image_path,class_cells,cell_size=(GRID_WIDTH, GRID_HEIGHT),save_path=os.path.splitext(image_path)[0]+"_grid.jpg")
# 统计各类面积
counts = {k:[len(v[0])*cell_area, v[1][0], v[1][1]] for k,v in class_cells.items()}
# total_area = sum(counts.values())
@ -486,7 +534,8 @@ def process_dir(road_dict,pile_dict,dir="output",cell_area=CELL_AREA,grid_width=
road_type = summary_data[0][3]
min_pile, max_pile = get_min_max_pile(summary_data)
out_file = os.path.join(dir,f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-detail-{current_time}.txt")
print(f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-detail-{current_time}.txt")
out_file = os.path.join(f"{dir}/DR",f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-detail-{current_time}.txt")
header = generate_header(road_type)
with open(out_file,'w',encoding='utf-8') as f:
f.write(header+'\n')
@ -503,13 +552,15 @@ def process_dir(road_dict,pile_dict,dir="output",cell_area=CELL_AREA,grid_width=
f.write(','.join(row)+'\n')
print(f"输出完成: {out_file}")
# 灾害数据.txt
# 灾害数据.txt
if summary_data:
img_file_name = os.path.basename(image_path)
road_data = get_road_info(road_dict, pile_dict, img_file_name)
identify_width = road_data.get('识别宽度(米)', '3.6')
up_or_down = road_data.get('方向(上行/下行)', '上行')
road_code = pile_dict.get(img_file_name)[0]
first_pile_no = round(road_data.get('起桩号(米)',0) / 1000, 3)
last_pile_no = round(road_data.get('止桩号(米)', 0) / 1000, 3)
group_by_road_type = {}
for data in summary_data:
group_by_road_type.setdefault(data[3], []).append(data)
@ -517,23 +568,29 @@ def process_dir(road_dict,pile_dict,dir="output",cell_area=CELL_AREA,grid_width=
for road_type, group in group_by_road_type.items():
min_pile, max_pile = get_min_max_pile(group)
print(f"DR/{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-{current_time}.txt")
out_file = os.path.join(dir,f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-{current_time}.txt")
header = generate_header(road_type)
group_list = group
increment = float(0.010)
pile_no_start = float(0.000)
pile_no_end = float(0.000) + increment
group_list = []
# 上行/下行
if up_or_down == '下行' :
group_list = list(group)[::-1]
group_list = group
list(reversed(group_list))
increment = float(-0.010)
tmp_pile_no = convert_special_format(group_list[len(group_list)-1][1])
tmp_pile_no = first_pile_no
pile_no_start = tmp_pile_no
if tmp_pile_no % increment != 0 :
pile_no_end = tmp_pile_no + (tmp_pile_no % increment)
pile_no_end = tmp_pile_no - (tmp_pile_no % (-increment))
pile_no_end = round(pile_no_end, 3) #处理精度问题
else :
pile_no_end = tmp_pile_no + increment
pile_no_end = round(pile_no_end, 3) #处理精度问题
else :
group_list = group
increment = float(0.010)
pile_no_start = first_pile_no
pile_no_end = pile_no_start + increment
with open(out_file,'w',encoding='utf-8') as f:
f.write(header+'\n')
@ -541,13 +598,19 @@ def process_dir(road_dict,pile_dict,dir="output",cell_area=CELL_AREA,grid_width=
tmp_start = pile_no_start
tmp_end = pile_no_end
while True :
if tmp_end < last_pile_no :
tmp_end = last_pile_no
print(f"process_dir:tmp_start={tmp_start}, tmp_end={tmp_end}, index={index}, len(group_list)={len(group_list)}")
if index >= len(group_list) :
break
# 每10m一个区间在区间内进行灾害计算
pile_no, DR, counts, road_type = group_list[index]
cur_pile_no = convert_special_format(pile_no)
if not in_interval(increment, cur_pile_no, tmp_start, tmp_end) :
in_interval_val = in_interval(increment, cur_pile_no, tmp_start, tmp_end)
if in_interval_val == 0 :
# 没在刻度内直接输出无病害数据
pile_no = f"{tmp_start:0.3f}"
row = [pile_no,identify_width,f"{0:.2f}"]
pile_no = format_number_to_k_code(tmp_start)
row = [pile_no,f"{identify_width}",f"{0:.2f}"]
if road_type=="asphalt":
keys = list(CLASS_MAP_ASPHALT.keys())
elif road_type=="cement":
@ -557,16 +620,16 @@ def process_dir(road_dict,pile_dict,dir="output",cell_area=CELL_AREA,grid_width=
for k in keys:
row.append(f"{0:.2f}")
f.write(','.join(row)+'\n')
else :
row = [f"{tmp_start:0.3f}", identify_width]
elif in_interval_val == 1 :
row = [format_number_to_k_code(tmp_start), f"{identify_width}"]
subRows = []
while index < len(group_list):
pile_no, DR, counts, road_type = group_list[index]
cur_pile_no = convert_special_format(pile_no)
tmp_row = []
if in_interval(increment, cur_pile_no, tmp_start, tmp_end) :
pile_no = f"{tmp_start:0.3f}"
in_interval_val = in_interval(increment, cur_pile_no, tmp_start, tmp_end)
if in_interval_val == 1 :
tmp_row = [DR]
if road_type=="asphalt":
keys = list(CLASS_MAP_ASPHALT.keys())
@ -585,17 +648,16 @@ def process_dir(road_dict,pile_dict,dir="output",cell_area=CELL_AREA,grid_width=
column_sums = [f"{(sum(column)/5):0.2f}" for column in zip(*subRows)]
row += column_sums
f.write(','.join(row)+'\n')
else :
break
tmp_start = tmp_end
tmp_end = tmp_start + increment
print(f"tmp_start={tmp_start}, tmp_end={tmp_end}, index={index}, len(group_list)={len(group_list)}")
if index >= len(group_list) :
break
tmp_end = round(tmp_end, 3)
print(f"输出完成: {out_file}")
# 病害明显列表.xlsx
# 病害明细列表.xlsx
print("输出:病害明细列表.xlsx")
headers = ['序号','路线编码','方向','桩号','路面类型','病害名称','程度','长度(m)',' 宽度(m)',' 面积(㎡)',' 横向位置','备注']
data_list = []
if summary_data:
@ -613,9 +675,10 @@ def process_dir(road_dict,pile_dict,dir="output",cell_area=CELL_AREA,grid_width=
data_list.append(excel_data)
all_data = [headers] + data_list
smb.write_to_excel_pandas(all_data, img_file_path + '/病害明细列表.xlsx')
write_to_excel_pandas(all_data, img_file_path + '/excel/病害明细列表.xlsx')
# 综合明细表.xlsx
print("输出:综合明细表.xlsx")
heads = ['路线编码','起点','终点','车道编码','上下行','公路等级','路面类型','PQI','PQI等级','DR(%)','PCI','PCI等级','IRI','RQI','RQI等级','RD','RDI','RDI等级','SMTD','PBI','PBI等级','WR','PWI','PWI等级','备注']
data1 = process_info(road_dict,pile_dict,summary_data,image_path,current_time,10)
data2 = process_info(road_dict,pile_dict,summary_data,image_path,current_time,100)
@ -641,7 +704,8 @@ def process_dir(road_dict,pile_dict,dir="output",cell_area=CELL_AREA,grid_width=
'data': data3
}
]
create_multiple_sheets_with_multiple_headers(img_file_path + '/综合明细表.xlsx', excel_data)
create_multiple_sheets_with_multiple_headers(img_file_path + '/excel/综合明细表.xlsx', excel_data)
@ -666,6 +730,8 @@ def process_zip(zip_path,pile_map_file,output_dir="output",cell_area=CELL_AREA,g
summary_data = []
for root,_,files in os.walk(output_dir):
for file in files:
if file.lower().endswith(("_grid.jpg","_grid.png","_grid.jpeg","_grid.bmp")) :
continue
if file.lower().endswith((".jpg",".png",".jpeg",".bmp")) :
image_path = os.path.join(root,file)
label_file = os.path.splitext(image_path)[0]+".txt"
@ -800,7 +866,13 @@ if __name__=="__main__":
# road_dict = smb.get_road_dict("192.168.110.114/share_File/西南计算机", "administrator", "abc@1234")
# process_dir(road_dict, pile_dict, output_dir)
output_dir = "D:/devForBdzlWork/ai-train_platform/predictions/7"
# calc_cell_area, calc_grid_width, calc_grid_height = calc_grid_param(2048, 4096, 3.6, 2)
# print(f"calc_cell_area={calc_cell_area}, calc_grid_width={calc_grid_width}, calc_grid_height={calc_grid_height}")
output_dir = "D:/devForBdzlWork/ai-train_platform/predictions/77"
pile_dict = get_pile_dict(output_dir)
road_dict = get_road_dict(output_dir)
process_dir(road_dict, pile_dict, output_dir)