ai_project_v1/touying/getmq_sendresult.py

124 lines
5.0 KiB
Python
Raw Permalink Normal View History

import asyncio
import json
import logging
import time
from typing import Dict, List, Any
from middleware.entity.camera_para import read_camera_params
from touying.ImageReproject_python.cal_func import red_line_reproject, read_txt
from touying.read_hongxian_file import process_geojson_data, convert_processed_to_points, \
add_result_to_prcessdata
# Logging setup: configure the root logger at import time (INFO level,
# timestamp / logger name / level / message) and create this module's logger.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# 自定义计算类 - 处理接收到的MQTT消息
class CalTouYing:
def __init__(self,local_file_path, camera_file_path,img_width=1920, img_height=1080):
self.img_width = img_width
self.img_height = img_height
# self.points = read_txt(r"D:\project\test-touying\ImageReproject_python\points.txt")
self.processed_data=process_geojson_data(local_file_path) #将json文本读取到内存
self.camera_para=read_camera_params(camera_file_path)
self.processed_points=convert_processed_to_points(self.processed_data) #做格式转为Point的集合
def process(self, data: Dict) -> None | dict[str, int | float | Any] | dict[str, str | dict]:
"""处理接收到的数据并进行计算"""
try:
# 提取参数(如果不存在则使用默认值)
# 提取参数(如果不存在则使用默认值)
# print(f"datadatadata2 {data.get("data")}")
# print(f"datadatadata3 {data.get("data").get("gimbal_yaw", 0)}")
# if data.get("data").get("gimbal_yaw") is None:
gimbal_yaw = data.get("data").get("gimbal_yaw")
# print("1")
gimbal_pitch = data.get("data").get("gimbal_pitch")
# print("2")
gimbal_roll = data.get("data").get("gimbal_roll")
# print("3")
height = data.get("data").get("height")
# print("4")
cam_longitude = data.get("data").get("longitude")
# print("5")
cam_altitude = data.get("data").get("latitude")
# print("6")
# # # 提取参数(根据实际 MQTT 消息结构调整)
# gimbal_yaw = data.get("data", {}).get("gimbal_yaw", 66.40)
# gimbal_pitch = data.get("data", {}).get("gimbal_pitch", -90)
# gimbal_roll = data.get("data", {}).get("gimbal_roll", 0)
# height = data.get("data", {}).get("height", 590.834)
# cam_longtitude = data.get("data", {}).get("cam_longtitude", 103.592606)
# cam_altitude = data.get("data", {}).get("cam_altitude", 30.454648)
# img_width = data.get("data", {}).get("img_width", 4032)
# img_height = data.get("data", {}).get("img_height", 3024)
if gimbal_yaw is not None and gimbal_pitch is not None and gimbal_roll is not None and height is not None and cam_longitude is not None and cam_altitude is not None:
# gimbal_yaw=float(gimbal_yaw)
# gimbal_pitch=float(gimbal_pitch)
# print(f"datadatadata1 {data}")
# # 临时特殊处理gimbal_pitch收到了0的异常值
# if gimbal_pitch <0.01:
# gimbal_pitch=0.01
# gimbal_roll=float(gimbal_roll)
# # height=float(height)-30
# height=float(height)
# cam_longitude=float(cam_longitude)
# cam_altitude=float(cam_altitude)
# 坐标转换计算示例
# 这里是一个简化的示例,实际应用中可能需要更复杂的算法
# result = red_line_reproject(
# gimbal_yaw, gimbal_pitch, gimbal_roll,
# height, longitude, latitude,
# 1440, 1080,
# points
# )
result = red_line_reproject(
gimbal_yaw, gimbal_pitch, gimbal_roll,
height, cam_longitude, cam_altitude,
self.img_width, self.img_height,
# self.points
self.processed_points,
self.camera_para
)
# print("3")
# print(f"result {result}")
# if len(result)<1:
# return None
procc_data=self.processed_data
add_result_to_prcessdata(procc_data, result)
# 构建发送给前端的消息
response = {
"step": 1,
# "param": mess, # 原始参数
"data":data.get("data"),
"result": procc_data, # 计算结果
# "result": result, # 计算结果
"timestamp": time.time()
}
return response
except Exception as e:
return {
"status": "error",
"message": str(e),
"original_data": data
}