ai_project_v1/b3dm/volume_calculator.py
2026-01-14 11:37:35 +08:00

684 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
三维模型体积计算器
基于指定地理范围内的三维模型数据,使用三角构网方法计算体积
"""
import json
import struct
import os
import sys
from pathlib import Path
import numpy as np
from scipy.spatial import Delaunay
import math
from tqdm import tqdm
try:
import DracoPy
except ImportError:
print("警告: DracoPy库未安装无法处理Draco压缩的数据")
print("请运行: pip install DracoPy")
DracoPy = None
class VolumeCalculator:
def __init__(self, location_file):
self.location_bounds = self.load_location_bounds(location_file)
self.all_vertices = []
self.filtered_vertices = []
def load_location_bounds(self, location_file):
"""加载地理范围边界"""
try:
with open(location_file, 'r', encoding='utf-8') as f:
coords = json.load(f)
# 提取经纬度范围
lons = [coord[0] for coord in coords]
lats = [coord[1] for coord in coords]
elevs = [coord[2] for coord in coords]
bounds = {
'min_lon': min(lons),
'max_lon': max(lons),
'min_lat': min(lats),
'max_lat': max(lats),
'min_elev': min(elevs),
'max_elev': max(elevs),
'coords': coords
}
print(f"地理范围边界:")
print(f" 经度: {bounds['min_lon']:.8f} ~ {bounds['max_lon']:.8f}")
print(f" 纬度: {bounds['min_lat']:.8f} ~ {bounds['max_lat']:.8f}")
print(f" 高程: {bounds['min_elev']:.2f} ~ {bounds['max_elev']:.2f}")
return bounds
except Exception as e:
print(f"加载地理范围文件失败: {e}")
return None
def wgs84_to_cartesian(self, lon, lat, elev):
"""WGS84坐标转换为笛卡尔坐标高精度算法"""
# WGS84椭球参数EPSG:4326
a = 6378137.0 # 长半轴 (米)
f = 1/298.257223563 # 扁率
e2 = 2*f - f*f # 第一偏心率平方
# 角度转弧度
lon_rad = math.radians(lon)
lat_rad = math.radians(lat)
# 计算卯酉圈曲率半径
sin_lat = math.sin(lat_rad)
cos_lat = math.cos(lat_rad)
sin_lon = math.sin(lon_rad)
cos_lon = math.cos(lon_rad)
N = a / math.sqrt(1 - e2 * sin_lat * sin_lat)
# 高精度笛卡尔坐标计算
x = (N + elev) * cos_lat * cos_lon
y = (N + elev) * cos_lat * sin_lon
z = (N * (1 - e2) + elev) * sin_lat
return [x, y, z]
def cartesian_to_wgs84(self, x, y, z):
"""笛卡尔坐标转换为WGS84坐标高精度迭代法"""
# WGS84椭球参数
a = 6378137.0 # 长半轴
f = 1/298.257223563 # 扁率
e2 = 2*f - f*f # 第一偏心率平方
ep2 = e2 / (1 - e2) # 第二偏心率平方
# 计算经度(精确值)
lon = math.atan2(y, x)
# 计算纬度和高程使用Bowring迭代法
p = math.sqrt(x*x + y*y)
if p == 0:
# 极点情况
lat = math.pi/2 if z > 0 else -math.pi/2
elev = abs(z) - a * math.sqrt(1 - e2)
return [math.degrees(lon), math.degrees(lat), elev]
# 初始估计
theta = math.atan2(z, p * (1 - f))
lat_prev = math.atan2(z + ep2 * a * (1 - f) * math.sin(theta)**3,
p - e2 * a * math.cos(theta)**3)
# 迭代求解纬度
max_iterations = 10
tolerance = 1e-12
for i in range(max_iterations):
N = a / math.sqrt(1 - e2 * math.sin(lat_prev)**2)
elev = p / math.cos(lat_prev) - N
# 更新纬度估计
lat_new = math.atan2(z + e2 * N * math.sin(lat_prev), p)
# 检查收敛性
if abs(lat_new - lat_prev) < tolerance:
break
lat_prev = lat_new
# 最终计算高程
N = a / math.sqrt(1 - e2 * math.sin(lat_prev)**2)
elev = p / math.cos(lat_prev) - N
return [math.degrees(lon), math.degrees(lat_prev), elev]
def is_point_in_bounds(self, vertex):
"""检查点是否在指定的地理范围内"""
if not self.location_bounds:
return True
# 将笛卡尔坐标转换为WGS84
try:
lon, lat, elev = self.cartesian_to_wgs84(vertex[0], vertex[1], vertex[2])
# 检查是否在边界范围内
return (self.location_bounds['min_lon'] <= lon <= self.location_bounds['max_lon'] and
self.location_bounds['min_lat'] <= lat <= self.location_bounds['max_lat'])
except:
return False
def multiply_matrix_vector(self, matrix, vector):
"""4x4矩阵与4D向量相乘"""
m = [
[matrix[0], matrix[4], matrix[8], matrix[12]],
[matrix[1], matrix[5], matrix[9], matrix[13]],
[matrix[2], matrix[6], matrix[10], matrix[14]],
[matrix[3], matrix[7], matrix[11], matrix[15]]
]
v = [vector[0], vector[1], vector[2], 1.0]
result = [
m[0][0]*v[0] + m[0][1]*v[1] + m[0][2]*v[2] + m[0][3]*v[3],
m[1][0]*v[0] + m[1][1]*v[1] + m[1][2]*v[2] + m[1][3]*v[3],
m[2][0]*v[0] + m[2][1]*v[1] + m[2][2]*v[2] + m[2][3]*v[3]
]
return result
def multiply_matrices(self, m1, m2):
"""两个4x4矩阵相乘"""
def list_to_matrix(lst):
return [
[lst[0], lst[4], lst[8], lst[12]],
[lst[1], lst[5], lst[9], lst[13]],
[lst[2], lst[6], lst[10], lst[14]],
[lst[3], lst[7], lst[11], lst[15]]
]
def matrix_to_list(mat):
return [
mat[0][0], mat[1][0], mat[2][0], mat[3][0],
mat[0][1], mat[1][1], mat[2][1], mat[3][1],
mat[0][2], mat[1][2], mat[2][2], mat[3][2],
mat[0][3], mat[1][3], mat[2][3], mat[3][3]
]
a = list_to_matrix(m1)
b = list_to_matrix(m2)
result = [[0 for _ in range(4)] for _ in range(4)]
for i in range(4):
for j in range(4):
for k in range(4):
result[i][j] += a[i][k] * b[k][j]
return matrix_to_list(result)
def apply_transform_to_vertices(self, vertices, transform_matrix):
"""对顶点应用变换矩阵"""
if not transform_matrix:
return vertices
transformed_vertices = []
for vertex in vertices:
transformed = self.multiply_matrix_vector(transform_matrix, vertex)
transformed_vertices.append(transformed)
return transformed_vertices
def parse_tileset_json(self, tileset_path, parent_transform=None):
"""解析tileset.json文件收集B3DM文件和变换矩阵"""
try:
with open(tileset_path, 'r', encoding='utf-8') as f:
tileset_data = json.load(f)
b3dm_files = []
def process_node(node, base_path, accumulated_transform):
current_transform = node.get('transform')
if current_transform and accumulated_transform:
final_transform = self.multiply_matrices(accumulated_transform, current_transform)
elif current_transform:
final_transform = current_transform
else:
final_transform = accumulated_transform
if 'content' in node and 'uri' in node['content']:
uri = node['content']['uri']
if uri.endswith('.b3dm'):
full_path = os.path.join(base_path, uri)
if os.path.exists(full_path):
b3dm_files.append((full_path, final_transform))
elif uri.endswith('.json'):
sub_tileset_path = os.path.join(base_path, uri)
if os.path.exists(sub_tileset_path):
sub_files = self.parse_tileset_json(sub_tileset_path, final_transform)
b3dm_files.extend(sub_files)
if 'children' in node:
for child in node['children']:
process_node(child, base_path, final_transform)
base_path = os.path.dirname(tileset_path)
if 'root' in tileset_data:
process_node(tileset_data['root'], base_path, parent_transform)
return b3dm_files
except Exception as e:
print(f"解析tileset.json时出错: {e}")
return []
def parse_b3dm_file(self, file_path):
"""解析B3DM文件"""
try:
with open(file_path, 'rb') as f:
magic = f.read(4)
if magic != b'b3dm':
return None
version = struct.unpack('<I', f.read(4))[0]
byte_length = struct.unpack('<I', f.read(4))[0]
feature_table_json_byte_length = struct.unpack('<I', f.read(4))[0]
feature_table_binary_byte_length = struct.unpack('<I', f.read(4))[0]
batch_table_json_byte_length = struct.unpack('<I', f.read(4))[0]
batch_table_binary_byte_length = struct.unpack('<I', f.read(4))[0]
f.seek(28 + feature_table_json_byte_length + feature_table_binary_byte_length +
batch_table_json_byte_length + batch_table_binary_byte_length)
gltf_data = f.read()
return self.parse_gltf_data(gltf_data)
except Exception as e:
print(f"解析B3DM文件 {file_path} 失败: {e}")
return None
def parse_gltf_data(self, gltf_data):
"""解析glTF数据"""
try:
if gltf_data[:4] == b'glTF':
return self.parse_glb_data(gltf_data)
else:
gltf_json = json.loads(gltf_data.decode('utf-8'))
return self.extract_vertices_from_gltf(gltf_json, None)
except Exception as e:
print(f"解析glTF数据失败: {e}")
return None
def parse_glb_data(self, glb_data):
"""解析GLB格式的glTF数据"""
try:
magic = glb_data[:4]
if magic != b'glTF':
return None
version = struct.unpack('<I', glb_data[4:8])[0]
total_length = struct.unpack('<I', glb_data[8:12])[0]
offset = 12
json_data = None
binary_data = None
while offset < len(glb_data):
if offset + 8 > len(glb_data):
break
chunk_length = struct.unpack('<I', glb_data[offset:offset+4])[0]
chunk_type = glb_data[offset+4:offset+8]
chunk_data = glb_data[offset+8:offset+8+chunk_length]
if chunk_type == b'JSON':
json_data = json.loads(chunk_data.decode('utf-8'))
elif chunk_type == b'BIN\x00':
binary_data = chunk_data
offset += 8 + chunk_length
offset = (offset + 3) & ~3
if json_data:
return self.extract_vertices_from_gltf(json_data, binary_data)
except Exception as e:
print(f"解析GLB数据失败: {e}")
return None
def extract_vertices_from_gltf(self, gltf_json, binary_data):
"""从glTF JSON中提取顶点数据"""
vertices = []
try:
if 'extensionsUsed' in gltf_json and 'KHR_draco_mesh_compression' in gltf_json['extensionsUsed']:
if DracoPy is None:
print("警告: 检测到Draco压缩但DracoPy未安装")
return vertices
return self.extract_draco_vertices(gltf_json, binary_data)
if 'meshes' not in gltf_json:
return vertices
for mesh in gltf_json['meshes']:
for primitive in mesh['primitives']:
if 'attributes' in primitive and 'POSITION' in primitive['attributes']:
position_accessor_index = primitive['attributes']['POSITION']
if 'accessors' in gltf_json and position_accessor_index < len(gltf_json['accessors']):
accessor = gltf_json['accessors'][position_accessor_index]
buffer_view_index = accessor['bufferView']
if 'bufferViews' in gltf_json and buffer_view_index < len(gltf_json['bufferViews']):
buffer_view = gltf_json['bufferViews'][buffer_view_index]
buffer_index = buffer_view['buffer']
byte_offset = buffer_view.get('byteOffset', 0) + accessor.get('byteOffset', 0)
if binary_data and buffer_index == 0:
component_type = accessor['componentType']
count = accessor['count']
if component_type == 5126: # FLOAT
vertex_data = struct.unpack(f'<{count*3}f',
binary_data[byte_offset:byte_offset+count*12])
for i in range(0, len(vertex_data), 3):
vertices.append([vertex_data[i], vertex_data[i+1], vertex_data[i+2]])
except Exception as e:
print(f"提取顶点数据失败: {e}")
return vertices
def extract_draco_vertices(self, gltf_json, binary_data):
"""提取Draco压缩的顶点数据"""
vertices = []
try:
if 'meshes' not in gltf_json:
return vertices
for mesh in gltf_json['meshes']:
for primitive in mesh['primitives']:
if 'extensions' in primitive and 'KHR_draco_mesh_compression' in primitive['extensions']:
draco_ext = primitive['extensions']['KHR_draco_mesh_compression']
buffer_view_index = draco_ext['bufferView']
if 'bufferViews' in gltf_json and buffer_view_index < len(gltf_json['bufferViews']):
buffer_view = gltf_json['bufferViews'][buffer_view_index]
byte_offset = buffer_view.get('byteOffset', 0)
byte_length = buffer_view['byteLength']
if binary_data:
draco_data = binary_data[byte_offset:byte_offset+byte_length]
mesh_data = DracoPy.decode(draco_data)
if hasattr(mesh_data, 'points'):
points = mesh_data.points
for point in points:
vertices.append([float(point[0]), float(point[1]), float(point[2])])
except Exception as e:
print(f"解码Draco数据失败: {e}")
return vertices
def calculate_triangle_angles(self, p1, p2, p3):
"""计算三角形的三个内角(度)"""
# 计算三边长度
a = np.linalg.norm(p2 - p3) # 边a对应角A(p1)
b = np.linalg.norm(p1 - p3) # 边b对应角B(p2)
c = np.linalg.norm(p1 - p2) # 边c对应角C(p3)
# 避免除零错误
if a == 0 or b == 0 or c == 0:
return [0, 0, 0]
# 使用余弦定理计算角度
try:
# 角A = arccos((b²+c²-a²)/(2bc))
cos_A = (b*b + c*c - a*a) / (2*b*c)
cos_B = (a*a + c*c - b*b) / (2*a*c)
cos_C = (a*a + b*b - c*c) / (2*a*b)
# 限制余弦值范围,避免数值误差
cos_A = np.clip(cos_A, -1.0, 1.0)
cos_B = np.clip(cos_B, -1.0, 1.0)
cos_C = np.clip(cos_C, -1.0, 1.0)
angle_A = np.arccos(cos_A) * 180 / np.pi
angle_B = np.arccos(cos_B) * 180 / np.pi
angle_C = np.arccos(cos_C) * 180 / np.pi
return [angle_A, angle_B, angle_C]
except:
return [0, 0, 0]
def is_valid_triangle(self, p1, p2, p3, min_angle=10.0, max_aspect_ratio=10.0):
"""验证三角形质量,基于角度约束和长宽比"""
angles = self.calculate_triangle_angles(p1, p2, p3)
# 检查最小角度约束参考C#代码中的10度限制
if min(angles) < min_angle:
return False
# 计算三边长度
a = np.linalg.norm(p2 - p3)
b = np.linalg.norm(p1 - p3)
c = np.linalg.norm(p1 - p2)
# 检查长宽比约束
max_side = max(a, b, c)
min_side = min(a, b, c)
if min_side > 0 and max_side / min_side > max_aspect_ratio:
return False
return True
def calculate_circumcenter_and_radius(self, p1, p2, p3):
"""计算三角形外接圆圆心和半径(高精度算法)"""
try:
x1, y1 = p1[0], p1[1]
x2, y2 = p2[0], p2[1]
x3, y3 = p3[0], p3[1]
# 使用C#代码中的高精度外接圆计算公式
d = 2 * (x1 * (y2 - y3) + x2 * (y3 - y1) + x3 * (y1 - y2))
if abs(d) < 1e-10: # 三点共线
return None, float('inf')
ux = ((x1*x1 + y1*y1) * (y2 - y3) + (x2*x2 + y2*y2) * (y3 - y1) + (x3*x3 + y3*y3) * (y1 - y2)) / d
uy = ((x1*x1 + y1*y1) * (x3 - x2) + (x2*x2 + y2*y2) * (x1 - y3) + (x3*x3 + y3*y3) * (x2 - x1)) / d
# 计算半径
radius = np.sqrt((ux - x1)**2 + (uy - y1)**2)
return np.array([ux, uy]), radius
except:
return None, float('inf')
def calculate_volume_delaunay(self, vertices, base_elevation=None, min_angle=10.0, use_quality_filter=True):
"""使用Delaunay三角剖分计算体积"""
if len(vertices) < 4:
print("顶点数量不足,无法进行三角剖分")
return 0.0
try:
points = np.array(vertices)
if base_elevation is None:
base_elevation = np.min(points[:, 2])
print(f"Delaunay方法使用基准面高程: {base_elevation:.2f}")
print(f"质量控制参数: 最小角度={min_angle}°, 质量过滤={'开启' if use_quality_filter else '关闭'}")
adjusted_points = points.copy()
adjusted_points[:, 2] = np.maximum(points[:, 2] - base_elevation, 0)
print("正在进行Delaunay三角剖分...")
tri = Delaunay(adjusted_points)
valid_simplices = []
total_simplices = len(tri.simplices)
if use_quality_filter:
print("正在进行三角形质量检查...")
for simplex in tqdm(tri.simplices, desc="质量检查", unit=""):
p0 = adjusted_points[simplex[0]]
p1 = adjusted_points[simplex[1]]
p2 = adjusted_points[simplex[2]]
p3 = adjusted_points[simplex[3]]
faces = [(p0, p1, p2), (p0, p1, p3), (p0, p2, p3), (p1, p2, p3)]
valid_faces = 0
for face in faces:
if self.is_valid_triangle(face[0], face[1], face[2], min_angle):
valid_faces += 1
if valid_faces >= 3:
valid_simplices.append(simplex)
print(f"质量过滤: {len(valid_simplices)}/{total_simplices} 个四面体通过质量检查")
else:
valid_simplices = tri.simplices
total_volume = 0.0
valid_volume_count = 0
print(f"计算 {len(valid_simplices)} 个四面体的体积...")
for simplex in tqdm(valid_simplices, desc="计算四面体体积", unit=""):
p0 = adjusted_points[simplex[0]]
p1 = adjusted_points[simplex[1]]
p2 = adjusted_points[simplex[2]]
p3 = adjusted_points[simplex[3]]
v1 = p1 - p0
v2 = p2 - p0
v3 = p3 - p0
det = np.linalg.det(np.array([v1, v2, v3]))
volume = abs(det) / 6.0
if volume > 1e-12:
total_volume += volume
valid_volume_count += 1
print(f"有效体积计算: {valid_volume_count}/{len(valid_simplices)} 个四面体")
return total_volume
except Exception as e:
print(f"Delaunay三角剖分计算体积失败: {e}")
return 0.0
def load_and_filter_vertices(self, tileset_path):
"""加载并过滤指定范围内的顶点"""
print(f"开始处理tileset: {tileset_path}")
# 解析tileset获取所有B3DM文件
b3dm_files = self.parse_tileset_json(tileset_path)
print(f"找到 {len(b3dm_files)} 个B3DM文件")
if not b3dm_files:
print("未找到任何B3DM文件")
return False
# 处理每个B3DM文件
processed_count = 0
total_vertices = 0
filtered_count = 0
for i, (b3dm_file, transform_matrix) in enumerate(tqdm(b3dm_files, desc="处理B3DM文件", unit="文件")):
# print(f"处理文件 {i+1}/{len(b3dm_files)}: {os.path.basename(b3dm_file)}")
vertices = self.parse_b3dm_file(b3dm_file)
if vertices:
# 应用变换矩阵
if transform_matrix:
vertices = self.apply_transform_to_vertices(vertices, transform_matrix)
# 过滤范围内的顶点
for vertex in vertices:
total_vertices += 1
if self.is_point_in_bounds(vertex):
self.filtered_vertices.append(vertex)
filtered_count += 1
self.all_vertices.extend(vertices)
processed_count += 1
# tqdm.write(f" {os.path.basename(b3dm_file)}: 提取到 {len(vertices)} 个顶点")
print(f"\n成功处理 {processed_count}/{len(b3dm_files)} 个文件")
print(f"总计提取 {total_vertices} 个顶点")
print(f"范围内顶点 {filtered_count}")
return len(self.filtered_vertices) > 0
def calculate_volume(self, tileset_path, base_elevation=None, min_angle=10.0, use_quality_filter=True):
"""计算指定范围内三维模型的体积
Args:
tileset_path: tileset.json文件路径
base_elevation: 基准面高程
min_angle: 最小角度约束(度)
use_quality_filter: 是否启用质量过滤
"""
if not self.load_and_filter_vertices(tileset_path):
print("未找到范围内的顶点数据")
return 0.0
print(f"\n开始计算体积使用Delaunay三角剖分方法")
print(f"参与计算的顶点数: {len(self.filtered_vertices)}")
points = np.array(self.filtered_vertices)
if base_elevation is None:
base_elevation = np.min(points[:, 2])
print(f"统一基准面高程: {base_elevation:.2f}")
volume = self.calculate_volume_delaunay(self.filtered_vertices, base_elevation, min_angle, use_quality_filter)
print(f"\n计算结果:")
print(f"体积: {volume:.6f} 立方米")
print(f"体积: {volume/1000000:.6f} 立方千米")
return volume
def main():
if len(sys.argv) < 3:
print("用法: python volume_calculator.py <lboundary.json路径> <tileset.json路径> [基准面高程] [最小角度] [质量过滤]")
print("基准面高程: 基准面高程(米),默认使用最低点")
print("最小角度: 三角形最小角度约束(度)默认10.0")
print("质量过滤: 是否启用质量过滤(true/false)默认true")
print("示例: python volume_calculator.py boundary.json tileset.json 100.0 15.0 true")
return
location_file = sys.argv[1]
tileset_path = sys.argv[2]
# 解析可选参数
base_elevation = None
min_angle = 10.0
use_quality_filter = True
if len(sys.argv) > 3:
try:
base_elevation = float(sys.argv[3])
except ValueError:
print("错误:基准面高程必须是数字")
return
if len(sys.argv) > 4:
try:
min_angle = float(sys.argv[4])
except ValueError:
print("错误:最小角度必须是数字")
return
if len(sys.argv) > 5:
use_quality_filter = sys.argv[5].lower() in ['true', '1', 'yes', 'on']
if not os.path.exists(location_file):
print(f"错误: 位置文件不存在 {location_file}")
return
if not os.path.exists(tileset_path):
print(f"错误: tileset文件不存在 {tileset_path}")
return
calculator = VolumeCalculator(location_file)
volume = calculator.calculate_volume(tileset_path, base_elevation, min_angle, use_quality_filter)
if volume > 0:
print("\n体积计算完成!")
else:
print("\n体积计算失败!")
if __name__ == "__main__":
main()