526 lines
20 KiB
Python
526 lines
20 KiB
Python
# earthwork_calculator.py
|
||
import numpy as np
|
||
from pyproj import Transformer, CRS
|
||
from scipy.spatial import Delaunay
|
||
from dataclasses import dataclass
|
||
from typing import List, Dict, Optional, Tuple, Union
|
||
import logging
|
||
from enum import Enum
|
||
from abc import ABC, abstractmethod
|
||
import math
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class AlgorithmType(Enum):
|
||
"""计算算法类型"""
|
||
GRID = "grid"
|
||
TIN = "tin"
|
||
PRISM = "prism"
|
||
|
||
@dataclass
|
||
class EarthworkResult3dTiles:
|
||
"""土方量计算结果"""
|
||
cut_volume: float # 挖方量 (m³)
|
||
fill_volume: float # 填方量 (m³)
|
||
net_volume: float # 净方量 (m³)
|
||
area: float # 计算区域面积 (m²)
|
||
avg_elevation: float # 平均高程
|
||
min_elevation: float # 最低高程
|
||
max_elevation: float # 最高高程
|
||
points_count: int # 使用的点数
|
||
bounding_box: Dict[str, List[float]] # 边界框
|
||
volume_accuracy: float # 计算精度
|
||
algorithm: str # 使用的算法
|
||
resolution: float # 计算分辨率
|
||
|
||
def to_dict(self) -> Dict:
|
||
"""转换为字典"""
|
||
return {
|
||
"volume": {
|
||
"cut": round(self.cut_volume, 3),
|
||
"fill": round(self.fill_volume, 3),
|
||
"net": round(self.net_volume, 3),
|
||
"unit": "m³"
|
||
},
|
||
"area": {
|
||
"value": round(self.area, 3),
|
||
"unit": "m²"
|
||
},
|
||
"elevation": {
|
||
"average": round(self.avg_elevation, 3),
|
||
"min": round(self.min_elevation, 3),
|
||
"max": round(self.max_elevation, 3),
|
||
"unit": "m"
|
||
},
|
||
"statistics": {
|
||
"points_count": self.points_count,
|
||
"accuracy": round(self.volume_accuracy, 3),
|
||
"algorithm": self.algorithm
|
||
},
|
||
"bounding_box": self.bounding_box,
|
||
"calculation_params": {
|
||
"resolution": self.resolution,
|
||
"accuracy": self.volume_accuracy
|
||
}
|
||
}
|
||
|
||
class TerrainDataSource(ABC):
|
||
"""地形数据源抽象类"""
|
||
|
||
@abstractmethod
|
||
async def get_points_in_polygon(self,
|
||
polygon_coords: List[List[float]],
|
||
z_range: Optional[Tuple[float, float]] = None) -> np.ndarray:
|
||
"""
|
||
获取多边形区域内的点云数据
|
||
|
||
Args:
|
||
polygon_coords: 多边形坐标 [[x1,y1], [x2,y2], ...]
|
||
z_range: 高程范围 (min_z, max_z)
|
||
|
||
Returns:
|
||
Nx3的numpy数组 [x, y, z]
|
||
"""
|
||
pass
|
||
|
||
@abstractmethod
|
||
async def get_data_bounds(self) -> Dict[str, List[float]]:
|
||
"""获取数据范围"""
|
||
pass
|
||
|
||
@abstractmethod
|
||
def get_crs(self) -> str:
|
||
"""获取数据坐标系"""
|
||
pass
|
||
|
||
class GeometryUtils:
|
||
"""几何计算工具类"""
|
||
|
||
@staticmethod
|
||
def calculate_polygon_area(polygon_coords: List[List[float]]) -> float:
|
||
"""计算多边形面积(平面面积)"""
|
||
polygon_np = np.array(polygon_coords)
|
||
x = polygon_np[:, 0]
|
||
y = polygon_np[:, 1]
|
||
return 0.5 * np.abs(np.dot(x, np.roll(y, 1)) - np.dot(y, np.roll(x, 1)))
|
||
|
||
@staticmethod
|
||
def is_point_in_polygon(point: np.ndarray, polygon: np.ndarray) -> bool:
|
||
"""判断点是否在多边形内"""
|
||
from matplotlib.path import Path
|
||
path = Path(polygon)
|
||
return path.contains_point(point)
|
||
|
||
@staticmethod
|
||
def calculate_triangle_area(points: np.ndarray) -> float:
|
||
"""计算三角形面积"""
|
||
a = np.linalg.norm(points[0] - points[1])
|
||
b = np.linalg.norm(points[1] - points[2])
|
||
c = np.linalg.norm(points[2] - points[0])
|
||
s = (a + b + c) / 2
|
||
return np.sqrt(s * (s - a) * (s - b) * (s - c))
|
||
|
||
@staticmethod
|
||
def create_grid(polygon: np.ndarray, resolution: float) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
|
||
"""创建规则格网"""
|
||
x_min, y_min = polygon.min(axis=0)
|
||
x_max, y_max = polygon.max(axis=0)
|
||
|
||
# 扩展一个格网单元
|
||
x_min -= resolution
|
||
x_max += resolution
|
||
y_min -= resolution
|
||
y_max += resolution
|
||
|
||
x_grid = np.arange(x_min, x_max + resolution, resolution)
|
||
y_grid = np.arange(y_min, y_max + resolution, resolution)
|
||
xx, yy = np.meshgrid(x_grid, y_grid)
|
||
|
||
return xx, yy, x_grid, y_grid
|
||
|
||
@staticmethod
|
||
def interpolate_grid(xx: np.ndarray, yy: np.ndarray,
|
||
points: np.ndarray, method: str = 'linear') -> np.ndarray:
|
||
"""格网插值"""
|
||
from scipy.interpolate import LinearNDInterpolator, CloughTocher2DInterpolator
|
||
|
||
grid_points = np.column_stack([xx.ravel(), yy.ravel()])
|
||
|
||
if method == 'linear':
|
||
interpolator = LinearNDInterpolator(points[:, :2], points[:, 2])
|
||
elif method == 'cubic':
|
||
interpolator = CloughTocher2DInterpolator(points[:, :2], points[:, 2])
|
||
else:
|
||
raise ValueError(f"不支持的插值方法: {method}")
|
||
|
||
elevations = interpolator(grid_points)
|
||
return elevations.reshape(xx.shape)
|
||
|
||
class EarthworkCalculator3dTiles:
|
||
"""土方量计算器"""
|
||
|
||
def __init__(self, data_source: TerrainDataSource):
|
||
"""
|
||
初始化计算器
|
||
|
||
Args:
|
||
data_source: 地形数据源
|
||
"""
|
||
self.data_source = data_source
|
||
self._transformer_cache = {}
|
||
|
||
async def calculate(self,
|
||
polygon_coords: List[List[float]],
|
||
design_elevation: float,
|
||
algorithm: AlgorithmType = AlgorithmType.TIN,
|
||
resolution: float = 1.0,
|
||
target_crs: str = "EPSG:4326",
|
||
interpolation_method: str = 'linear') -> EarthworkResult3dTiles:
|
||
"""
|
||
计算土方量
|
||
|
||
Args:
|
||
polygon_coords: 多边形坐标
|
||
design_elevation: 设计高程
|
||
algorithm: 计算算法
|
||
resolution: 格网分辨率(米)
|
||
target_crs: 目标坐标系
|
||
interpolation_method: 插值方法
|
||
|
||
Returns:
|
||
EarthworkResult: 计算结果
|
||
"""
|
||
try:
|
||
# 1. 获取数据
|
||
points = await self.data_source.get_points_in_polygon(polygon_coords)
|
||
|
||
if points.size == 0:
|
||
raise ValueError("区域内没有找到高程数据")
|
||
|
||
# 2. 坐标转换
|
||
points = await self._transform_coordinates(points, target_crs)
|
||
|
||
# 3. 执行计算
|
||
if algorithm == AlgorithmType.GRID:
|
||
result = await self._calculate_grid(points, polygon_coords,
|
||
design_elevation, resolution,
|
||
interpolation_method)
|
||
elif algorithm == AlgorithmType.TIN:
|
||
result = await self._calculate_tin(points, polygon_coords,
|
||
design_elevation)
|
||
elif algorithm == AlgorithmType.PRISM:
|
||
result = await self._calculate_prism(points, polygon_coords,
|
||
design_elevation, resolution)
|
||
else:
|
||
raise ValueError(f"不支持的算法: {algorithm}")
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
logger.error(f"土方量计算失败: {str(e)}")
|
||
raise
|
||
|
||
async def _transform_coordinates(self, points: np.ndarray,
|
||
target_crs: str) -> np.ndarray:
|
||
"""坐标转换"""
|
||
source_crs = self.data_source.get_crs()
|
||
|
||
if source_crs == target_crs:
|
||
return points
|
||
|
||
cache_key = f"{source_crs}->{target_crs}"
|
||
if cache_key not in self._transformer_cache:
|
||
self._transformer_cache[cache_key] = Transformer.from_crs(
|
||
CRS.from_string(source_crs),
|
||
CRS.from_string(target_crs),
|
||
always_xy=True
|
||
)
|
||
|
||
transformer = self._transformer_cache[cache_key]
|
||
points_2d = transformer.transform(points[:, 0], points[:, 1])
|
||
|
||
return np.column_stack([points_2d[0], points_2d[1], points[:, 2]])
|
||
|
||
async def _calculate_grid(self, points: np.ndarray,
|
||
polygon_coords: List[List[float]],
|
||
design_elevation: float,
|
||
resolution: float,
|
||
interpolation_method: str) -> EarthworkResult3dTiles:
|
||
"""格网法计算"""
|
||
polygon_np = np.array(polygon_coords)
|
||
|
||
# 创建格网
|
||
xx, yy, x_grid, y_grid = GeometryUtils.create_grid(polygon_np, resolution)
|
||
|
||
# 插值
|
||
natural_elevations = GeometryUtils.interpolate_grid(xx, yy, points, interpolation_method)
|
||
|
||
# 初始化挖填量
|
||
cut_volume = 0.0
|
||
fill_volume = 0.0
|
||
total_area = 0.0
|
||
|
||
# 遍历每个格网单元
|
||
for i in range(len(x_grid) - 1):
|
||
for j in range(len(y_grid) - 1):
|
||
# 格网四个角点
|
||
cell_corners = np.array([
|
||
[x_grid[i], y_grid[j]],
|
||
[x_grid[i+1], y_grid[j]],
|
||
[x_grid[i+1], y_grid[j+1]],
|
||
[x_grid[i], y_grid[j+1]]
|
||
])
|
||
|
||
# 检查格网中心点是否在多边形内
|
||
cell_center = cell_corners.mean(axis=0)
|
||
if not GeometryUtils.is_point_in_polygon(cell_center, polygon_np):
|
||
continue
|
||
|
||
# 获取格网四个角点的高程
|
||
cell_elevations = [
|
||
natural_elevations[j, i],
|
||
natural_elevations[j, i+1],
|
||
natural_elevations[j+1, i+1],
|
||
natural_elevations[j+1, i]
|
||
]
|
||
|
||
# 检查是否有无效数据
|
||
if any(np.isnan(elev) for elev in cell_elevations):
|
||
continue
|
||
|
||
# 计算格网平均高程
|
||
avg_elevation = np.mean(cell_elevations)
|
||
cell_area = resolution * resolution
|
||
total_area += cell_area
|
||
|
||
# 计算挖填量
|
||
height_diff = design_elevation - avg_elevation
|
||
if height_diff > 0:
|
||
fill_volume += height_diff * cell_area
|
||
else:
|
||
cut_volume += abs(height_diff) * cell_area
|
||
|
||
# 计算统计信息
|
||
area = GeometryUtils.calculate_polygon_area(polygon_coords)
|
||
mask = ~np.isnan(natural_elevations)
|
||
valid_elevations = natural_elevations[mask]
|
||
|
||
return EarthworkResult3dTiles(
|
||
cut_volume=cut_volume,
|
||
fill_volume=fill_volume,
|
||
net_volume=cut_volume - fill_volume,
|
||
area=area,
|
||
avg_elevation=np.mean(valid_elevations) if valid_elevations.size > 0 else 0,
|
||
min_elevation=np.min(valid_elevations) if valid_elevations.size > 0 else 0,
|
||
max_elevation=np.max(valid_elevations) if valid_elevations.size > 0 else 0,
|
||
points_count=points.shape[0],
|
||
bounding_box={
|
||
"min": [x_grid[0], y_grid[0]],
|
||
"max": [x_grid[-1], y_grid[-1]]
|
||
},
|
||
volume_accuracy=self._calculate_accuracy(points, resolution),
|
||
algorithm=AlgorithmType.GRID.value,
|
||
resolution=resolution
|
||
).to_dict()
|
||
|
||
async def _calculate_tin(self, points: np.ndarray,
|
||
polygon_coords: List[List[float]],
|
||
design_elevation: float) -> EarthworkResult3dTiles:
|
||
"""三角网法计算"""
|
||
polygon_np = np.array(polygon_coords)
|
||
|
||
# 创建Delaunay三角网
|
||
triangulation = Delaunay(points[:, :2])
|
||
|
||
# 筛选多边形内的三角形
|
||
cut_volume = 0.0
|
||
fill_volume = 0.0
|
||
total_area = 0.0
|
||
|
||
for simplex in triangulation.simplices:
|
||
triangle_points = points[simplex]
|
||
triangle_center = triangle_points.mean(axis=0)[:2]
|
||
|
||
# 检查三角形中心是否在多边形内
|
||
if not GeometryUtils.is_point_in_polygon(triangle_center, polygon_np):
|
||
continue
|
||
|
||
# 计算三角形面积
|
||
area = GeometryUtils.calculate_triangle_area(triangle_points[:, :2])
|
||
total_area += area
|
||
|
||
# 计算平均高程(使用三个顶点的高程)
|
||
avg_elevation = triangle_points[:, 2].mean()
|
||
|
||
# 计算挖填量
|
||
height_diff = design_elevation - avg_elevation
|
||
if height_diff > 0:
|
||
fill_volume += height_diff * area
|
||
else:
|
||
cut_volume += abs(height_diff) * area
|
||
|
||
# 计算统计信息
|
||
area = GeometryUtils.calculate_polygon_area(polygon_coords)
|
||
|
||
return EarthworkResult3dTiles(
|
||
cut_volume=cut_volume,
|
||
fill_volume=fill_volume,
|
||
net_volume=cut_volume - fill_volume,
|
||
area=area,
|
||
avg_elevation=points[:, 2].mean(),
|
||
min_elevation=points[:, 2].min(),
|
||
max_elevation=points[:, 2].max(),
|
||
points_count=points.shape[0],
|
||
bounding_box={
|
||
"min": points.min(axis=0)[:2].tolist(),
|
||
"max": points.max(axis=0)[:2].tolist()
|
||
},
|
||
volume_accuracy=self._calculate_accuracy(points, 0),
|
||
algorithm=AlgorithmType.TIN.value,
|
||
resolution=0
|
||
).to_dict()
|
||
|
||
async def _calculate_prism(self, points: np.ndarray,
|
||
polygon_coords: List[List[float]],
|
||
design_elevation: float,
|
||
resolution: float) -> EarthworkResult3dTiles:
|
||
"""三棱柱法计算"""
|
||
# 先创建TIN
|
||
polygon_np = np.array(polygon_coords)
|
||
triangulation = Delaunay(points[:, :2])
|
||
|
||
cut_volume = 0.0
|
||
fill_volume = 0.0
|
||
total_area = 0.0
|
||
|
||
for simplex in triangulation.simplices:
|
||
triangle_points = points[simplex]
|
||
triangle_center = triangle_points.mean(axis=0)[:2]
|
||
|
||
if not GeometryUtils.is_point_in_polygon(triangle_center, polygon_np):
|
||
continue
|
||
|
||
# 计算三角形面积
|
||
area = GeometryUtils.calculate_triangle_area(triangle_points[:, :2])
|
||
total_area += area
|
||
|
||
# 对于每个三角形,计算三棱柱体积
|
||
# 这里简化处理,使用梯形公式
|
||
for i in range(3):
|
||
j = (i + 1) % 3
|
||
# 计算边的长度
|
||
edge_length = np.linalg.norm(triangle_points[i, :2] - triangle_points[j, :2])
|
||
|
||
# 计算边两端点的挖填高度
|
||
height_i = design_elevation - triangle_points[i, 2]
|
||
height_j = design_elevation - triangle_points[j, 2]
|
||
|
||
# 计算边的平均挖填高度
|
||
avg_height = (abs(height_i) + abs(height_j)) / 2
|
||
|
||
# 计算边的面积(假设边宽度为resolution)
|
||
edge_area = edge_length * resolution
|
||
|
||
if height_i > 0 or height_j > 0:
|
||
fill_volume += avg_height * edge_area
|
||
else:
|
||
cut_volume += avg_height * edge_area
|
||
|
||
area = GeometryUtils.calculate_polygon_area(polygon_coords)
|
||
|
||
return EarthworkResult3dTiles(
|
||
cut_volume=cut_volume,
|
||
fill_volume=fill_volume,
|
||
net_volume=cut_volume - fill_volume,
|
||
area=area,
|
||
avg_elevation=points[:, 2].mean(),
|
||
min_elevation=points[:, 2].min(),
|
||
max_elevation=points[:, 2].max(),
|
||
points_count=points.shape[0],
|
||
bounding_box={
|
||
"min": points.min(axis=0)[:2].tolist(),
|
||
"max": points.max(axis=0)[:2].tolist()
|
||
},
|
||
volume_accuracy=self._calculate_accuracy(points, resolution),
|
||
algorithm=AlgorithmType.PRISM.value,
|
||
resolution=resolution
|
||
).to_dict()
|
||
|
||
def _calculate_accuracy(self, points: np.ndarray, resolution: float) -> float:
|
||
"""计算精度评估"""
|
||
if points.shape[0] < 10:
|
||
return 0.5
|
||
|
||
# 基于点密度和分辨率估算精度
|
||
if points.shape[0] > 0:
|
||
# 计算点密度
|
||
bounds = points[:, :2]
|
||
area = (bounds[:, 0].max() - bounds[:, 0].min()) * \
|
||
(bounds[:, 1].max() - bounds[:, 1].min())
|
||
|
||
if area > 0:
|
||
point_density = points.shape[0] / area
|
||
|
||
if point_density > 100 and resolution <= 0.5: # 高密度,高分辨率
|
||
return 0.05
|
||
elif point_density > 10 and resolution <= 1.0:
|
||
return 0.1
|
||
elif point_density > 1:
|
||
return 0.2
|
||
|
||
return 0.3
|
||
|
||
async def validate(self, polygon_coords: List[List[float]]) -> Dict:
|
||
"""验证计算参数"""
|
||
try:
|
||
points = await self.data_source.get_points_in_polygon(polygon_coords)
|
||
|
||
validation_result = {
|
||
"polygon_valid": len(polygon_coords) >= 3,
|
||
"area": GeometryUtils.calculate_polygon_area(polygon_coords),
|
||
"points_available": points.size > 0,
|
||
"points_count": points.shape[0] if points.size > 0 else 0,
|
||
"data_quality": "good" if points.shape[0] > 100 else "poor",
|
||
"suggested_algorithm": self._suggest_algorithm(points),
|
||
"estimated_accuracy": self._calculate_accuracy(points, 1.0) if points.size > 0 else None
|
||
}
|
||
|
||
if points.size > 0:
|
||
validation_result.update({
|
||
"elevation_range": {
|
||
"min": float(points[:, 2].min()),
|
||
"max": float(points[:, 2].max()),
|
||
"avg": float(points[:, 2].mean())
|
||
},
|
||
"bounding_box": {
|
||
"min": points.min(axis=0).tolist(),
|
||
"max": points.max(axis=0).tolist()
|
||
}
|
||
})
|
||
|
||
return validation_result
|
||
|
||
except Exception as e:
|
||
logger.error(f"验证失败: {str(e)}")
|
||
return {
|
||
"polygon_valid": False,
|
||
"error": str(e)
|
||
}
|
||
|
||
def _suggest_algorithm(self, points: np.ndarray) -> str:
|
||
"""建议计算算法"""
|
||
if points.shape[0] < 100:
|
||
return AlgorithmType.TIN.value
|
||
|
||
# 计算点的规则性
|
||
bounds = points[:, :2]
|
||
x_range = bounds[:, 0].max() - bounds[:, 0].min()
|
||
y_range = bounds[:, 1].max() - bounds[:, 1].min()
|
||
|
||
# 如果点分布相对规则,建议使用格网法
|
||
if x_range > 0 and y_range > 0:
|
||
aspect_ratio = max(x_range, y_range) / min(x_range, y_range)
|
||
if aspect_ratio < 2: # 相对规则
|
||
return AlgorithmType.GRID.value
|
||
|
||
return AlgorithmType.TIN.value |