406 lines
14 KiB
Python
406 lines
14 KiB
Python
import psycopg2
|
||
import json
|
||
import numpy as np
|
||
import cv2
|
||
from pyproj import Transformer
|
||
import math
|
||
from miniohelp import *
|
||
from pylab import *
|
||
from dataclasses import dataclass
|
||
|
||
###################################################################################################################
|
||
|
||
def rotate_points_around_axis(points, axis, angle):
|
||
"""
|
||
绕任意轴旋转三维点。
|
||
:param points: 点集,形状为 (N, 3)
|
||
:param axis: 旋转轴,形状为 (3,)
|
||
:param angle: 旋转角度,单位为度
|
||
:return: 旋转后的点集,形状为 (N, 3)
|
||
"""
|
||
axis = axis.squeeze()
|
||
axis = axis / np.linalg.norm(axis) # 归一化旋转轴
|
||
theta = np.radians(angle) # 转换为弧度
|
||
|
||
# 使用罗德里格斯公式构造旋转矩阵
|
||
K = np.array([[0, -axis[2], axis[1]],
|
||
[axis[2], 0, -axis[0]],
|
||
[-axis[1], axis[0], 0]])
|
||
I = np.eye(3)
|
||
R = I + np.sin(theta) * K + (1 - np.cos(theta)) * np.dot(K, K)
|
||
|
||
# 应用旋转矩阵
|
||
rotated_points = np.dot(R.T, points.T)
|
||
return rotated_points
|
||
|
||
|
||
def rotate_points(points, center, theta):
|
||
"""
|
||
在二维平面内绕任意中心旋转点。
|
||
:param points: 点集,形状为 (N, 2)
|
||
:param center: 旋转中心 (x, y)
|
||
:param theta: 旋转角度(弧度)
|
||
:return: 旋转后的点集,形状为 (N, 2)
|
||
"""
|
||
translated = points - center
|
||
rotation_matrix = np.array([[np.cos(theta), np.sin(theta)],
|
||
[-np.sin(theta), np.cos(theta)]])
|
||
rotated = translated @ rotation_matrix.T
|
||
return rotated + center
|
||
|
||
|
||
def compute_rotation_matrix(pitch, roll, yaw):
|
||
"""
|
||
计算三维旋转矩阵。
|
||
:param pitch: 绕x轴旋转角(弧度)
|
||
:param roll: 绕y轴旋转角(弧度)
|
||
:param yaw: 绕z轴旋转角(弧度)
|
||
:return: 3x3旋转矩阵 和 4x4齐次旋转矩阵
|
||
"""
|
||
Rx = np.array([[1, 0, 0],
|
||
[0, math.cos(pitch), -math.sin(pitch)],
|
||
[0, math.sin(pitch), math.cos(pitch)]], dtype=np.float32)
|
||
|
||
Ry = np.array([[math.cos(roll), 0, math.sin(roll)],
|
||
[0, 1, 0],
|
||
[-math.sin(roll), 0, math.cos(roll)]], dtype=np.float32)
|
||
|
||
Rz = np.array([[math.cos(yaw), -math.sin(yaw), 0],
|
||
[math.sin(yaw), math.cos(yaw), 0],
|
||
[0, 0, 1]], dtype=np.float32)
|
||
|
||
R = Rx @ Ry @ Rz
|
||
R_homo = np.hstack([R, np.zeros((3, 1))])
|
||
R_homo = np.vstack([R_homo, [0, 0, 0, 1]])
|
||
return R, R_homo
|
||
|
||
|
||
def create_transformation_matrix(tvec):
|
||
"""
|
||
构造4x4的平移矩阵。
|
||
:param tvec: 平移向量
|
||
:return: 平移矩阵
|
||
"""
|
||
T = np.eye(4)
|
||
T[:3, 3] = tvec.flatten()
|
||
return T
|
||
|
||
|
||
def dms2dd(x, y, z):
|
||
"""
|
||
DMS (度/分/秒) 转十进制度。
|
||
"""
|
||
return x + y / 60 + z / 3600
|
||
|
||
|
||
@dataclass
|
||
class ProjectTrans:
|
||
def __init__(self, f_mm, sensor_width_mm, sensor_height_mm, image_width_px, image_height_px, k1, k2, k3, k4, k5):
|
||
# 相机参数初始化
|
||
self.f_mm = f_mm
|
||
self.sensor_width_mm = sensor_width_mm
|
||
self.sensor_height_mm = sensor_height_mm
|
||
self.image_width_px = image_width_px
|
||
self.image_height_px = image_height_px
|
||
self.Ji = np.array([[k1], [k2], [k3], [k4], [k5]], dtype=np.float32)
|
||
self.camera_matrix = None
|
||
self.image_points = None
|
||
self.tvec = None
|
||
self.rvec = None
|
||
self.object_point = None
|
||
|
||
def apply_transformation(self, c2w, transform_matrix):
|
||
"""
|
||
对三维点应用旋转和平移
|
||
:return: 变换后的点 (3, N)
|
||
"""
|
||
homogeneous_points = np.hstack([self.object_point, np.ones((self.object_point.shape[0], 1))])
|
||
transformed = transform_matrix @ homogeneous_points.T
|
||
transformed = c2w.T @ transformed
|
||
return transformed[:3, :]
|
||
|
||
def compute_camera_matrix(self):
|
||
"""
|
||
计算相机内参矩阵。
|
||
"""
|
||
fx = floor((self.f_mm * self.image_width_px) / self.sensor_width_mm)
|
||
fy = floor((self.f_mm * self.image_height_px) / self.sensor_height_mm)
|
||
cx = self.image_width_px / 2
|
||
cy = self.image_height_px / 2
|
||
|
||
self.camera_matrix = np.array([
|
||
[fx, 0, cx],
|
||
[0, fy, cy],
|
||
[0, 0, 1]
|
||
], dtype=np.float32)
|
||
|
||
def projectPoints(self, object_point, tvec, rvec):
|
||
"""
|
||
将三维世界坐标点投影到图像平面。
|
||
:param object_point: 三维点 (N, 3)
|
||
:param tvec: 平移向量
|
||
:param rvec: 旋转角度(三元组,单位为度)
|
||
:return: 投影后的图像二维坐标 (N, 2)
|
||
"""
|
||
self.object_point = object_point
|
||
self.tvec = -tvec
|
||
self.rvec = np.radians(rvec)
|
||
|
||
# 步骤 1: 构造旋转矩阵和变换矩阵
|
||
c2w, c2w1 = compute_rotation_matrix(np.pi - 0.1 / 180 * np.pi, -1 / 180 * np.pi, 1 / 180 * np.pi)
|
||
transform_matrix = create_transformation_matrix(self.tvec)
|
||
|
||
# 步骤 2: 应用世界坐标变换
|
||
self.object_point = self.apply_transformation(c2w1, transform_matrix)
|
||
|
||
# 步骤 3: 执行两次旋转
|
||
axis = c2w.T @ np.array([[0], [0], [1]], dtype=np.float32)
|
||
self.object_point = rotate_points_around_axis(self.object_point.T, axis.T, -rvec[1])
|
||
self.object_point = rotate_points_around_axis(
|
||
self.object_point.T,
|
||
np.array((0.9999, -0.00174533, -0.0148)),
|
||
rvec[0] + 90
|
||
)
|
||
self.object_point = self.object_point.T
|
||
|
||
# 步骤 4: 计算内参矩阵
|
||
self.compute_camera_matrix()
|
||
|
||
# 步骤 5: 使用OpenCV进行投影
|
||
rvec_cv = np.zeros((3,), dtype=np.float32)
|
||
tvec_cv = np.zeros((3,), dtype=np.float32)
|
||
self.image_points, _ = cv2.projectPoints(
|
||
self.object_point, rvec_cv, tvec_cv, self.camera_matrix, self.Ji
|
||
)
|
||
self.image_points = self.image_points.squeeze()
|
||
return self.image_points
|
||
######################################################### 数据库查询函数:获取 MinIO 中对象路径 #######################################################
|
||
def sql_connect(target_id, endpoint, port, dbname, user, password):
|
||
"""
|
||
根据 target_id 从 PostgreSQL 数据库中的 waylinedetail 表查询 stylefile 字段(MinIO 中的对象路径)
|
||
"""
|
||
conn = None
|
||
try:
|
||
conn = psycopg2.connect(
|
||
host=endpoint,
|
||
port=port,
|
||
dbname=dbname,
|
||
user=user,
|
||
password=password,
|
||
)
|
||
with conn.cursor() as cursor:
|
||
cursor.execute("SELECT stylefile FROM waylinedetail WHERE id = %s;", (target_id,))
|
||
result = cursor.fetchone()
|
||
if result:
|
||
print("MinIO 对象路径:", result[0])
|
||
return result[0]
|
||
except psycopg2.Error as e:
|
||
print(f"数据库查询失败: {e}")
|
||
finally:
|
||
if conn:
|
||
conn.close()
|
||
################################################################# 坐从 MinIO 获取 JSON 对象内容 ##################################################
|
||
def get_minio_json(bucket_name, object_path, yaml_name='config'):
|
||
"""
|
||
从 MinIO 获取指定对象并解析为 JSON
|
||
"""
|
||
try:
|
||
client =load_config(yaml_name)
|
||
response = client.get_object(bucket_name, object_path)
|
||
raw_data = response.read()
|
||
try:
|
||
json_data = json.loads(raw_data)
|
||
return json_data
|
||
except json.JSONDecodeError as e:
|
||
print(f"JSON 解析失败: {e}")
|
||
except UnicodeDecodeError as e:
|
||
print(f"编码错误: {e}")
|
||
except Exception as e:
|
||
print(f"MinIO 操作失败: {e}")
|
||
finally:
|
||
if 'response' in locals():
|
||
response.close()
|
||
response.release_conn()
|
||
##############################################坐标转换:WGS84 → CGCS2000(EPSG:4326 → EPSG:4544)###################################################
|
||
def transform_coordinate(points):
|
||
"""
|
||
经纬度坐标转为投影坐标(CGCS2000)
|
||
输入: points = [lat, lon, height]
|
||
返回: 转换后的点 [x, y, height]
|
||
"""
|
||
transformer = Transformer.from_crs("epsg:4326", "epsg:4544")
|
||
points[1], points[0] = transformer.transform(points[0], points[1])
|
||
return points
|
||
########################################################## 工具函数 ###############################################
|
||
def split_by_custom_sizes(lst, sizes):
|
||
"""按照 sizes 中的每一项将 lst 切片成多段子列表"""
|
||
result = []
|
||
index = 0
|
||
for size in sizes:
|
||
if index + size > len(lst):
|
||
break
|
||
result.append(lst[index : index + size])
|
||
index += size
|
||
return result
|
||
|
||
def flatten(nested_list):
|
||
"""将嵌套列表打平成一维列表"""
|
||
flattened = []
|
||
for item in nested_list:
|
||
if isinstance(item, list):
|
||
flattened.extend(item)
|
||
else:
|
||
flattened.append(item)
|
||
return flattened
|
||
|
||
#############################################################解析 JSON 中的几何数据(点、线、多边形、红线)############################################
|
||
def get_json_data(data):
|
||
"""
|
||
提取 JSON 中所有几何对象坐标,并返回为 NumPy 数组和各对象长度
|
||
返回:
|
||
- 所有坐标点(np.array)
|
||
- 各类对象的坐标数列表(Point, LineString, Polygon, hongxian)
|
||
"""
|
||
points_list, LineString_list, Polygon_list, hongxian_list = [], [], [], []
|
||
length, length_LineString, length_Polygon, length_hongxian = [], [], [], []
|
||
|
||
# Point
|
||
if len(data['Point']) > 0:
|
||
length.append(len(data['Point']))
|
||
for pt in data['Point']:
|
||
points_list.append(pt['coordinates'])
|
||
else:
|
||
length.append(0)
|
||
|
||
# LineString
|
||
if len(data['LineString']) > 0:
|
||
for line in data['LineString']:
|
||
LineString_list.append(line['coordinates'])
|
||
length_LineString.append(len(line['coordinates']))
|
||
length.append(length_LineString)
|
||
else:
|
||
length.append(0)
|
||
LineString_list = flatten(LineString_list)
|
||
|
||
# Polygon
|
||
if len(data['Polygon']) > 0:
|
||
for poly in data['Polygon']:
|
||
Polygon_list.append(poly['coordinates'])
|
||
length_Polygon.append(len(poly['coordinates']))
|
||
length.append(length_Polygon)
|
||
else:
|
||
length.append(0)
|
||
Polygon_list = flatten(Polygon_list)
|
||
|
||
# 红线
|
||
if len(data['hongxian']) > 0:
|
||
for h in data['hongxian']:
|
||
hongxian_list.append(h['coordinates'])
|
||
length_hongxian.append(len(h['coordinates']))
|
||
length.append(length_hongxian)
|
||
else:
|
||
length.append(0)
|
||
hongxian_list = flatten(hongxian_list)
|
||
|
||
all_points_list = points_list + LineString_list + Polygon_list + hongxian_list
|
||
all_points_numpy = np.array(all_points_list)
|
||
return all_points_numpy, length
|
||
|
||
########################################################主处理函数:投影 → 像素坐标#################################################
|
||
def send_data(data_json):
|
||
"""
|
||
将三维地理坐标投影到图像像素坐标,并写入回原 JSON 数据结构中
|
||
"""
|
||
# 初始化投影转换器(假设用于像素投影)
|
||
zhuanhuan11 = ProjectTrans(
|
||
f_mm=12.29,
|
||
sensor_width_mm=17.4,
|
||
sensor_height_mm=13.0,
|
||
image_width_px=8000,
|
||
image_height_px=6000,
|
||
k1=3956,
|
||
k2=-0.11257524,
|
||
k3=0.01487443,
|
||
k4=-0.027064,
|
||
k5=-0.00008572
|
||
)
|
||
bucket_name = data_json['bucket_name']
|
||
sql_yaml_path = data_json['sql_yaml_path']
|
||
target_id = data_json['target_id']
|
||
|
||
# 读取 SQL 配置
|
||
sql_config =read_sql_config(sql_yaml_path)
|
||
endpoint = sql_config.get('endpoint')
|
||
port = sql_config.get('port')
|
||
dbname = sql_config.get('dbname')
|
||
user = sql_config.get('user')
|
||
password = sql_config.get('password')
|
||
|
||
# 获取路径和数据
|
||
object_path = sql_connect(target_id,endpoint,port,dbname,user,password)
|
||
|
||
data = get_minio_json(bucket_name, object_path)
|
||
|
||
# 获取所有地理点坐标
|
||
points, length = get_json_data(data)
|
||
|
||
# 调整顺序为 [经度, 纬度, 高度]
|
||
points[:, [1, 0, 2]] = points[:, [0, 1, 2]]
|
||
points[:, [0, 1]] = points[:, [1, 0]]
|
||
|
||
# 相机姿态和位置参数
|
||
rvec = np.array([
|
||
data_json['gimbal_pitch'],
|
||
data_json['gimbal_yaw'],
|
||
data_json['gimbal_roll']
|
||
], dtype=np.float32)
|
||
|
||
tvec = np.array([
|
||
data_json['latitude'],
|
||
data_json['longitude'],
|
||
data_json['height']
|
||
], dtype=np.float32)
|
||
|
||
# 经纬度 → 投影坐标
|
||
tvec = transform_coordinate(tvec)
|
||
|
||
# 投影点坐标 → 图像像素坐标
|
||
pixels_point = zhuanhuan11.projectPoints(points, tvec, rvec).tolist()
|
||
|
||
# 根据原始结构还原嵌套像素坐标结构
|
||
length_flattened = flatten(length)
|
||
pixels_point = split_by_custom_sizes(pixels_point, length_flattened)
|
||
|
||
# 插入像素点到原始结构
|
||
pixel_idx = 0
|
||
if len(data['Point']) > 0:
|
||
for i in range(len(data['Point'])):
|
||
data['Point'][i]['pixels_point'] = pixels_point[pixel_idx][i]
|
||
pixel_idx += 1
|
||
|
||
for i in range(len(data['LineString'])):
|
||
data['LineString'][i]['pixels_point'] = pixels_point[pixel_idx]
|
||
pixel_idx += 1
|
||
for i in range(len(data['Polygon'])):
|
||
data['Polygon'][i]['pixels_point'] = pixels_point[pixel_idx]
|
||
pixel_idx += 1
|
||
for i in range(len(data['hongxian'])):
|
||
data['hongxian'][i]['pixels_point'] = pixels_point[pixel_idx]
|
||
pixel_idx += 1
|
||
|
||
return data
|
||
|
||
if __name__ == '__main__':
|
||
data_json = {
|
||
"bucket_name": "300bdf2b-a150-406e-be63-d28bd29b409f",
|
||
"sql_yaml_path": "bdzl",
|
||
"target_id": 1,
|
||
"gimbal_pitch": 10, # 单位度
|
||
"gimbal_yaw": 20,
|
||
"gimbal_roll": 5,
|
||
"latitude": 30.5, #经度
|
||
"longitude": 114.3, #纬度
|
||
"height": 100
|
||
}
|
||
result = send_data(data_json)
|
||
print(json.dumps(result, indent=2, ensure_ascii=False)) |