133 lines
5.8 KiB
Python
133 lines
5.8 KiB
Python
import json
|
||
import logging
|
||
import os
|
||
import time
|
||
|
||
from torch.fx.experimental.unification.multipledispatch.dispatcher import source
|
||
|
||
from draw_numbers_and_boundary import draw_json_boundary
|
||
from middleware.minio_util import downFile, upload_file, upload_folder
|
||
from middleware.mqtt_pub import MQTTClient, global_message_queue
|
||
from middleware.read_json_cal_coord_by_pixel import parse_json_data
|
||
from middleware.util import get_current_date_and_milliseconds
|
||
from uv_prediction import predict_pic
|
||
|
||
# Configure logging once, up front, so every message shares one format.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# MQTT broker address (public, free broker) and the default MQTT port.
broker, port = "8.137.54.85", 1883

# Topic this module receives tasks on, and the topic results are published to.
topic = "ai/tottle/uvmodule"
publish_topic = "thing/product/ai/events"

# NOTE(review): not referenced anywhere in the visible part of this script.
uv_source_url = "uv_source_url"
|
||
def remove_local_files(paths):
    """Delete the local files listed in *paths*, best effort.

    Empty/None entries and paths that no longer exist are skipped. Any other
    OS-level failure is logged as a warning and does not stop the remaining
    deletions, so one undeletable file cannot keep the others on disk.

    Args:
        paths: iterable of local file paths (entries may be None or repeated).
    """
    for path in paths:
        if not path:
            continue
        try:
            os.remove(path)
        except FileNotFoundError:
            # Already gone, or never created: nothing to clean up.
            continue
        except OSError as err:
            logging.getLogger(__name__).warning(
                "could not remove local file %s: %s", path, err
            )


def process_image(task_id, img_url, local_files):
    """Run the UV pipeline for one image and return its result entry.

    Steps, in order: download the image with ``downFile``, run ``predict_pic``,
    draw the boundary with ``draw_json_boundary``, compute coordinates with
    ``parse_json_data``, then upload the source image, the prediction image,
    the boundary image and one JSON file with ``upload_file``.

    Args:
        task_id: task identifier taken from the incoming message.
        img_url: location of the image to download.
        local_files: list that receives every local path this function wants
            deleted afterwards. Paths are appended as soon as they are known,
            so the caller can clean up even when a later step raises.

    Returns:
        dict with the keys ``minio_path_before``, ``minio_path_after``,
        ``minio_path_boundary``, ``minio_path_json``, ``crs_info`` and
        ``file_type`` (always ``"pic"``).
    """
    print(f"img_url 路径{img_url}")
    pic_path = os.path.abspath(downFile(img_url))
    local_files.append(pic_path)
    print(f"pic 路径{pic_path}")

    # NOTE(review): the meaning of the literal 12 is defined by predict_pic.
    result_img = predict_pic(task_id, 12, pic_path)
    local_files.append(result_img)
    print("draw_json_boundary 1")
    final_vis_png_path, instance_results_json_path = draw_json_boundary(result_img)
    local_files.append(final_vis_png_path)
    print("draw_json_boundary 2")
    crs_info, coor_json_path = parse_json_data(instance_results_json_path, pic_path)
    print("parse_json_data 1")

    # Upload the coordinate JSON when parse_json_data produced one, otherwise
    # fall back to the raw instance-results JSON. Only the uploaded file is
    # queued for deletion.
    # NOTE(review): when a coordinate JSON exists, the instance-results JSON
    # is left on disk - confirm whether that is intended.
    json_path = instance_results_json_path if coor_json_path is None else coor_json_path
    local_files.append(json_path)

    minio_img_url, _ = upload_file(pic_path, None)
    print("upload_file 1")
    minio_result_img, _ = upload_file(result_img, None)
    print("upload_file 2")
    minio_final_vis_png_path, _ = upload_file(final_vis_png_path, None)
    print("upload_file 3")
    minio_instance_results_json_path, _ = upload_file(json_path, None)
    print("upload_file 4")
    print("upload_file 5")

    return {
        "minio_path_before": minio_img_url,
        "minio_path_after": minio_result_img,
        "minio_path_boundary": minio_final_vis_png_path,
        "minio_path_json": minio_instance_results_json_path,
        # TIF coordinate-system info; 3-D coordinate values differ a lot
        # between coordinate systems.
        "crs_info": crs_info,
        "file_type": "pic",
    }


if __name__ == "__main__":
    # Initialise the MQTT client.
    mqtt_client = MQTTClient(broker, port, topic)

    while True:
        # Local paths created while handling the current message. Only these
        # are ever deleted; values returned by upload_file are published.
        local_files = []
        try:
            raw_message = mqtt_client.get_messages()  # blocks until a message arrives
            print(f"uvmodule接收消息 {raw_message}")
            if raw_message is None:  # termination signal
                break

            logger.debug("处理原始消息: %s", raw_message)
            task_id = raw_message["task_id"]  # task id
            list_s3_url = raw_message["list_s3_url"]  # image locations to process
            # Method ids: not used below, but still read so that a payload
            # without this key is rejected.
            list_func_id = raw_message["list_func_id"]

            minio_url_list = []
            for img_url in list_s3_url:
                minio_url_list.append(process_image(task_id, img_url, local_files))

            message = {
                "task_id": task_id,
                "minio": minio_url_list,
            }
            json_message = json.dumps(message, indent=4, ensure_ascii=False)
            mqtt_client.publish_uv_result(publish_topic, json_message)
            time.sleep(0.5)
        except KeyboardInterrupt:
            logger.info("收到中断信号,停止消息处理...")
            break
        except Exception as e:
            # Top-level boundary: log with traceback and keep the worker alive.
            logger.exception("消息处理循环发生未预期错误: %s", e)
            time.sleep(1)  # avoid a hot loop when errors repeat
        finally:
            # Runs on success, on failure and on break, so downloaded and
            # generated files never pile up on disk.
            remove_local_files(local_files)