2025-07-10 10:44:35 +08:00

406 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import psycopg2
import json
import numpy as np
import cv2
from pyproj import Transformer
import math
from miniohelp import *
from pylab import *
from dataclasses import dataclass
###################################################################################################################
def rotate_points_around_axis(points, axis, angle):
"""
绕任意轴旋转三维点。
:param points: 点集,形状为 (N, 3)
:param axis: 旋转轴,形状为 (3,)
:param angle: 旋转角度,单位为度
:return: 旋转后的点集,形状为 (N, 3)
"""
axis = axis.squeeze()
axis = axis / np.linalg.norm(axis) # 归一化旋转轴
theta = np.radians(angle) # 转换为弧度
# 使用罗德里格斯公式构造旋转矩阵
K = np.array([[0, -axis[2], axis[1]],
[axis[2], 0, -axis[0]],
[-axis[1], axis[0], 0]])
I = np.eye(3)
R = I + np.sin(theta) * K + (1 - np.cos(theta)) * np.dot(K, K)
# 应用旋转矩阵
rotated_points = np.dot(R.T, points.T)
return rotated_points
def rotate_points(points, center, theta):
"""
在二维平面内绕任意中心旋转点。
:param points: 点集,形状为 (N, 2)
:param center: 旋转中心 (x, y)
:param theta: 旋转角度(弧度)
:return: 旋转后的点集,形状为 (N, 2)
"""
translated = points - center
rotation_matrix = np.array([[np.cos(theta), np.sin(theta)],
[-np.sin(theta), np.cos(theta)]])
rotated = translated @ rotation_matrix.T
return rotated + center
def compute_rotation_matrix(pitch, roll, yaw):
"""
计算三维旋转矩阵。
:param pitch: 绕x轴旋转角弧度
:param roll: 绕y轴旋转角弧度
:param yaw: 绕z轴旋转角弧度
:return: 3x3旋转矩阵 和 4x4齐次旋转矩阵
"""
Rx = np.array([[1, 0, 0],
[0, math.cos(pitch), -math.sin(pitch)],
[0, math.sin(pitch), math.cos(pitch)]], dtype=np.float32)
Ry = np.array([[math.cos(roll), 0, math.sin(roll)],
[0, 1, 0],
[-math.sin(roll), 0, math.cos(roll)]], dtype=np.float32)
Rz = np.array([[math.cos(yaw), -math.sin(yaw), 0],
[math.sin(yaw), math.cos(yaw), 0],
[0, 0, 1]], dtype=np.float32)
R = Rx @ Ry @ Rz
R_homo = np.hstack([R, np.zeros((3, 1))])
R_homo = np.vstack([R_homo, [0, 0, 0, 1]])
return R, R_homo
def create_transformation_matrix(tvec):
"""
构造4x4的平移矩阵。
:param tvec: 平移向量
:return: 平移矩阵
"""
T = np.eye(4)
T[:3, 3] = tvec.flatten()
return T
def dms2dd(x, y, z):
"""
DMS (度/分/秒) 转十进制度。
"""
return x + y / 60 + z / 3600
@dataclass
class ProjectTrans:
def __init__(self, f_mm, sensor_width_mm, sensor_height_mm, image_width_px, image_height_px, k1, k2, k3, k4, k5):
# 相机参数初始化
self.f_mm = f_mm
self.sensor_width_mm = sensor_width_mm
self.sensor_height_mm = sensor_height_mm
self.image_width_px = image_width_px
self.image_height_px = image_height_px
self.Ji = np.array([[k1], [k2], [k3], [k4], [k5]], dtype=np.float32)
self.camera_matrix = None
self.image_points = None
self.tvec = None
self.rvec = None
self.object_point = None
def apply_transformation(self, c2w, transform_matrix):
"""
对三维点应用旋转和平移
:return: 变换后的点 (3, N)
"""
homogeneous_points = np.hstack([self.object_point, np.ones((self.object_point.shape[0], 1))])
transformed = transform_matrix @ homogeneous_points.T
transformed = c2w.T @ transformed
return transformed[:3, :]
def compute_camera_matrix(self):
"""
计算相机内参矩阵。
"""
fx = floor((self.f_mm * self.image_width_px) / self.sensor_width_mm)
fy = floor((self.f_mm * self.image_height_px) / self.sensor_height_mm)
cx = self.image_width_px / 2
cy = self.image_height_px / 2
self.camera_matrix = np.array([
[fx, 0, cx],
[0, fy, cy],
[0, 0, 1]
], dtype=np.float32)
def projectPoints(self, object_point, tvec, rvec):
"""
将三维世界坐标点投影到图像平面。
:param object_point: 三维点 (N, 3)
:param tvec: 平移向量
:param rvec: 旋转角度(三元组,单位为度)
:return: 投影后的图像二维坐标 (N, 2)
"""
self.object_point = object_point
self.tvec = -tvec
self.rvec = np.radians(rvec)
# 步骤 1: 构造旋转矩阵和变换矩阵
c2w, c2w1 = compute_rotation_matrix(np.pi - 0.1 / 180 * np.pi, -1 / 180 * np.pi, 1 / 180 * np.pi)
transform_matrix = create_transformation_matrix(self.tvec)
# 步骤 2: 应用世界坐标变换
self.object_point = self.apply_transformation(c2w1, transform_matrix)
# 步骤 3: 执行两次旋转
axis = c2w.T @ np.array([[0], [0], [1]], dtype=np.float32)
self.object_point = rotate_points_around_axis(self.object_point.T, axis.T, -rvec[1])
self.object_point = rotate_points_around_axis(
self.object_point.T,
np.array((0.9999, -0.00174533, -0.0148)),
rvec[0] + 90
)
self.object_point = self.object_point.T
# 步骤 4: 计算内参矩阵
self.compute_camera_matrix()
# 步骤 5: 使用OpenCV进行投影
rvec_cv = np.zeros((3,), dtype=np.float32)
tvec_cv = np.zeros((3,), dtype=np.float32)
self.image_points, _ = cv2.projectPoints(
self.object_point, rvec_cv, tvec_cv, self.camera_matrix, self.Ji
)
self.image_points = self.image_points.squeeze()
return self.image_points
######################################################### 数据库查询函数:获取 MinIO 中对象路径 #######################################################
def sql_connect(target_id, endpoint, port, dbname, user, password):
"""
根据 target_id 从 PostgreSQL 数据库中的 waylinedetail 表查询 stylefile 字段MinIO 中的对象路径)
"""
conn = None
try:
conn = psycopg2.connect(
host=endpoint,
port=port,
dbname=dbname,
user=user,
password=password,
)
with conn.cursor() as cursor:
cursor.execute("SELECT stylefile FROM waylinedetail WHERE id = %s;", (target_id,))
result = cursor.fetchone()
if result:
print("MinIO 对象路径:", result[0])
return result[0]
except psycopg2.Error as e:
print(f"数据库查询失败: {e}")
finally:
if conn:
conn.close()
################################################################# 坐从 MinIO 获取 JSON 对象内容 ##################################################
def get_minio_json(bucket_name, object_path, yaml_name='config'):
"""
从 MinIO 获取指定对象并解析为 JSON
"""
try:
client =load_config(yaml_name)
response = client.get_object(bucket_name, object_path)
raw_data = response.read()
try:
json_data = json.loads(raw_data)
return json_data
except json.JSONDecodeError as e:
print(f"JSON 解析失败: {e}")
except UnicodeDecodeError as e:
print(f"编码错误: {e}")
except Exception as e:
print(f"MinIO 操作失败: {e}")
finally:
if 'response' in locals():
response.close()
response.release_conn()
##############################################坐标转换WGS84 → CGCS2000EPSG:4326 → EPSG:4544###################################################
def transform_coordinate(points):
"""
经纬度坐标转为投影坐标CGCS2000
输入: points = [lat, lon, height]
返回: 转换后的点 [x, y, height]
"""
transformer = Transformer.from_crs("epsg:4326", "epsg:4544")
points[1], points[0] = transformer.transform(points[0], points[1])
return points
########################################################## 工具函数 ###############################################
def split_by_custom_sizes(lst, sizes):
"""按照 sizes 中的每一项将 lst 切片成多段子列表"""
result = []
index = 0
for size in sizes:
if index + size > len(lst):
break
result.append(lst[index : index + size])
index += size
return result
def flatten(nested_list):
"""将嵌套列表打平成一维列表"""
flattened = []
for item in nested_list:
if isinstance(item, list):
flattened.extend(item)
else:
flattened.append(item)
return flattened
#############################################################解析 JSON 中的几何数据(点、线、多边形、红线)############################################
def get_json_data(data):
"""
提取 JSON 中所有几何对象坐标,并返回为 NumPy 数组和各对象长度
返回:
- 所有坐标点np.array
- 各类对象的坐标数列表Point, LineString, Polygon, hongxian
"""
points_list, LineString_list, Polygon_list, hongxian_list = [], [], [], []
length, length_LineString, length_Polygon, length_hongxian = [], [], [], []
# Point
if len(data['Point']) > 0:
length.append(len(data['Point']))
for pt in data['Point']:
points_list.append(pt['coordinates'])
else:
length.append(0)
# LineString
if len(data['LineString']) > 0:
for line in data['LineString']:
LineString_list.append(line['coordinates'])
length_LineString.append(len(line['coordinates']))
length.append(length_LineString)
else:
length.append(0)
LineString_list = flatten(LineString_list)
# Polygon
if len(data['Polygon']) > 0:
for poly in data['Polygon']:
Polygon_list.append(poly['coordinates'])
length_Polygon.append(len(poly['coordinates']))
length.append(length_Polygon)
else:
length.append(0)
Polygon_list = flatten(Polygon_list)
# 红线
if len(data['hongxian']) > 0:
for h in data['hongxian']:
hongxian_list.append(h['coordinates'])
length_hongxian.append(len(h['coordinates']))
length.append(length_hongxian)
else:
length.append(0)
hongxian_list = flatten(hongxian_list)
all_points_list = points_list + LineString_list + Polygon_list + hongxian_list
all_points_numpy = np.array(all_points_list)
return all_points_numpy, length
########################################################主处理函数:投影 → 像素坐标#################################################
def send_data(data_json):
"""
将三维地理坐标投影到图像像素坐标,并写入回原 JSON 数据结构中
"""
# 初始化投影转换器(假设用于像素投影)
zhuanhuan11 = ProjectTrans(
f_mm=12.29,
sensor_width_mm=17.4,
sensor_height_mm=13.0,
image_width_px=8000,
image_height_px=6000,
k1=3956,
k2=-0.11257524,
k3=0.01487443,
k4=-0.027064,
k5=-0.00008572
)
bucket_name = data_json['bucket_name']
sql_yaml_path = data_json['sql_yaml_path']
target_id = data_json['target_id']
# 读取 SQL 配置
sql_config =read_sql_config(sql_yaml_path)
endpoint = sql_config.get('endpoint')
port = sql_config.get('port')
dbname = sql_config.get('dbname')
user = sql_config.get('user')
password = sql_config.get('password')
# 获取路径和数据
object_path = sql_connect(target_id,endpoint,port,dbname,user,password)
data = get_minio_json(bucket_name, object_path)
# 获取所有地理点坐标
points, length = get_json_data(data)
# 调整顺序为 [经度, 纬度, 高度]
points[:, [1, 0, 2]] = points[:, [0, 1, 2]]
points[:, [0, 1]] = points[:, [1, 0]]
# 相机姿态和位置参数
rvec = np.array([
data_json['gimbal_pitch'],
data_json['gimbal_yaw'],
data_json['gimbal_roll']
], dtype=np.float32)
tvec = np.array([
data_json['latitude'],
data_json['longitude'],
data_json['height']
], dtype=np.float32)
# 经纬度 → 投影坐标
tvec = transform_coordinate(tvec)
# 投影点坐标 → 图像像素坐标
pixels_point = zhuanhuan11.projectPoints(points, tvec, rvec).tolist()
# 根据原始结构还原嵌套像素坐标结构
length_flattened = flatten(length)
pixels_point = split_by_custom_sizes(pixels_point, length_flattened)
# 插入像素点到原始结构
pixel_idx = 0
if len(data['Point']) > 0:
for i in range(len(data['Point'])):
data['Point'][i]['pixels_point'] = pixels_point[pixel_idx][i]
pixel_idx += 1
for i in range(len(data['LineString'])):
data['LineString'][i]['pixels_point'] = pixels_point[pixel_idx]
pixel_idx += 1
for i in range(len(data['Polygon'])):
data['Polygon'][i]['pixels_point'] = pixels_point[pixel_idx]
pixel_idx += 1
for i in range(len(data['hongxian'])):
data['hongxian'][i]['pixels_point'] = pixels_point[pixel_idx]
pixel_idx += 1
return data
if __name__ == '__main__':
data_json = {
"bucket_name": "300bdf2b-a150-406e-be63-d28bd29b409f",
"sql_yaml_path": "bdzl",
"target_id": 1,
"gimbal_pitch": 10, # 单位度
"gimbal_yaw": 20,
"gimbal_roll": 5,
"latitude": 30.5, #经度
"longitude": 114.3, #纬度
"height": 100
}
result = send_data(data_json)
print(json.dumps(result, indent=2, ensure_ascii=False))