灾害数据输出上行下行逻辑调整,共享目录识别,异步更新任务状态为已结束

This commit is contained in:
liyubo 2025-12-15 11:01:38 +08:00
parent 2a83054687
commit 6289a6d1e3
2 changed files with 309 additions and 208 deletions

View File

@ -16,6 +16,7 @@ import matplotlib.pyplot as plt
from ultralytics import YOLO
from middleware.recognition_task import RecognitionTaskDAO, RecognitionTask
from middleware.minio_util import upload_file, downFile, check_zip_size, upload_folder
from util.yolo2pix_new import *
from util.smb_tool import *
@ -643,7 +644,7 @@ class YOLOSegmentationInference:
InferenceResult]:
for input_dir in input_dir_list :
self.process_image_directory_share_dir(task_id,current_time,input_dir,user_name,pwd,output_dir,conf_threshold,iou_threshold,save_mask,save_label,show,result_save)
del_file_shutil(output_dir)
def process_image_directory_share_dir(self, task_id, current_time, input_dir, user_name, pwd, output_dir: Optional[str] = None,
conf_threshold: float = 0.25, iou_threshold: float = 0.5,
@ -666,7 +667,9 @@ class YOLOSegmentationInference:
推理结果列表
"""
results = []
print(f"正在处理共享目录: {input_dir} ")
tmp_output_dir = output_dir + "\\" + datetime.now().strftime("%Y%m%d%H%M%S")
print(f"正在处理共享目录: {input_dir} - {tmp_output_dir}")
try:
# 检查目录是否存在
config = get_conf(zip_url=input_dir, user_name=user_name, pwd=pwd)
@ -690,7 +693,7 @@ class YOLOSegmentationInference:
image_path=image_path,
user_name=user_name,
pwd=pwd,
output_dir=output_dir,
output_dir=tmp_output_dir,
conf_threshold=conf_threshold,
iou_threshold=iou_threshold,
save_mask=save_mask,
@ -704,14 +707,27 @@ class YOLOSegmentationInference:
tmpConfig = get_conf(input_dir, user_name, pwd)
pile_dict = get_pile_dict(input_dir, user_name, pwd)
road_dict = get_road_dict(f"{tmpConfig['ip']}/{tmpConfig['share']}/{tmpConfig['excel_dir']}", user_name, pwd)
process_dir(road_dict, pile_dict, output_dir)
process_dir(road_dict, pile_dict, tmp_output_dir)
# 识别生成的文件压缩成zip
zip_folder_shutil(output_dir)
move_file_shutil(output_dir+".zip", output_dir)
zip_folder_shutil(tmp_output_dir)
move_file_shutil(tmp_output_dir+".zip", tmp_output_dir + f"/{task_id}.zip")
remote_dir = f"{tmpConfig['dir']}_识别/{task_id}/{current_time}"
scanner.upload_directory(output_dir, config['share'], remote_dir=remote_dir)
del_file_shutil(output_dir)
scanner.upload_directory(tmp_output_dir, config['share'], remote_dir=remote_dir)
del_file_shutil(tmp_output_dir)
# 更新识别任务状态为已结束
DB_CONFIG = {
"dbname": "postgres",
"user": "admin",
"password": "admin.123",
"host": "127.0.0.1",
"port": "5432"
}
dao = RecognitionTaskDAO(DB_CONFIG)
recognition_task = RecognitionTask(
task_id=task_id,
status=2
)
dao.update_recognition_task(recognition_task)
return results
except PermissionError:

View File

@ -95,19 +95,39 @@ def detect_road_type_from_content(label_file):
if kw in content: return "gravel"
return "gravel"
def get_road_info(road_dict, pile_dict, img_file_name):
def get_road_info(summary_data, road_dict, pile_dict, img_file_name):
"""获取路线信息"""
parts = pile_dict.get(img_file_name)
if parts :
road_code = parts[0]
road_info = road_dict.get(road_code)
if road_info :
pile_no = convert_special_format(parts[1]) * 1000
pile_no_min = convert_special_format(summary_data[0][0]) * 1000
pile_no_max = convert_special_format(summary_data[len(summary_data)-1][0]) * 1000
if pile_no_max < pile_no_min : #上行下行
tmp = pile_no_max
pile_no_max = pile_no_min
pile_no_min = tmp
final_excel_start_pile_no = pile_no_min
final_excel_end_pile_no = pile_no_max
for road in road_info :
data = road['data']
if data.get('起桩号(米)',0) > pile_no and pile_no > data.get('止桩号(米)', 0) or data.get('起桩号(米)', 0) < pile_no and pile_no < data.get('止桩号(米)', 0) :
data = road.get("data")
excel_start_pile_no = data.get('起桩号(米)',0)
excel_end_pile_no = data.get('止桩号(米)',0)
if excel_start_pile_no < final_excel_start_pile_no :
final_excel_start_pile_no = excel_start_pile_no
if excel_end_pile_no > final_excel_end_pile_no :
final_excel_end_pile_no = excel_end_pile_no
# up_down = data.get("方向(上行/下行)")
if excel_start_pile_no <= pile_no_min and excel_end_pile_no >= pile_no_max:
return data
# 起始结束桩号不匹配
data = road_info[0].get("data")
data['起桩号(米)'] = final_excel_start_pile_no
data['止桩号(米)'] = final_excel_end_pile_no
return data
return {}
def detect_road_type_from_road_dict(road_dict, pile_dict, img_file_name):
@ -307,7 +327,7 @@ def process_info(road_dict,pile_dict,summary_data,image_path,current_time,interv
process_info_data = []
if summary_data:
img_file_name = os.path.basename(image_path)
road_data = get_road_info(road_dict, pile_dict, img_file_name)
road_data = get_road_info(summary_data, road_dict, pile_dict, img_file_name)
identify_width = road_data.get('识别宽度(米)', '3.6')
up_or_down = road_data.get('方向(上行/下行)', '上行')
road_code = pile_dict.get(img_file_name)[0]
@ -322,7 +342,7 @@ def process_info(road_dict,pile_dict,summary_data,image_path,current_time,interv
group_list = summary_data
list(reversed(group_list))
increment = float(-0.001) * interval
tmp_pile_no = first_pile_no
tmp_pile_no = last_pile_no
pile_no_start = tmp_pile_no
if tmp_pile_no % increment != 0 :
pile_no_end = tmp_pile_no - (tmp_pile_no % (-increment))
@ -340,20 +360,23 @@ def process_info(road_dict,pile_dict,summary_data,image_path,current_time,interv
tmp_start = pile_no_start
tmp_end = pile_no_end
while True :
if tmp_end < last_pile_no :
# 上行
if (tmp_start < last_pile_no and tmp_end > last_pile_no) :
tmp_end = last_pile_no
if tmp_start >= last_pile_no and tmp_end >= last_pile_no :
break
# 下行
if (tmp_start > first_pile_no and tmp_end < first_pile_no) :
tmp_end = first_pile_no
if tmp_start <= first_pile_no and tmp_end <= first_pile_no :
break
print(f"process_info:tmp_start={tmp_start}, tmp_end={tmp_end}, index={index}, len(group_list)={len(group_list)}")
if index >= len(group_list) :
break
# 每10m一个区间在区间内进行灾害计算
pile_no, DR, counts, road_type = group_list[index]
cur_pile_no = convert_special_format(pile_no)
pile_no_s = format_number_to_k_code(tmp_start)
pile_no_e = format_number_to_k_code(tmp_end)
up_or_down_code = "B" if up_or_down == '下行' else "A"
in_interval_val = in_interval(increment, cur_pile_no, tmp_start, tmp_end)
if in_interval_val == 0 :
# 没在刻度内直接输出无病害数据
# 无灾害数据直接输出无病害数据
pile_no_s = format_number_to_k_code(tmp_start)
pile_no_e = format_number_to_k_code(tmp_end)
up_or_down_code = "B" if up_or_down == '下行' else "A"
row = [road_code, pile_no_s,pile_no_e,up_or_down_code,up_or_down,road_level,f"{road_type_cn}路面",'','',f"{0:.2f}",'','','','','','','','','','','','','','','']
if road_type=="asphalt":
keys = list(CLASS_MAP_ASPHALT.keys())
@ -361,42 +384,60 @@ def process_info(road_dict,pile_dict,summary_data,image_path,current_time,interv
keys = list(CLASS_MAP_CEMENT.keys())
else:
keys = list(CLASS_MAP_GRAVEL.keys())
# for k in keys:
# row.append(f"{0:.2f}")
process_info_data.append(row)
# f.write(','.join(row)+'\n')
elif in_interval_val == 1 :
row = [road_code, pile_no_s,pile_no_e,up_or_down_code,up_or_down,road_level,f"{road_type_cn}路面",'','']
subRows = []
while index < len(group_list):
pile_no, DR, counts, road_type = group_list[index]
cur_pile_no = convert_special_format(pile_no)
tmp_row = []
in_interval_val = in_interval(increment, cur_pile_no, tmp_start, tmp_end)
if in_interval_val == 1 :
tmp_row = [DR]
if road_type=="asphalt":
keys = list(CLASS_MAP_ASPHALT.keys())
elif road_type=="cement":
keys = list(CLASS_MAP_CEMENT.keys())
else:
keys = list(CLASS_MAP_GRAVEL.keys())
# for k in keys:
# tmp_row.append(counts.get(k, [0,0,0])[0])
subRows.append(tmp_row)
index = index + 1
else :
break
# 同列汇总 10m一个区间--对应5张图
column_sums = [f"{(sum(column)/(interval / 2)):0.2f}" for column in zip(*subRows)]
row += column_sums
# f.write(','.join(row)+'\n')
row += ['','','','','','','','','','','','','','','']
process_info_data.append(row)
else :
break;
# 每10m一个区间在区间内进行灾害计算
pile_no, DR, counts, road_type = group_list[index]
cur_pile_no = convert_special_format(pile_no)
pile_no_s = format_number_to_k_code(tmp_start)
pile_no_e = format_number_to_k_code(tmp_end)
up_or_down_code = "B" if up_or_down == '下行' else "A"
in_interval_val = in_interval(increment, cur_pile_no, tmp_start, tmp_end)
if in_interval_val == 0 :
# 没在刻度内直接输出无病害数据
row = [road_code, pile_no_s,pile_no_e,up_or_down_code,up_or_down,road_level,f"{road_type_cn}路面",'','',f"{0:.2f}",'','','','','','','','','','','','','','','']
if road_type=="asphalt":
keys = list(CLASS_MAP_ASPHALT.keys())
elif road_type=="cement":
keys = list(CLASS_MAP_CEMENT.keys())
else:
keys = list(CLASS_MAP_GRAVEL.keys())
# for k in keys:
# row.append(f"{0:.2f}")
process_info_data.append(row)
# f.write(','.join(row)+'\n')
elif in_interval_val == 1 :
row = [road_code, pile_no_s,pile_no_e,up_or_down_code,up_or_down,road_level,f"{road_type_cn}路面",'','']
subRows = []
while index < len(group_list):
pile_no, DR, counts, road_type = group_list[index]
cur_pile_no = convert_special_format(pile_no)
tmp_row = []
in_interval_val = in_interval(increment, cur_pile_no, tmp_start, tmp_end)
if in_interval_val == 1 :
tmp_row = [DR]
if road_type=="asphalt":
keys = list(CLASS_MAP_ASPHALT.keys())
elif road_type=="cement":
keys = list(CLASS_MAP_CEMENT.keys())
else:
keys = list(CLASS_MAP_GRAVEL.keys())
# for k in keys:
# tmp_row.append(counts.get(k, [0,0,0])[0])
subRows.append(tmp_row)
index = index + 1
else :
break
# 同列汇总 10m一个区间--对应5张图
column_sums = [f"{(sum(column)/(interval / 2)):0.2f}" for column in zip(*subRows)]
row += column_sums
# f.write(','.join(row)+'\n')
row += ['','','','','','','','','','','','','','','']
process_info_data.append(row)
else :
break;
tmp_start = tmp_end
tmp_end = tmp_start + increment
tmp_end = round(tmp_end, 3)
@ -471,6 +512,12 @@ def create_multiple_sheets_with_multiple_headers(file_path, excel_data):
wb.save(file_path)
print(f"文件已保存为 {file_path}")
def format_number_three_decimal_places(number) :
"""
将数字转换为 三位小数字符串
"""
return f"{number:0.3f}"
def format_number_to_k_code(number):
"""
将数字转换为 K0000+000 格式
@ -531,160 +578,21 @@ def process_dir(road_dict,pile_dict,dir="output",cell_area=CELL_AREA,grid_width=
summary_data.append((pile_no, DR, counts, road_type))
current_time = datetime.now().strftime("%Y%m%d%H%M%S")
# 写桩号问题列表.txt
if summary_data:
img_file_name = os.path.basename(image_path)
road_data = get_road_info(road_dict, pile_dict, img_file_name)
road_code = pile_dict.get(img_file_name)[0]
road_type = summary_data[0][3]
min_pile, max_pile = get_min_max_pile(summary_data)
print(f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-detail-{current_time}.txt")
os.makedirs(f"{dir}/DR", exist_ok=True)
out_file = os.path.join(f"{dir}/DR",f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-detail-{current_time}.txt")
header = generate_header(road_type)
with open(out_file,'w',encoding='utf-8') as f:
f.write(header+'\n')
for pile_no,DR,counts,rt in summary_data:
row = [pile_no,"3.6",f"{DR:.2f}"]
if road_type=="asphalt":
keys = list(CLASS_MAP_ASPHALT.keys())
elif road_type=="cement":
keys = list(CLASS_MAP_CEMENT.keys())
else:
keys = list(CLASS_MAP_GRAVEL.keys())
for k in keys:
row.append(f"{counts.get(k,[0,0,0])[0]:.2f}")
f.write(','.join(row)+'\n')
print(f"输出完成: {out_file}")
# 写桩号问题列表.txt
process_damage_detail_txt(road_dict, pile_dict, dir, summary_data, image_path, current_time)
# 灾害数据.txt
if summary_data:
img_file_name = os.path.basename(image_path)
road_data = get_road_info(road_dict, pile_dict, img_file_name)
identify_width = road_data.get('识别宽度(米)', '3.6')
up_or_down = road_data.get('方向(上行/下行)', '上行')
road_code = pile_dict.get(img_file_name)[0]
first_pile_no = round(road_data.get('起桩号(米)',0) / 1000, 3)
last_pile_no = round(road_data.get('止桩号(米)', 0) / 1000, 3)
group_by_road_type = {}
for data in summary_data:
group_by_road_type.setdefault(data[3], []).append(data)
for road_type, group in group_by_road_type.items():
min_pile, max_pile = get_min_max_pile(group)
print(f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-{current_time}.txt")
out_file = os.path.join(f"{dir}/DR",f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-{current_time}.txt")
header = generate_header(road_type)
group_list = []
# 上行/下行
if up_or_down == '下行' :
group_list = group
list(reversed(group_list))
increment = float(-0.010)
tmp_pile_no = first_pile_no
pile_no_start = tmp_pile_no
if tmp_pile_no % increment != 0 :
pile_no_end = tmp_pile_no - (tmp_pile_no % (-increment))
pile_no_end = round(pile_no_end, 3) #处理精度问题
else :
pile_no_end = tmp_pile_no + increment
pile_no_end = round(pile_no_end, 3) #处理精度问题
else :
group_list = group
increment = float(0.010)
pile_no_start = first_pile_no
pile_no_end = pile_no_start + increment
with open(out_file,'w',encoding='utf-8') as f:
f.write(header+'\n')
index = 0
tmp_start = pile_no_start
tmp_end = pile_no_end
while True :
if tmp_end < last_pile_no :
tmp_end = last_pile_no
print(f"process_dir:tmp_start={tmp_start}, tmp_end={tmp_end}, index={index}, len(group_list)={len(group_list)}")
if index >= len(group_list) :
break
# 每10m一个区间在区间内进行灾害计算
pile_no, DR, counts, road_type = group_list[index]
cur_pile_no = convert_special_format(pile_no)
in_interval_val = in_interval(increment, cur_pile_no, tmp_start, tmp_end)
if in_interval_val == 0 :
# 没在刻度内直接输出无病害数据
pile_no = format_number_to_k_code(tmp_start)
row = [pile_no,f"{identify_width}",f"{0:.2f}"]
if road_type=="asphalt":
keys = list(CLASS_MAP_ASPHALT.keys())
elif road_type=="cement":
keys = list(CLASS_MAP_CEMENT.keys())
else:
keys = list(CLASS_MAP_GRAVEL.keys())
for k in keys:
row.append(f"{0:.2f}")
f.write(','.join(row)+'\n')
elif in_interval_val == 1 :
row = [format_number_to_k_code(tmp_start), f"{identify_width}"]
subRows = []
while index < len(group_list):
pile_no, DR, counts, road_type = group_list[index]
cur_pile_no = convert_special_format(pile_no)
tmp_row = []
in_interval_val = in_interval(increment, cur_pile_no, tmp_start, tmp_end)
if in_interval_val == 1 :
tmp_row = [DR]
if road_type=="asphalt":
keys = list(CLASS_MAP_ASPHALT.keys())
elif road_type=="cement":
keys = list(CLASS_MAP_CEMENT.keys())
else:
keys = list(CLASS_MAP_GRAVEL.keys())
for k in keys:
tmp_row.append(counts.get(k, [0,0,0])[0])
subRows.append(tmp_row)
index = index + 1
else :
break
# 同列汇总 10m一个区间--对应5张图
column_sums = [f"{(sum(column)/5):0.2f}" for column in zip(*subRows)]
row += column_sums
f.write(','.join(row)+'\n')
else :
break
tmp_start = tmp_end
tmp_end = tmp_start + increment
tmp_end = round(tmp_end, 3)
print(f"输出完成: {out_file}")
process_damage_txt(road_dict, pile_dict, dir, summary_data, image_path, current_time)
# 病害明细列表.xlsx
print("输出:病害明细列表.xlsx")
os.makedirs(f"{dir}/excel", exist_ok=True)
headers = ['序号','路线编码','方向','桩号','路面类型','病害名称','程度','长度(m)',' 宽度(m)',' 面积(㎡)',' 横向位置','备注']
data_list = []
if summary_data:
img_file_path = os.path.dirname(image_path)
img_file_name = os.path.basename(image_path)
road_data = get_road_info(road_dict, pile_dict, img_file_name)
road_code, pile_no, road_type = detect_road_type_from_road_dict(road_dict, pile_dict, img_file_name)
identify_width = road_data.get('识别宽度(米)', '3.6')
up_or_down = road_data.get('方向(上行/下行)', '上行')
excel_index = 1
for data in summary_data:
damage_data = data[2]
for attr_name, attr_value in damage_data.items():
excel_data = [excel_index, road_code, up_or_down, f"K000{data[0]}", ROAD_TYPE_EN_TO_CN.get(road_type), attr_name, '', attr_value[1]*cell_area, attr_value[2]*cell_area, attr_value[0], '', '']
data_list.append(excel_data)
all_data = [headers] + data_list
smb_tool.write_to_excel_pandas(all_data, img_file_path + '/excel/病害明细列表.xlsx')
img_file_path = process_damage_detail_excel(road_dict, pile_dict, dir, cell_area, summary_data, image_path)
# 综合明细表.xlsx
process_damage_composite_excel(road_dict, pile_dict, summary_data, image_path, current_time, img_file_path)
def process_damage_composite_excel(road_dict, pile_dict, summary_data, image_path, current_time, img_file_path):
print("输出:综合明细表.xlsx")
heads = ['路线编码','起点','终点','车道编码','上下行','公路等级','路面类型','PQI','PQI等级','DR(%)','PCI','PCI等级','IRI','RQI','RQI等级','RD','RDI','RDI等级','SMTD','PBI','PBI等级','WR','PWI','PWI等级','备注']
data1 = process_info(road_dict,pile_dict,summary_data,image_path,current_time,10)
@ -714,6 +622,183 @@ def process_dir(road_dict,pile_dict,dir="output",cell_area=CELL_AREA,grid_width=
create_multiple_sheets_with_multiple_headers(img_file_path + '/excel/综合明细表.xlsx', excel_data)
def process_damage_detail_txt(road_dict, pile_dict, dir, summary_data, image_path, current_time):
if summary_data:
img_file_name = os.path.basename(image_path)
road_data = get_road_info(summary_data, road_dict, pile_dict, img_file_name)
road_code = pile_dict.get(img_file_name)[0]
road_type = summary_data[0][3]
min_pile, max_pile = get_min_max_pile(summary_data)
print(f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-detail-{current_time}.txt")
os.makedirs(f"{dir}/DR", exist_ok=True)
out_file = os.path.join(f"{dir}/DR",f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-detail-{current_time}.txt")
header = generate_header(road_type)
with open(out_file,'w',encoding='utf-8') as f:
f.write(header+'\n')
for pile_no,DR,counts,rt in summary_data:
row = [pile_no,"3.6",f"{DR:.2f}"]
if road_type=="asphalt":
keys = list(CLASS_MAP_ASPHALT.keys())
elif road_type=="cement":
keys = list(CLASS_MAP_CEMENT.keys())
else:
keys = list(CLASS_MAP_GRAVEL.keys())
for k in keys:
row.append(f"{counts.get(k,[0,0,0])[0]:.2f}")
f.write(','.join(row)+'\n')
print(f"输出完成: {out_file}")
def process_damage_detail_excel(road_dict, pile_dict, dir, cell_area, summary_data, image_path):
print("输出:病害明细列表.xlsx")
os.makedirs(f"{dir}/excel", exist_ok=True)
headers = ['序号','路线编码','方向','桩号','路面类型','病害名称','程度','长度(m)',' 宽度(m)',' 面积(㎡)',' 横向位置','备注']
data_list = []
if summary_data:
img_file_path = os.path.dirname(image_path)
img_file_name = os.path.basename(image_path)
road_data = get_road_info(summary_data, road_dict, pile_dict, img_file_name)
road_code, pile_no, road_type = detect_road_type_from_road_dict(road_dict, pile_dict, img_file_name)
identify_width = road_data.get('识别宽度(米)', '3.6')
up_or_down = road_data.get('方向(上行/下行)', '上行')
excel_index = 1
for data in summary_data:
damage_data = data[2]
for attr_name, attr_value in damage_data.items():
excel_data = [excel_index, road_code, up_or_down, f"K000{data[0]}", ROAD_TYPE_EN_TO_CN.get(road_type), attr_name, '', attr_value[1]*cell_area, attr_value[2]*cell_area, attr_value[0], '', '']
data_list.append(excel_data)
all_data = [headers] + data_list
smb_tool.write_to_excel_pandas(all_data, img_file_path + '/excel/病害明细列表.xlsx')
return img_file_path
def process_damage_txt(road_dict, pile_dict, dir, summary_data, image_path, current_time):
if summary_data:
img_file_name = os.path.basename(image_path)
road_data = get_road_info(summary_data, road_dict, pile_dict, img_file_name)
identify_width = road_data.get('识别宽度(米)', '3.6')
up_or_down = road_data.get('方向(上行/下行)', '上行')
road_code = pile_dict.get(img_file_name)[0]
first_pile_no = round(road_data.get('起桩号(米)',0) / 1000, 3)
last_pile_no = round(road_data.get('止桩号(米)', 0) / 1000, 3)
group_by_road_type = {}
for data in summary_data:
group_by_road_type.setdefault(data[3], []).append(data)
for road_type, group in group_by_road_type.items():
min_pile, max_pile = get_min_max_pile(group)
print(f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-{current_time}.txt")
out_file = os.path.join(f"{dir}/DR",f"{road_code}-DR-{min_pile:0.3f}-{max_pile:0.3f}-{current_time}.txt")
header = generate_header(road_type)
group_list = []
# 上行/下行
if up_or_down == '下行' :
group_list = group
list(reversed(group_list))
increment = float(-0.010)
tmp_pile_no = last_pile_no
pile_no_start = tmp_pile_no
if tmp_pile_no % increment != 0 :
pile_no_end = tmp_pile_no - (tmp_pile_no % (-increment))
pile_no_end = round(pile_no_end, 3) #处理精度问题
else :
pile_no_end = tmp_pile_no + increment
pile_no_end = round(pile_no_end, 3) #处理精度问题
else :
group_list = group
increment = float(0.010)
pile_no_start = first_pile_no
pile_no_end = pile_no_start + increment
with open(out_file,'w',encoding='utf-8') as f:
f.write(header+'\n')
index = 0
tmp_start = pile_no_start
tmp_end = pile_no_end
while True :
# 上行
if (tmp_start < last_pile_no and tmp_end > last_pile_no) :
tmp_end = last_pile_no
if tmp_start >= last_pile_no and tmp_end >= last_pile_no :
break
# 下行
if (tmp_start > first_pile_no and tmp_end < first_pile_no) :
tmp_end = first_pile_no
if tmp_start <= first_pile_no and tmp_end <= first_pile_no :
break
print(f"process_dir:tmp_start={tmp_start}, tmp_end={tmp_end}, index={index}, len(group_list)={len(group_list)}")
if index >= len(group_list) :
# 无灾害直接输出无病害数据
pile_no = format_number_three_decimal_places(tmp_start)
row = [pile_no,f"{identify_width}",f"{0:.2f}"]
if road_type=="asphalt":
keys = list(CLASS_MAP_ASPHALT.keys())
elif road_type=="cement":
keys = list(CLASS_MAP_CEMENT.keys())
else:
keys = list(CLASS_MAP_GRAVEL.keys())
for k in keys:
row.append(f"{0:.2f}")
f.write(','.join(row)+'\n')
else :
# 每10m一个区间在区间内进行灾害计算
pile_no, DR, counts, road_type = group_list[index]
cur_pile_no = convert_special_format(pile_no)
in_interval_val = in_interval(increment, cur_pile_no, tmp_start, tmp_end)
if in_interval_val == 0 :
# 没在刻度内直接输出无病害数据
pile_no = format_number_three_decimal_places(tmp_start)
row = [pile_no,f"{identify_width}",f"{0:.2f}"]
if road_type=="asphalt":
keys = list(CLASS_MAP_ASPHALT.keys())
elif road_type=="cement":
keys = list(CLASS_MAP_CEMENT.keys())
else:
keys = list(CLASS_MAP_GRAVEL.keys())
for k in keys:
row.append(f"{0:.2f}")
f.write(','.join(row)+'\n')
elif in_interval_val == 1 :
row = [format_number_three_decimal_places(tmp_start), f"{identify_width}"]
subRows = []
while index < len(group_list):
pile_no, DR, counts, road_type = group_list[index]
cur_pile_no = convert_special_format(pile_no)
tmp_row = []
in_interval_val = in_interval(increment, cur_pile_no, tmp_start, tmp_end)
if in_interval_val == 1 :
tmp_row = [DR]
if road_type=="asphalt":
keys = list(CLASS_MAP_ASPHALT.keys())
elif road_type=="cement":
keys = list(CLASS_MAP_CEMENT.keys())
else:
keys = list(CLASS_MAP_GRAVEL.keys())
for k in keys:
tmp_row.append(counts.get(k, [0,0,0])[0])
subRows.append(tmp_row)
index = index + 1
else :
break
# 同列汇总 10m一个区间--对应5张图
column_sums = [f"{(sum(column)/5):0.2f}" for column in zip(*subRows)]
row += column_sums
f.write(','.join(row)+'\n')
else :
break
tmp_start = tmp_end
tmp_end = tmp_start + increment
tmp_end = round(tmp_end, 3)
print(f"输出完成: {out_file}")
# ---------------- 主函数 ----------------
@ -876,7 +961,7 @@ if __name__=="__main__":
# calc_cell_area, calc_grid_width, calc_grid_height = calc_grid_param(2048, 4096, 3.6, 2)
# print(f"calc_cell_area={calc_cell_area}, calc_grid_width={calc_grid_width}, calc_grid_height={calc_grid_height}")
output_dir = "D:/devForBdzlWork/ai-train_platform/predictions/77"
output_dir = "D:/devForBdzlWork/ai-train_platform/predictions/CV78500155B"
pile_dict = get_pile_dict(output_dir)
road_dict = get_road_dict(output_dir)
process_dir(road_dict, pile_dict, output_dir)