133 lines
5.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
import os
import time
from torch.fx.experimental.unification.multipledispatch.dispatcher import source
from draw_numbers_and_boundary import draw_json_boundary
from middleware.minio_util import downFile, upload_file, upload_folder
from middleware.mqtt_pub import MQTTClient, global_message_queue
from middleware.read_json_cal_coord_by_pixel import parse_json_data
from middleware.util import get_current_date_and_milliseconds
from uv_prediction import predict_pic
# MQTT broker address and port.
broker = "8.137.54.85"  # public MQTT broker (free)
port = 1883  # default MQTT port

# Topics: `topic` is handed to the MQTT client on construction,
# `publish_topic` is where results are published.
topic = "ai/tottle/uvmodule"
publish_topic = "thing/product/ai/events"

# NOTE(review): not referenced anywhere in the visible code — confirm it is still needed.
uv_source_url = "uv_source_url"

# Logging configuration.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
def remove_local_files(paths):
    """Delete every file in *paths* that still exists on disk.

    Empty or ``None`` entries and paths that are already gone are skipped, so
    the helper is safe to call from a ``finally`` block after a run that only
    got part of the way through.

    Args:
        paths: Iterable of local file paths to delete.
    """
    for path in paths:
        if path and os.path.exists(path):
            os.remove(path)


def process_image(task_id, img_url, local_files):
    """Run the prediction pipeline for one image and upload its artifacts.

    The image is fetched with ``downFile``, passed through ``predict_pic`` and
    ``draw_json_boundary``, the instance JSON is handed to ``parse_json_data``,
    and the source image, result image, boundary image and JSON file are each
    uploaded with ``upload_file``.

    Args:
        task_id: Task identifier taken from the incoming message.
        img_url: Address of the source image, passed to ``downFile``.
        local_files: List collecting local paths that must be deleted once the
            message is done. Paths are appended as soon as they are known so
            the caller can clean up even if a later step raises.

    Returns:
        dict: The per-image entry of the result message (uploaded paths,
        ``crs_info`` and the fixed ``file_type`` of ``"pic"``).
    """
    print(f"img_url 路径{img_url}")
    pic = downFile(img_url)
    print(f"pic 路径{os.path.abspath(pic)}")
    pic_path = os.path.abspath(pic)
    local_files.append(pic_path)

    # NOTE(review): the meaning of the literal 12 is not visible in this file —
    # confirm against predict_pic.
    result_img = predict_pic(task_id, 12, pic_path)
    local_files.append(result_img)
    print("draw_json_boundary 1")
    final_vis_png_path, instance_results_json_path = draw_json_boundary(result_img)
    local_files.append(final_vis_png_path)
    print("draw_json_boundary 2")
    crs_info, coor_json_path = parse_json_data(instance_results_json_path, pic_path)
    print("parse_json_data 1")

    # Upload the JSON produced by parse_json_data when there is one, otherwise
    # fall back to the raw instance-results JSON.
    # NOTE(review): as before, only the uploaded JSON is scheduled for deletion;
    # the raw instance JSON is kept when coor_json_path is set — confirm intended.
    json_path = instance_results_json_path if coor_json_path is None else coor_json_path
    local_files.append(json_path)

    minio_img_url, _ = upload_file(pic_path, None)
    print("upload_file 1")
    minio_result_img, _ = upload_file(result_img, None)
    print("upload_file 2")
    minio_final_vis_png_path, _ = upload_file(final_vis_png_path, None)
    print("upload_file 3")
    minio_instance_results_json_path, _ = upload_file(json_path, None)
    print("upload_file 4")
    print("upload_file 5")

    return {
        "minio_path_before": minio_img_url,
        "minio_path_after": minio_result_img,
        "minio_path_boundary": minio_final_vis_png_path,
        "minio_path_json": minio_instance_results_json_path,
        # CRS info of the tif; 3D coordinate values differ a lot between
        # coordinate systems.
        "crs_info": crs_info,
        "file_type": "pic",
    }


def handle_message(mqtt_client, raw_message):
    """Process one task message and publish its result.

    Args:
        mqtt_client: Client used to publish the result via
            ``publish_uv_result``.
        raw_message: Mapping that must provide ``"task_id"`` and
            ``"list_s3_url"`` (an iterable of image addresses).

    Raises:
        KeyError: If a required key is missing from *raw_message*.
        Exception: Anything raised by the download, prediction, upload or
            publish steps is propagated after local files have been removed.
    """
    task_id = raw_message["task_id"]
    list_s3_url = raw_message["list_s3_url"]

    local_files = []
    try:
        minio_url_list = [
            process_image(task_id, img_url, local_files) for img_url in list_s3_url
        ]
        message = {
            "task_id": task_id,
            "minio": minio_url_list,
        }
        json_message = json.dumps(message, indent=4, ensure_ascii=False)
        mqtt_client.publish_uv_result(publish_topic, json_message)
        # Short pause kept from the original flow before the local files are
        # removed — presumably to let the publish go out; TODO confirm.
        time.sleep(0.5)
    finally:
        # Always remove downloaded/generated files, including when a step
        # failed part-way, so a long-running worker does not fill the disk.
        remove_local_files(local_files)


def main():
    """Consume task messages from MQTT until a stop signal or Ctrl-C."""
    # Initialise the MQTT client.
    mqtt_client = MQTTClient(broker, port, topic)
    while True:
        try:
            raw_message = mqtt_client.get_messages()  # blocks until a message arrives
            print(f"uvmodule接收消息 {raw_message}")
            if raw_message is None:  # termination signal
                break
            logger.debug("处理原始消息: %s", raw_message)
            handle_message(mqtt_client, raw_message)
        except KeyboardInterrupt:
            logger.info("收到中断信号,停止消息处理...")
            break
        except Exception as e:
            # Top-level boundary: log with traceback and keep the worker alive.
            logger.exception("消息处理循环发生未预期错误: %s", e)
            time.sleep(1)  # back off so repeated failures do not spin the CPU


if __name__ == "__main__":
    main()