ai_project_v1/middleware/entity/up_osd_info_push.py

142 lines
3.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from dataclasses import dataclass
import asyncio
from typing import Union, Optional
from typing import Any
@dataclass
class OSDInfo:
    """Full-format ``data`` section of an OSD message.

    ``parse_osd_message`` tries this format first by expanding the payload's
    ``data`` mapping into keyword arguments, so the field names must match
    the incoming keys exactly.

    Fields are declared in alphabetical order; positional construction
    depends on that order, so do not rearrange them.
    """

    # Heading / altitude reference.
    attitude_head: float
    elevation: float

    # Gimbal orientation.
    gimbal_pitch: float
    gimbal_roll: float
    gimbal_yaw: float

    # Height, distance from home and ground speed.
    height: float
    home_distance: float
    horizontal_speed: float

    # Geographic position.
    latitude: float
    longitude: float

    # Velocity components.
    speed_x: float
    speed_y: float
    speed_z: float

    # Remaining sensor readings.
    ultrasonic_height: float
    vertical_speed: float
    wind_direction: float
    wind_speed: float
@dataclass
class OSDInfo_v1:
    """Reduced ``data`` section of an OSD message (fallback format).

    ``parse_osd_message`` falls back to this format when the payload does
    not fit ``OSDInfo``. Per the original author's note it matches the
    payload sent by first-generation aircraft.

    The declaration order below defines the positional constructor order,
    so do not rearrange the fields.
    """

    # Heading.
    attitude_head: float

    # Geographic position and height.
    latitude: float
    longitude: float
    height: float

    # Velocity components.
    speed_x: float
    speed_y: float
    speed_z: float

    # Gimbal orientation.
    gimbal_pitch: float
    gimbal_roll: float
    gimbal_yaw: float
@dataclass
class OSDMessage:
    """Envelope of a parsed OSD message: the payload plus its metadata."""

    # Parsed payload; may be either of the two formats (OSDInfo or OSDInfo_v1).
    data: Any
    # Value of the message's "method" key.
    method: str
    # Value of the message's "seq" key.
    seq: int
    # Value of the message's "timestamp" key.
    timestamp: int
def parse_osd_message(
    json_str: Optional[Union[str, bytes, dict]],
) -> Optional[OSDMessage]:
    """Parse an OSD message into an :class:`OSDMessage`.

    Args:
        json_str: The message, either as JSON text (``str``/``bytes``) or as
            an already-decoded mapping. The message must carry a ``data``
            object plus ``method`` and ``timestamp`` keys.

    Returns:
        The parsed message, or ``None`` when the input is empty, is not
        valid JSON, matches neither payload format, or lacks a required
        envelope key. A diagnostic is printed for every failure other than
        empty input.

    Notes:
        The ``data`` object is first matched against ``OSDInfo``; if that
        fails (or ``seq`` is missing) ``OSDInfo_v1`` is tried instead, in
        which case ``seq`` is reported as ``0``.
    """
    # Local import keeps this function self-contained.
    import json

    if not json_str:
        return None

    # Text input has to be decoded first: indexing a raw string with
    # ["data"] raises TypeError, which previously made every JSON string
    # fail both format attempts.
    if isinstance(json_str, (str, bytes, bytearray)):
        try:
            payload = json.loads(json_str)
        except ValueError as e:  # JSONDecodeError / UnicodeDecodeError
            print(f"Error parsing OSD message: {e}")
            return None
    else:
        payload = json_str

    try:
        osd_info = OSDInfo(**payload["data"])
        data_seq = payload["seq"]
    except (TypeError, KeyError):
        # The full format did not fit; fall back to the OSDInfo_v1 format
        # (per the original author's note: first-generation aircraft).
        try:
            osd_info = OSDInfo_v1(**payload["data"])
            data_seq = 0
        except Exception as e2:
            print(f"Error parsing OSD message with both formats: {e2}")
            return None

    # A missing envelope key is reported like every other parse failure
    # instead of escaping as a KeyError.
    try:
        return OSDMessage(
            data=osd_info,
            method=payload["method"],
            seq=data_seq,
            timestamp=payload["timestamp"],
        )
    except (TypeError, KeyError) as e:
        print(f"Error parsing OSD message: {e}")
        return None
async def main():
    """Demo entry point: parse a hard-coded sample payload and print the result."""
    # Placeholder: the real logic for obtaining the latest OSD info belongs here.
    sample_payload = '''{
"data": {
"attitude_head": -31.6,
"elevation": 100.5,
"gimbal_pitch": -46.9,
"gimbal_roll": 0,
"gimbal_yaw": -31.507293194051584,
"height": 581.8057861328125,
"home_distance": 0.045968499034643173,
"horizontal_speed": 0,
"latitude": 30.762902409980541,
"longitude": 103.99185861184533,
"speed_x": 0,
"speed_y": 0,
"speed_z": 0,
"ultrasonic_height": -1,
"vertical_speed": -0,
"wind_direction": 5,
"wind_speed": 33
},
"method": "osd_info_push",
"seq": 257050,
"timestamp": 1754910838301
}'''
    parsed = parse_osd_message(sample_payload)

    # Guard clause: report the failure and stop when nothing was parsed.
    if not parsed:
        print("Failed to parse OSD message")
        return

    print(f"Latitude: {parsed.data.latitude}")
if __name__ == '__main__':
    # Switch to the selector event loop on Windows when it is available.
    # asyncio only defines WindowsSelectorEventLoopPolicy on Windows, so the
    # lookup is made explicit instead of hiding every possible error
    # (including KeyboardInterrupt) behind a bare ``except``.
    selector_policy = getattr(asyncio, "WindowsSelectorEventLoopPolicy", None)
    if selector_policy is not None:
        asyncio.set_event_loop_policy(selector_policy())
    asyncio.run(main())