ai_project_v1/b3dm/earthwork_calculator_3d_tiles.py

526 lines
20 KiB
Python
Raw Normal View History

2026-01-14 11:37:35 +08:00
# earthwork_calculator.py
import numpy as np
from pyproj import Transformer, CRS
from scipy.spatial import Delaunay
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple, Union
import logging
from enum import Enum
from abc import ABC, abstractmethod
import math
logger = logging.getLogger(__name__)
class AlgorithmType(Enum):
"""计算算法类型"""
GRID = "grid"
TIN = "tin"
PRISM = "prism"
@dataclass
class EarthworkResult3dTiles:
"""土方量计算结果"""
cut_volume: float # 挖方量 (m³)
fill_volume: float # 填方量 (m³)
net_volume: float # 净方量 (m³)
area: float # 计算区域面积 (m²)
avg_elevation: float # 平均高程
min_elevation: float # 最低高程
max_elevation: float # 最高高程
points_count: int # 使用的点数
bounding_box: Dict[str, List[float]] # 边界框
volume_accuracy: float # 计算精度
algorithm: str # 使用的算法
resolution: float # 计算分辨率
def to_dict(self) -> Dict:
"""转换为字典"""
return {
"volume": {
"cut": round(self.cut_volume, 3),
"fill": round(self.fill_volume, 3),
"net": round(self.net_volume, 3),
"unit": ""
},
"area": {
"value": round(self.area, 3),
"unit": ""
},
"elevation": {
"average": round(self.avg_elevation, 3),
"min": round(self.min_elevation, 3),
"max": round(self.max_elevation, 3),
"unit": "m"
},
"statistics": {
"points_count": self.points_count,
"accuracy": round(self.volume_accuracy, 3),
"algorithm": self.algorithm
},
"bounding_box": self.bounding_box,
"calculation_params": {
"resolution": self.resolution,
"accuracy": self.volume_accuracy
}
}
class TerrainDataSource(ABC):
"""地形数据源抽象类"""
@abstractmethod
async def get_points_in_polygon(self,
polygon_coords: List[List[float]],
z_range: Optional[Tuple[float, float]] = None) -> np.ndarray:
"""
获取多边形区域内的点云数据
Args:
polygon_coords: 多边形坐标 [[x1,y1], [x2,y2], ...]
z_range: 高程范围 (min_z, max_z)
Returns:
Nx3的numpy数组 [x, y, z]
"""
pass
@abstractmethod
async def get_data_bounds(self) -> Dict[str, List[float]]:
"""获取数据范围"""
pass
@abstractmethod
def get_crs(self) -> str:
"""获取数据坐标系"""
pass
class GeometryUtils:
"""几何计算工具类"""
@staticmethod
def calculate_polygon_area(polygon_coords: List[List[float]]) -> float:
"""计算多边形面积(平面面积)"""
polygon_np = np.array(polygon_coords)
x = polygon_np[:, 0]
y = polygon_np[:, 1]
return 0.5 * np.abs(np.dot(x, np.roll(y, 1)) - np.dot(y, np.roll(x, 1)))
@staticmethod
def is_point_in_polygon(point: np.ndarray, polygon: np.ndarray) -> bool:
"""判断点是否在多边形内"""
from matplotlib.path import Path
path = Path(polygon)
return path.contains_point(point)
@staticmethod
def calculate_triangle_area(points: np.ndarray) -> float:
"""计算三角形面积"""
a = np.linalg.norm(points[0] - points[1])
b = np.linalg.norm(points[1] - points[2])
c = np.linalg.norm(points[2] - points[0])
s = (a + b + c) / 2
return np.sqrt(s * (s - a) * (s - b) * (s - c))
@staticmethod
def create_grid(polygon: np.ndarray, resolution: float) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""创建规则格网"""
x_min, y_min = polygon.min(axis=0)
x_max, y_max = polygon.max(axis=0)
# 扩展一个格网单元
x_min -= resolution
x_max += resolution
y_min -= resolution
y_max += resolution
x_grid = np.arange(x_min, x_max + resolution, resolution)
y_grid = np.arange(y_min, y_max + resolution, resolution)
xx, yy = np.meshgrid(x_grid, y_grid)
return xx, yy, x_grid, y_grid
@staticmethod
def interpolate_grid(xx: np.ndarray, yy: np.ndarray,
points: np.ndarray, method: str = 'linear') -> np.ndarray:
"""格网插值"""
from scipy.interpolate import LinearNDInterpolator, CloughTocher2DInterpolator
grid_points = np.column_stack([xx.ravel(), yy.ravel()])
if method == 'linear':
interpolator = LinearNDInterpolator(points[:, :2], points[:, 2])
elif method == 'cubic':
interpolator = CloughTocher2DInterpolator(points[:, :2], points[:, 2])
else:
raise ValueError(f"不支持的插值方法: {method}")
elevations = interpolator(grid_points)
return elevations.reshape(xx.shape)
class EarthworkCalculator3dTiles:
"""土方量计算器"""
def __init__(self, data_source: TerrainDataSource):
"""
初始化计算器
Args:
data_source: 地形数据源
"""
self.data_source = data_source
self._transformer_cache = {}
async def calculate(self,
polygon_coords: List[List[float]],
design_elevation: float,
algorithm: AlgorithmType = AlgorithmType.TIN,
resolution: float = 1.0,
target_crs: str = "EPSG:4326",
interpolation_method: str = 'linear') -> EarthworkResult3dTiles:
"""
计算土方量
Args:
polygon_coords: 多边形坐标
design_elevation: 设计高程
algorithm: 计算算法
resolution: 格网分辨率
target_crs: 目标坐标系
interpolation_method: 插值方法
Returns:
EarthworkResult: 计算结果
"""
try:
# 1. 获取数据
points = await self.data_source.get_points_in_polygon(polygon_coords)
if points.size == 0:
raise ValueError("区域内没有找到高程数据")
# 2. 坐标转换
points = await self._transform_coordinates(points, target_crs)
# 3. 执行计算
if algorithm == AlgorithmType.GRID:
result = await self._calculate_grid(points, polygon_coords,
design_elevation, resolution,
interpolation_method)
elif algorithm == AlgorithmType.TIN:
result = await self._calculate_tin(points, polygon_coords,
design_elevation)
elif algorithm == AlgorithmType.PRISM:
result = await self._calculate_prism(points, polygon_coords,
design_elevation, resolution)
else:
raise ValueError(f"不支持的算法: {algorithm}")
return result
except Exception as e:
logger.error(f"土方量计算失败: {str(e)}")
raise
async def _transform_coordinates(self, points: np.ndarray,
target_crs: str) -> np.ndarray:
"""坐标转换"""
source_crs = self.data_source.get_crs()
if source_crs == target_crs:
return points
cache_key = f"{source_crs}->{target_crs}"
if cache_key not in self._transformer_cache:
self._transformer_cache[cache_key] = Transformer.from_crs(
CRS.from_string(source_crs),
CRS.from_string(target_crs),
always_xy=True
)
transformer = self._transformer_cache[cache_key]
points_2d = transformer.transform(points[:, 0], points[:, 1])
return np.column_stack([points_2d[0], points_2d[1], points[:, 2]])
async def _calculate_grid(self, points: np.ndarray,
polygon_coords: List[List[float]],
design_elevation: float,
resolution: float,
interpolation_method: str) -> EarthworkResult3dTiles:
"""格网法计算"""
polygon_np = np.array(polygon_coords)
# 创建格网
xx, yy, x_grid, y_grid = GeometryUtils.create_grid(polygon_np, resolution)
# 插值
natural_elevations = GeometryUtils.interpolate_grid(xx, yy, points, interpolation_method)
# 初始化挖填量
cut_volume = 0.0
fill_volume = 0.0
total_area = 0.0
# 遍历每个格网单元
for i in range(len(x_grid) - 1):
for j in range(len(y_grid) - 1):
# 格网四个角点
cell_corners = np.array([
[x_grid[i], y_grid[j]],
[x_grid[i+1], y_grid[j]],
[x_grid[i+1], y_grid[j+1]],
[x_grid[i], y_grid[j+1]]
])
# 检查格网中心点是否在多边形内
cell_center = cell_corners.mean(axis=0)
if not GeometryUtils.is_point_in_polygon(cell_center, polygon_np):
continue
# 获取格网四个角点的高程
cell_elevations = [
natural_elevations[j, i],
natural_elevations[j, i+1],
natural_elevations[j+1, i+1],
natural_elevations[j+1, i]
]
# 检查是否有无效数据
if any(np.isnan(elev) for elev in cell_elevations):
continue
# 计算格网平均高程
avg_elevation = np.mean(cell_elevations)
cell_area = resolution * resolution
total_area += cell_area
# 计算挖填量
height_diff = design_elevation - avg_elevation
if height_diff > 0:
fill_volume += height_diff * cell_area
else:
cut_volume += abs(height_diff) * cell_area
# 计算统计信息
area = GeometryUtils.calculate_polygon_area(polygon_coords)
mask = ~np.isnan(natural_elevations)
valid_elevations = natural_elevations[mask]
return EarthworkResult3dTiles(
cut_volume=cut_volume,
fill_volume=fill_volume,
net_volume=cut_volume - fill_volume,
area=area,
avg_elevation=np.mean(valid_elevations) if valid_elevations.size > 0 else 0,
min_elevation=np.min(valid_elevations) if valid_elevations.size > 0 else 0,
max_elevation=np.max(valid_elevations) if valid_elevations.size > 0 else 0,
points_count=points.shape[0],
bounding_box={
"min": [x_grid[0], y_grid[0]],
"max": [x_grid[-1], y_grid[-1]]
},
volume_accuracy=self._calculate_accuracy(points, resolution),
algorithm=AlgorithmType.GRID.value,
resolution=resolution
).to_dict()
async def _calculate_tin(self, points: np.ndarray,
polygon_coords: List[List[float]],
design_elevation: float) -> EarthworkResult3dTiles:
"""三角网法计算"""
polygon_np = np.array(polygon_coords)
# 创建Delaunay三角网
triangulation = Delaunay(points[:, :2])
# 筛选多边形内的三角形
cut_volume = 0.0
fill_volume = 0.0
total_area = 0.0
for simplex in triangulation.simplices:
triangle_points = points[simplex]
triangle_center = triangle_points.mean(axis=0)[:2]
# 检查三角形中心是否在多边形内
if not GeometryUtils.is_point_in_polygon(triangle_center, polygon_np):
continue
# 计算三角形面积
area = GeometryUtils.calculate_triangle_area(triangle_points[:, :2])
total_area += area
# 计算平均高程(使用三个顶点的高程)
avg_elevation = triangle_points[:, 2].mean()
# 计算挖填量
height_diff = design_elevation - avg_elevation
if height_diff > 0:
fill_volume += height_diff * area
else:
cut_volume += abs(height_diff) * area
# 计算统计信息
area = GeometryUtils.calculate_polygon_area(polygon_coords)
return EarthworkResult3dTiles(
cut_volume=cut_volume,
fill_volume=fill_volume,
net_volume=cut_volume - fill_volume,
area=area,
avg_elevation=points[:, 2].mean(),
min_elevation=points[:, 2].min(),
max_elevation=points[:, 2].max(),
points_count=points.shape[0],
bounding_box={
"min": points.min(axis=0)[:2].tolist(),
"max": points.max(axis=0)[:2].tolist()
},
volume_accuracy=self._calculate_accuracy(points, 0),
algorithm=AlgorithmType.TIN.value,
resolution=0
).to_dict()
async def _calculate_prism(self, points: np.ndarray,
polygon_coords: List[List[float]],
design_elevation: float,
resolution: float) -> EarthworkResult3dTiles:
"""三棱柱法计算"""
# 先创建TIN
polygon_np = np.array(polygon_coords)
triangulation = Delaunay(points[:, :2])
cut_volume = 0.0
fill_volume = 0.0
total_area = 0.0
for simplex in triangulation.simplices:
triangle_points = points[simplex]
triangle_center = triangle_points.mean(axis=0)[:2]
if not GeometryUtils.is_point_in_polygon(triangle_center, polygon_np):
continue
# 计算三角形面积
area = GeometryUtils.calculate_triangle_area(triangle_points[:, :2])
total_area += area
# 对于每个三角形,计算三棱柱体积
# 这里简化处理,使用梯形公式
for i in range(3):
j = (i + 1) % 3
# 计算边的长度
edge_length = np.linalg.norm(triangle_points[i, :2] - triangle_points[j, :2])
# 计算边两端点的挖填高度
height_i = design_elevation - triangle_points[i, 2]
height_j = design_elevation - triangle_points[j, 2]
# 计算边的平均挖填高度
avg_height = (abs(height_i) + abs(height_j)) / 2
# 计算边的面积假设边宽度为resolution
edge_area = edge_length * resolution
if height_i > 0 or height_j > 0:
fill_volume += avg_height * edge_area
else:
cut_volume += avg_height * edge_area
area = GeometryUtils.calculate_polygon_area(polygon_coords)
return EarthworkResult3dTiles(
cut_volume=cut_volume,
fill_volume=fill_volume,
net_volume=cut_volume - fill_volume,
area=area,
avg_elevation=points[:, 2].mean(),
min_elevation=points[:, 2].min(),
max_elevation=points[:, 2].max(),
points_count=points.shape[0],
bounding_box={
"min": points.min(axis=0)[:2].tolist(),
"max": points.max(axis=0)[:2].tolist()
},
volume_accuracy=self._calculate_accuracy(points, resolution),
algorithm=AlgorithmType.PRISM.value,
resolution=resolution
).to_dict()
def _calculate_accuracy(self, points: np.ndarray, resolution: float) -> float:
"""计算精度评估"""
if points.shape[0] < 10:
return 0.5
# 基于点密度和分辨率估算精度
if points.shape[0] > 0:
# 计算点密度
bounds = points[:, :2]
area = (bounds[:, 0].max() - bounds[:, 0].min()) * \
(bounds[:, 1].max() - bounds[:, 1].min())
if area > 0:
point_density = points.shape[0] / area
if point_density > 100 and resolution <= 0.5: # 高密度,高分辨率
return 0.05
elif point_density > 10 and resolution <= 1.0:
return 0.1
elif point_density > 1:
return 0.2
return 0.3
async def validate(self, polygon_coords: List[List[float]]) -> Dict:
"""验证计算参数"""
try:
points = await self.data_source.get_points_in_polygon(polygon_coords)
validation_result = {
"polygon_valid": len(polygon_coords) >= 3,
"area": GeometryUtils.calculate_polygon_area(polygon_coords),
"points_available": points.size > 0,
"points_count": points.shape[0] if points.size > 0 else 0,
"data_quality": "good" if points.shape[0] > 100 else "poor",
"suggested_algorithm": self._suggest_algorithm(points),
"estimated_accuracy": self._calculate_accuracy(points, 1.0) if points.size > 0 else None
}
if points.size > 0:
validation_result.update({
"elevation_range": {
"min": float(points[:, 2].min()),
"max": float(points[:, 2].max()),
"avg": float(points[:, 2].mean())
},
"bounding_box": {
"min": points.min(axis=0).tolist(),
"max": points.max(axis=0).tolist()
}
})
return validation_result
except Exception as e:
logger.error(f"验证失败: {str(e)}")
return {
"polygon_valid": False,
"error": str(e)
}
def _suggest_algorithm(self, points: np.ndarray) -> str:
"""建议计算算法"""
if points.shape[0] < 100:
return AlgorithmType.TIN.value
# 计算点的规则性
bounds = points[:, :2]
x_range = bounds[:, 0].max() - bounds[:, 0].min()
y_range = bounds[:, 1].max() - bounds[:, 1].min()
# 如果点分布相对规则,建议使用格网法
if x_range > 0 and y_range > 0:
aspect_ratio = max(x_range, y_range) / min(x_range, y_range)
if aspect_ratio < 2: # 相对规则
return AlgorithmType.GRID.value
return AlgorithmType.TIN.value