ai_project_v1/touying/getmq_sendresult.py

124 lines
5.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
import logging
import time
from typing import Dict, List, Any
from middleware.entity.camera_para import read_camera_params
from touying.ImageReproject_python.cal_func import red_line_reproject, read_txt
from touying.read_hongxian_file import process_geojson_data, convert_processed_to_points, \
add_result_to_prcessdata
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# 自定义计算类 - 处理接收到的MQTT消息
class CalTouYing:
def __init__(self,local_file_path, camera_file_path,img_width=1920, img_height=1080):
self.img_width = img_width
self.img_height = img_height
# self.points = read_txt(r"D:\project\test-touying\ImageReproject_python\points.txt")
self.processed_data=process_geojson_data(local_file_path) #将json文本读取到内存
self.camera_para=read_camera_params(camera_file_path)
self.processed_points=convert_processed_to_points(self.processed_data) #做格式转为Point的集合
def process(self, data: Dict) -> None | dict[str, int | float | Any] | dict[str, str | dict]:
"""处理接收到的数据并进行计算"""
try:
# 提取参数(如果不存在则使用默认值)
# 提取参数(如果不存在则使用默认值)
# print(f"datadatadata2 {data.get("data")}")
# print(f"datadatadata3 {data.get("data").get("gimbal_yaw", 0)}")
# if data.get("data").get("gimbal_yaw") is None:
gimbal_yaw = data.get("data").get("gimbal_yaw")
# print("1")
gimbal_pitch = data.get("data").get("gimbal_pitch")
# print("2")
gimbal_roll = data.get("data").get("gimbal_roll")
# print("3")
height = data.get("data").get("height")
# print("4")
cam_longitude = data.get("data").get("longitude")
# print("5")
cam_altitude = data.get("data").get("latitude")
# print("6")
# # # 提取参数(根据实际 MQTT 消息结构调整)
# gimbal_yaw = data.get("data", {}).get("gimbal_yaw", 66.40)
# gimbal_pitch = data.get("data", {}).get("gimbal_pitch", -90)
# gimbal_roll = data.get("data", {}).get("gimbal_roll", 0)
# height = data.get("data", {}).get("height", 590.834)
# cam_longtitude = data.get("data", {}).get("cam_longtitude", 103.592606)
# cam_altitude = data.get("data", {}).get("cam_altitude", 30.454648)
# img_width = data.get("data", {}).get("img_width", 4032)
# img_height = data.get("data", {}).get("img_height", 3024)
if gimbal_yaw is not None and gimbal_pitch is not None and gimbal_roll is not None and height is not None and cam_longitude is not None and cam_altitude is not None:
# gimbal_yaw=float(gimbal_yaw)
# gimbal_pitch=float(gimbal_pitch)
# print(f"datadatadata1 {data}")
# # 临时特殊处理gimbal_pitch收到了0的异常值
# if gimbal_pitch <0.01:
# gimbal_pitch=0.01
# gimbal_roll=float(gimbal_roll)
# # height=float(height)-30
# height=float(height)
# cam_longitude=float(cam_longitude)
# cam_altitude=float(cam_altitude)
# 坐标转换计算示例
# 这里是一个简化的示例,实际应用中可能需要更复杂的算法
# result = red_line_reproject(
# gimbal_yaw, gimbal_pitch, gimbal_roll,
# height, longitude, latitude,
# 1440, 1080,
# points
# )
result = red_line_reproject(
gimbal_yaw, gimbal_pitch, gimbal_roll,
height, cam_longitude, cam_altitude,
self.img_width, self.img_height,
# self.points
self.processed_points,
self.camera_para
)
# print("3")
# print(f"result {result}")
# if len(result)<1:
# return None
procc_data=self.processed_data
add_result_to_prcessdata(procc_data, result)
# 构建发送给前端的消息
response = {
"step": 1,
# "param": mess, # 原始参数
"data":data.get("data"),
"result": procc_data, # 计算结果
# "result": result, # 计算结果
"timestamp": time.time()
}
return response
except Exception as e:
return {
"status": "error",
"message": str(e),
"original_data": data
}