2025-07-10 09:41:26 +08:00
|
|
|
|
import psycopg2
|
|
|
|
|
import json
|
|
|
|
|
import numpy as np
|
|
|
|
|
import cv2
|
|
|
|
|
from pyproj import Transformer
|
|
|
|
|
import math
|
|
|
|
|
from miniohelp import *
|
|
|
|
|
from pylab import *
|
|
|
|
|
from dataclasses import dataclass
|
|
|
|
|
|
|
|
|
|
###################################################################################################################
|
|
|
|
|
|
|
|
|
|
def rotate_points_around_axis(points, axis, angle):
|
|
|
|
|
"""
|
|
|
|
|
绕任意轴旋转三维点。
|
|
|
|
|
:param points: 点集,形状为 (N, 3)
|
|
|
|
|
:param axis: 旋转轴,形状为 (3,)
|
|
|
|
|
:param angle: 旋转角度,单位为度
|
|
|
|
|
:return: 旋转后的点集,形状为 (N, 3)
|
|
|
|
|
"""
|
|
|
|
|
axis = axis.squeeze()
|
|
|
|
|
axis = axis / np.linalg.norm(axis) # 归一化旋转轴
|
|
|
|
|
theta = np.radians(angle) # 转换为弧度
|
|
|
|
|
|
|
|
|
|
# 使用罗德里格斯公式构造旋转矩阵
|
|
|
|
|
K = np.array([[0, -axis[2], axis[1]],
|
|
|
|
|
[axis[2], 0, -axis[0]],
|
|
|
|
|
[-axis[1], axis[0], 0]])
|
|
|
|
|
I = np.eye(3)
|
|
|
|
|
R = I + np.sin(theta) * K + (1 - np.cos(theta)) * np.dot(K, K)
|
|
|
|
|
|
|
|
|
|
# 应用旋转矩阵
|
|
|
|
|
rotated_points = np.dot(R.T, points.T)
|
|
|
|
|
return rotated_points
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def rotate_points(points, center, theta):
|
|
|
|
|
"""
|
|
|
|
|
在二维平面内绕任意中心旋转点。
|
|
|
|
|
:param points: 点集,形状为 (N, 2)
|
|
|
|
|
:param center: 旋转中心 (x, y)
|
|
|
|
|
:param theta: 旋转角度(弧度)
|
|
|
|
|
:return: 旋转后的点集,形状为 (N, 2)
|
|
|
|
|
"""
|
|
|
|
|
translated = points - center
|
|
|
|
|
rotation_matrix = np.array([[np.cos(theta), np.sin(theta)],
|
|
|
|
|
[-np.sin(theta), np.cos(theta)]])
|
|
|
|
|
rotated = translated @ rotation_matrix.T
|
|
|
|
|
return rotated + center
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def compute_rotation_matrix(pitch, roll, yaw):
|
|
|
|
|
"""
|
|
|
|
|
计算三维旋转矩阵。
|
|
|
|
|
:param pitch: 绕x轴旋转角(弧度)
|
|
|
|
|
:param roll: 绕y轴旋转角(弧度)
|
|
|
|
|
:param yaw: 绕z轴旋转角(弧度)
|
|
|
|
|
:return: 3x3旋转矩阵 和 4x4齐次旋转矩阵
|
|
|
|
|
"""
|
|
|
|
|
Rx = np.array([[1, 0, 0],
|
|
|
|
|
[0, math.cos(pitch), -math.sin(pitch)],
|
|
|
|
|
[0, math.sin(pitch), math.cos(pitch)]], dtype=np.float32)
|
|
|
|
|
|
|
|
|
|
Ry = np.array([[math.cos(roll), 0, math.sin(roll)],
|
|
|
|
|
[0, 1, 0],
|
|
|
|
|
[-math.sin(roll), 0, math.cos(roll)]], dtype=np.float32)
|
|
|
|
|
|
|
|
|
|
Rz = np.array([[math.cos(yaw), -math.sin(yaw), 0],
|
|
|
|
|
[math.sin(yaw), math.cos(yaw), 0],
|
|
|
|
|
[0, 0, 1]], dtype=np.float32)
|
|
|
|
|
|
|
|
|
|
R = Rx @ Ry @ Rz
|
|
|
|
|
R_homo = np.hstack([R, np.zeros((3, 1))])
|
|
|
|
|
R_homo = np.vstack([R_homo, [0, 0, 0, 1]])
|
|
|
|
|
return R, R_homo
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_transformation_matrix(tvec):
|
|
|
|
|
"""
|
|
|
|
|
构造4x4的平移矩阵。
|
|
|
|
|
:param tvec: 平移向量
|
|
|
|
|
:return: 平移矩阵
|
|
|
|
|
"""
|
|
|
|
|
T = np.eye(4)
|
|
|
|
|
T[:3, 3] = tvec.flatten()
|
|
|
|
|
return T
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def dms2dd(x, y, z):
|
|
|
|
|
"""
|
|
|
|
|
DMS (度/分/秒) 转十进制度。
|
|
|
|
|
"""
|
|
|
|
|
return x + y / 60 + z / 3600
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
class ProjectTrans:
|
|
|
|
|
def __init__(self, f_mm, sensor_width_mm, sensor_height_mm, image_width_px, image_height_px, k1, k2, k3, k4, k5):
|
|
|
|
|
# 相机参数初始化
|
|
|
|
|
self.f_mm = f_mm
|
|
|
|
|
self.sensor_width_mm = sensor_width_mm
|
|
|
|
|
self.sensor_height_mm = sensor_height_mm
|
|
|
|
|
self.image_width_px = image_width_px
|
|
|
|
|
self.image_height_px = image_height_px
|
|
|
|
|
self.Ji = np.array([[k1], [k2], [k3], [k4], [k5]], dtype=np.float32)
|
|
|
|
|
self.camera_matrix = None
|
|
|
|
|
self.image_points = None
|
|
|
|
|
self.tvec = None
|
|
|
|
|
self.rvec = None
|
|
|
|
|
self.object_point = None
|
|
|
|
|
|
|
|
|
|
def apply_transformation(self, c2w, transform_matrix):
|
|
|
|
|
"""
|
|
|
|
|
对三维点应用旋转和平移
|
|
|
|
|
:return: 变换后的点 (3, N)
|
|
|
|
|
"""
|
|
|
|
|
homogeneous_points = np.hstack([self.object_point, np.ones((self.object_point.shape[0], 1))])
|
|
|
|
|
transformed = transform_matrix @ homogeneous_points.T
|
|
|
|
|
transformed = c2w.T @ transformed
|
|
|
|
|
return transformed[:3, :]
|
|
|
|
|
|
|
|
|
|
def compute_camera_matrix(self):
|
|
|
|
|
"""
|
|
|
|
|
计算相机内参矩阵。
|
|
|
|
|
"""
|
|
|
|
|
fx = floor((self.f_mm * self.image_width_px) / self.sensor_width_mm)
|
|
|
|
|
fy = floor((self.f_mm * self.image_height_px) / self.sensor_height_mm)
|
|
|
|
|
cx = self.image_width_px / 2
|
|
|
|
|
cy = self.image_height_px / 2
|
|
|
|
|
|
|
|
|
|
self.camera_matrix = np.array([
|
|
|
|
|
[fx, 0, cx],
|
|
|
|
|
[0, fy, cy],
|
|
|
|
|
[0, 0, 1]
|
|
|
|
|
], dtype=np.float32)
|
|
|
|
|
|
|
|
|
|
def projectPoints(self, object_point, tvec, rvec):
|
|
|
|
|
"""
|
|
|
|
|
将三维世界坐标点投影到图像平面。
|
|
|
|
|
:param object_point: 三维点 (N, 3)
|
|
|
|
|
:param tvec: 平移向量
|
|
|
|
|
:param rvec: 旋转角度(三元组,单位为度)
|
|
|
|
|
:return: 投影后的图像二维坐标 (N, 2)
|
|
|
|
|
"""
|
|
|
|
|
self.object_point = object_point
|
|
|
|
|
self.tvec = -tvec
|
|
|
|
|
self.rvec = np.radians(rvec)
|
|
|
|
|
|
|
|
|
|
# 步骤 1: 构造旋转矩阵和变换矩阵
|
|
|
|
|
c2w, c2w1 = compute_rotation_matrix(np.pi - 0.1 / 180 * np.pi, -1 / 180 * np.pi, 1 / 180 * np.pi)
|
|
|
|
|
transform_matrix = create_transformation_matrix(self.tvec)
|
|
|
|
|
|
|
|
|
|
# 步骤 2: 应用世界坐标变换
|
|
|
|
|
self.object_point = self.apply_transformation(c2w1, transform_matrix)
|
|
|
|
|
|
|
|
|
|
# 步骤 3: 执行两次旋转
|
|
|
|
|
axis = c2w.T @ np.array([[0], [0], [1]], dtype=np.float32)
|
|
|
|
|
self.object_point = rotate_points_around_axis(self.object_point.T, axis.T, -rvec[1])
|
|
|
|
|
self.object_point = rotate_points_around_axis(
|
|
|
|
|
self.object_point.T,
|
|
|
|
|
np.array((0.9999, -0.00174533, -0.0148)),
|
|
|
|
|
rvec[0] + 90
|
|
|
|
|
)
|
|
|
|
|
self.object_point = self.object_point.T
|
|
|
|
|
|
|
|
|
|
# 步骤 4: 计算内参矩阵
|
|
|
|
|
self.compute_camera_matrix()
|
|
|
|
|
|
|
|
|
|
# 步骤 5: 使用OpenCV进行投影
|
|
|
|
|
rvec_cv = np.zeros((3,), dtype=np.float32)
|
|
|
|
|
tvec_cv = np.zeros((3,), dtype=np.float32)
|
|
|
|
|
self.image_points, _ = cv2.projectPoints(
|
|
|
|
|
self.object_point, rvec_cv, tvec_cv, self.camera_matrix, self.Ji
|
|
|
|
|
)
|
|
|
|
|
self.image_points = self.image_points.squeeze()
|
|
|
|
|
return self.image_points
|
|
|
|
|
######################################################### 数据库查询函数:获取 MinIO 中对象路径 #######################################################
|
|
|
|
|
def sql_connect(target_id, endpoint, port, dbname, user, password):
|
|
|
|
|
"""
|
|
|
|
|
根据 target_id 从 PostgreSQL 数据库中的 waylinedetail 表查询 stylefile 字段(MinIO 中的对象路径)
|
|
|
|
|
"""
|
|
|
|
|
conn = None
|
|
|
|
|
try:
|
|
|
|
|
conn = psycopg2.connect(
|
|
|
|
|
host=endpoint,
|
|
|
|
|
port=port,
|
|
|
|
|
dbname=dbname,
|
|
|
|
|
user=user,
|
|
|
|
|
password=password,
|
|
|
|
|
)
|
|
|
|
|
with conn.cursor() as cursor:
|
|
|
|
|
cursor.execute("SELECT stylefile FROM waylinedetail WHERE id = %s;", (target_id,))
|
|
|
|
|
result = cursor.fetchone()
|
|
|
|
|
if result:
|
|
|
|
|
print("MinIO 对象路径:", result[0])
|
|
|
|
|
return result[0]
|
|
|
|
|
except psycopg2.Error as e:
|
|
|
|
|
print(f"数据库查询失败: {e}")
|
|
|
|
|
finally:
|
|
|
|
|
if conn:
|
|
|
|
|
conn.close()
|
|
|
|
|
################################################################# 坐从 MinIO 获取 JSON 对象内容 ##################################################
|
2025-07-10 10:04:45 +08:00
|
|
|
|
def get_minio_json(bucket_name, object_path, yaml_name='config'):
|
2025-07-10 09:41:26 +08:00
|
|
|
|
"""
|
|
|
|
|
从 MinIO 获取指定对象并解析为 JSON
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
client =load_config(yaml_name)
|
|
|
|
|
response = client.get_object(bucket_name, object_path)
|
|
|
|
|
raw_data = response.read()
|
|
|
|
|
try:
|
|
|
|
|
json_data = json.loads(raw_data)
|
|
|
|
|
return json_data
|
|
|
|
|
except json.JSONDecodeError as e:
|
|
|
|
|
print(f"JSON 解析失败: {e}")
|
|
|
|
|
except UnicodeDecodeError as e:
|
|
|
|
|
print(f"编码错误: {e}")
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f"MinIO 操作失败: {e}")
|
|
|
|
|
finally:
|
|
|
|
|
if 'response' in locals():
|
|
|
|
|
response.close()
|
|
|
|
|
response.release_conn()
|
|
|
|
|
##############################################坐标转换:WGS84 → CGCS2000(EPSG:4326 → EPSG:4544)###################################################
|
|
|
|
|
def transform_coordinate(points):
|
|
|
|
|
"""
|
|
|
|
|
经纬度坐标转为投影坐标(CGCS2000)
|
|
|
|
|
输入: points = [lat, lon, height]
|
|
|
|
|
返回: 转换后的点 [x, y, height]
|
|
|
|
|
"""
|
|
|
|
|
transformer = Transformer.from_crs("epsg:4326", "epsg:4544")
|
|
|
|
|
points[1], points[0] = transformer.transform(points[0], points[1])
|
|
|
|
|
return points
|
|
|
|
|
########################################################## 工具函数 ###############################################
|
|
|
|
|
def split_by_custom_sizes(lst, sizes):
|
|
|
|
|
"""按照 sizes 中的每一项将 lst 切片成多段子列表"""
|
|
|
|
|
result = []
|
|
|
|
|
index = 0
|
|
|
|
|
for size in sizes:
|
|
|
|
|
if index + size > len(lst):
|
|
|
|
|
break
|
|
|
|
|
result.append(lst[index : index + size])
|
|
|
|
|
index += size
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
def flatten(nested_list):
|
|
|
|
|
"""将嵌套列表打平成一维列表"""
|
|
|
|
|
flattened = []
|
|
|
|
|
for item in nested_list:
|
|
|
|
|
if isinstance(item, list):
|
|
|
|
|
flattened.extend(item)
|
|
|
|
|
else:
|
|
|
|
|
flattened.append(item)
|
|
|
|
|
return flattened
|
|
|
|
|
|
|
|
|
|
#############################################################解析 JSON 中的几何数据(点、线、多边形、红线)############################################
|
|
|
|
|
def get_json_data(data):
|
|
|
|
|
"""
|
|
|
|
|
提取 JSON 中所有几何对象坐标,并返回为 NumPy 数组和各对象长度
|
|
|
|
|
返回:
|
|
|
|
|
- 所有坐标点(np.array)
|
|
|
|
|
- 各类对象的坐标数列表(Point, LineString, Polygon, hongxian)
|
|
|
|
|
"""
|
|
|
|
|
points_list, LineString_list, Polygon_list, hongxian_list = [], [], [], []
|
|
|
|
|
length, length_LineString, length_Polygon, length_hongxian = [], [], [], []
|
|
|
|
|
|
|
|
|
|
# Point
|
|
|
|
|
if len(data['Point']) > 0:
|
|
|
|
|
length.append(len(data['Point']))
|
|
|
|
|
for pt in data['Point']:
|
|
|
|
|
points_list.append(pt['coordinates'])
|
|
|
|
|
else:
|
|
|
|
|
length.append(0)
|
|
|
|
|
|
|
|
|
|
# LineString
|
|
|
|
|
if len(data['LineString']) > 0:
|
|
|
|
|
for line in data['LineString']:
|
|
|
|
|
LineString_list.append(line['coordinates'])
|
|
|
|
|
length_LineString.append(len(line['coordinates']))
|
|
|
|
|
length.append(length_LineString)
|
|
|
|
|
else:
|
|
|
|
|
length.append(0)
|
|
|
|
|
LineString_list = flatten(LineString_list)
|
|
|
|
|
|
|
|
|
|
# Polygon
|
|
|
|
|
if len(data['Polygon']) > 0:
|
|
|
|
|
for poly in data['Polygon']:
|
|
|
|
|
Polygon_list.append(poly['coordinates'])
|
|
|
|
|
length_Polygon.append(len(poly['coordinates']))
|
|
|
|
|
length.append(length_Polygon)
|
|
|
|
|
else:
|
|
|
|
|
length.append(0)
|
|
|
|
|
Polygon_list = flatten(Polygon_list)
|
|
|
|
|
|
|
|
|
|
# 红线
|
|
|
|
|
if len(data['hongxian']) > 0:
|
|
|
|
|
for h in data['hongxian']:
|
|
|
|
|
hongxian_list.append(h['coordinates'])
|
|
|
|
|
length_hongxian.append(len(h['coordinates']))
|
|
|
|
|
length.append(length_hongxian)
|
|
|
|
|
else:
|
|
|
|
|
length.append(0)
|
|
|
|
|
hongxian_list = flatten(hongxian_list)
|
|
|
|
|
|
|
|
|
|
all_points_list = points_list + LineString_list + Polygon_list + hongxian_list
|
|
|
|
|
all_points_numpy = np.array(all_points_list)
|
|
|
|
|
return all_points_numpy, length
|
|
|
|
|
|
|
|
|
|
########################################################主处理函数:投影 → 像素坐标#################################################
|
|
|
|
|
def send_data(data_json):
|
|
|
|
|
"""
|
|
|
|
|
将三维地理坐标投影到图像像素坐标,并写入回原 JSON 数据结构中
|
|
|
|
|
"""
|
|
|
|
|
# 初始化投影转换器(假设用于像素投影)
|
|
|
|
|
zhuanhuan11 = ProjectTrans(
|
|
|
|
|
f_mm=12.29,
|
|
|
|
|
sensor_width_mm=17.4,
|
|
|
|
|
sensor_height_mm=13.0,
|
|
|
|
|
image_width_px=8000,
|
|
|
|
|
image_height_px=6000,
|
|
|
|
|
k1=3956,
|
|
|
|
|
k2=-0.11257524,
|
|
|
|
|
k3=0.01487443,
|
|
|
|
|
k4=-0.027064,
|
|
|
|
|
k5=-0.00008572
|
|
|
|
|
)
|
|
|
|
|
bucket_name = data_json['bucket_name']
|
|
|
|
|
sql_yaml_path = data_json['sql_yaml_path']
|
|
|
|
|
target_id = data_json['target_id']
|
|
|
|
|
|
|
|
|
|
# 读取 SQL 配置
|
|
|
|
|
sql_config =read_sql_config(sql_yaml_path)
|
|
|
|
|
endpoint = sql_config.get('endpoint')
|
|
|
|
|
port = sql_config.get('port')
|
|
|
|
|
dbname = sql_config.get('dbname')
|
|
|
|
|
user = sql_config.get('user')
|
|
|
|
|
password = sql_config.get('password')
|
|
|
|
|
|
|
|
|
|
# 获取路径和数据
|
|
|
|
|
object_path = sql_connect(target_id,endpoint,port,dbname,user,password)
|
|
|
|
|
|
|
|
|
|
data = get_minio_json(bucket_name, object_path)
|
|
|
|
|
|
|
|
|
|
# 获取所有地理点坐标
|
|
|
|
|
points, length = get_json_data(data)
|
|
|
|
|
|
|
|
|
|
# 调整顺序为 [经度, 纬度, 高度]
|
|
|
|
|
points[:, [1, 0, 2]] = points[:, [0, 1, 2]]
|
|
|
|
|
points[:, [0, 1]] = points[:, [1, 0]]
|
|
|
|
|
|
|
|
|
|
# 相机姿态和位置参数
|
|
|
|
|
rvec = np.array([
|
|
|
|
|
data_json['gimbal_pitch'],
|
|
|
|
|
data_json['gimbal_yaw'],
|
|
|
|
|
data_json['gimbal_roll']
|
|
|
|
|
], dtype=np.float32)
|
|
|
|
|
|
|
|
|
|
tvec = np.array([
|
|
|
|
|
data_json['latitude'],
|
|
|
|
|
data_json['longitude'],
|
|
|
|
|
data_json['height']
|
|
|
|
|
], dtype=np.float32)
|
|
|
|
|
|
|
|
|
|
# 经纬度 → 投影坐标
|
|
|
|
|
tvec = transform_coordinate(tvec)
|
|
|
|
|
|
|
|
|
|
# 投影点坐标 → 图像像素坐标
|
|
|
|
|
pixels_point = zhuanhuan11.projectPoints(points, tvec, rvec).tolist()
|
|
|
|
|
|
|
|
|
|
# 根据原始结构还原嵌套像素坐标结构
|
|
|
|
|
length_flattened = flatten(length)
|
|
|
|
|
pixels_point = split_by_custom_sizes(pixels_point, length_flattened)
|
|
|
|
|
|
|
|
|
|
# 插入像素点到原始结构
|
|
|
|
|
pixel_idx = 0
|
|
|
|
|
if len(data['Point']) > 0:
|
|
|
|
|
for i in range(len(data['Point'])):
|
|
|
|
|
data['Point'][i]['pixels_point'] = pixels_point[pixel_idx][i]
|
|
|
|
|
pixel_idx += 1
|
|
|
|
|
|
|
|
|
|
for i in range(len(data['LineString'])):
|
|
|
|
|
data['LineString'][i]['pixels_point'] = pixels_point[pixel_idx]
|
|
|
|
|
pixel_idx += 1
|
|
|
|
|
for i in range(len(data['Polygon'])):
|
|
|
|
|
data['Polygon'][i]['pixels_point'] = pixels_point[pixel_idx]
|
|
|
|
|
pixel_idx += 1
|
|
|
|
|
for i in range(len(data['hongxian'])):
|
|
|
|
|
data['hongxian'][i]['pixels_point'] = pixels_point[pixel_idx]
|
|
|
|
|
pixel_idx += 1
|
|
|
|
|
|
|
|
|
|
return data
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
|
data_json = {
|
|
|
|
|
"bucket_name": "300bdf2b-a150-406e-be63-d28bd29b409f",
|
|
|
|
|
"sql_yaml_path": "bdzl",
|
|
|
|
|
"target_id": 1,
|
|
|
|
|
"gimbal_pitch": 10, # 单位度
|
|
|
|
|
"gimbal_yaw": 20,
|
|
|
|
|
"gimbal_roll": 5,
|
|
|
|
|
"latitude": 30.5, #经度
|
|
|
|
|
"longitude": 114.3, #纬度
|
|
|
|
|
"height": 100
|
|
|
|
|
}
|
|
|
|
|
result = send_data(data_json)
|
|
|
|
|
print(json.dumps(result, indent=2, ensure_ascii=False))
|