842 lines
31 KiB
Python
842 lines
31 KiB
Python
# earthwork_calculator.py
|
||
import numpy as np
|
||
from pyproj import Transformer, CRS
|
||
from scipy.spatial import Delaunay
|
||
from dataclasses import dataclass
|
||
from typing import List, Dict, Optional, Tuple, Union
|
||
import logging
|
||
from enum import Enum
|
||
from abc import ABC, abstractmethod
|
||
import math
|
||
from pyproj import Geod
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class AlgorithmType(Enum):
|
||
"""计算算法类型"""
|
||
GRID = "grid"
|
||
TIN = "tin"
|
||
PRISM = "prism"
|
||
|
||
@dataclass
|
||
class EarthworkResult3dTiles:
|
||
"""土方量计算结果"""
|
||
cut_volume: float # 挖方量 (m³)
|
||
fill_volume: float # 填方量 (m³)
|
||
net_volume: float # 净方量 (m³)
|
||
area: float # 计算区域面积 (m²)
|
||
avg_elevation: float # 平均高程
|
||
min_elevation: float # 最低高程
|
||
max_elevation: float # 最高高程
|
||
points_count: int # 使用的点数
|
||
bounding_box: Dict[str, List[float]] # 边界框
|
||
volume_accuracy: float # 计算精度
|
||
algorithm: str # 使用的算法
|
||
resolution: float # 计算分辨率
|
||
|
||
def to_dict(self) -> Dict:
|
||
"""转换为字典"""
|
||
return {
|
||
"volume": {
|
||
"cut": round(self.cut_volume, 8),
|
||
"fill": round(self.fill_volume, 8),
|
||
"net": round(self.net_volume, 8),
|
||
"unit": "m³"
|
||
},
|
||
"area": {
|
||
"value": round(self.area, 8),
|
||
"unit": "m²"
|
||
},
|
||
"elevation": {
|
||
"average": round(self.avg_elevation, 3),
|
||
"min": round(self.min_elevation, 3),
|
||
"max": round(self.max_elevation, 3),
|
||
"unit": "m"
|
||
},
|
||
"statistics": {
|
||
"points_count": self.points_count,
|
||
"accuracy": round(self.volume_accuracy, 3),
|
||
"algorithm": self.algorithm
|
||
},
|
||
"bounding_box": self.bounding_box,
|
||
"calculation_params": {
|
||
"resolution": self.resolution,
|
||
"accuracy": self.volume_accuracy
|
||
}
|
||
}
|
||
|
||
class TerrainDataSource(ABC):
|
||
"""地形数据源抽象类"""
|
||
|
||
@abstractmethod
|
||
async def get_points_in_polygon(self,
|
||
polygon_coords: List[List[float]],
|
||
z_range: Optional[Tuple[float, float]] = None) -> np.ndarray:
|
||
"""
|
||
获取多边形区域内的点云数据
|
||
|
||
Args:
|
||
polygon_coords: 多边形坐标 [[x1,y1], [x2,y2], ...]
|
||
z_range: 高程范围 (min_z, max_z)
|
||
|
||
Returns:
|
||
Nx3的numpy数组 [x, y, z]
|
||
"""
|
||
pass
|
||
|
||
@abstractmethod
|
||
async def get_data_bounds(self) -> Dict[str, List[float]]:
|
||
"""获取数据范围"""
|
||
pass
|
||
|
||
@abstractmethod
|
||
def get_crs(self) -> str:
|
||
"""获取数据坐标系"""
|
||
pass
|
||
|
||
class GeometryUtils:
|
||
"""地理空间几何计算工具类(支持经纬度坐标)"""
|
||
|
||
def __init__(self, source_crs: str = "EPSG:4326", target_crs: str = "EPSG:3857"):
|
||
"""
|
||
初始化
|
||
Args:
|
||
source_crs: 源坐标系(通常是EPSG:4326)
|
||
target_crs: 目标投影坐标系(用于平面计算)
|
||
"""
|
||
self.source_crs = source_crs
|
||
self.target_crs = target_crs
|
||
self.geod = Geod(ellps="WGS84")
|
||
|
||
# 创建坐标转换器
|
||
self.transformer_to_proj = Transformer.from_crs(
|
||
source_crs, target_crs, always_xy=True
|
||
)
|
||
self.transformer_to_geo = Transformer.from_crs(
|
||
target_crs, source_crs, always_xy=True
|
||
)
|
||
|
||
def calculate_polygon_area(self, polygon_coords: List[List[float]]) -> float:
|
||
"""
|
||
计算多边形的地面实际面积(平方米)
|
||
|
||
Args:
|
||
polygon_coords: 经纬度坐标列表 [[lon1, lat1], ...]
|
||
|
||
Returns:
|
||
面积(平方米)
|
||
"""
|
||
if len(polygon_coords) < 3:
|
||
return 0.0
|
||
|
||
# 确保多边形闭合
|
||
closed_coords = self._ensure_closed_polygon(polygon_coords)
|
||
|
||
# 提取经纬度
|
||
lons = [coord[0] for coord in closed_coords]
|
||
lats = [coord[1] for coord in closed_coords]
|
||
|
||
# 使用测地线计算面积
|
||
area, _ = self.geod.polygon_area_perimeter(lons, lats)
|
||
return abs(area)
|
||
|
||
def is_point_in_polygon(self, point: Tuple[float, float],
|
||
polygon_coords: List[List[float]],
|
||
use_spherical: bool = True) -> bool:
|
||
"""
|
||
判断点是否在多边形内(支持地球表面判断)
|
||
|
||
Args:
|
||
point: 点坐标 (lon, lat)
|
||
polygon_coords: 多边形顶点坐标
|
||
use_spherical: 是否使用球面算法
|
||
|
||
Returns:
|
||
是否在多边形内
|
||
"""
|
||
if len(polygon_coords) < 3:
|
||
return False
|
||
|
||
if use_spherical:
|
||
# 方法1:球面射线法(更准确)
|
||
return self._is_point_in_polygon_spherical(point, polygon_coords)
|
||
else:
|
||
# 方法2:投影到平面后判断(更快)
|
||
return self._is_point_in_polygon_planar(point, polygon_coords)
|
||
|
||
def calculate_triangle_area(self, points: np.ndarray) -> float:
|
||
"""
|
||
计算三角形的地面面积(平方米)
|
||
|
||
Args:
|
||
points: 3×2数组,每行是[lon, lat]
|
||
|
||
Returns:
|
||
三角形地面面积(平方米)
|
||
"""
|
||
if points.shape != (3, 2):
|
||
raise ValueError("需要3个点的坐标")
|
||
|
||
# 转换为球面坐标计算
|
||
lons = points[:, 0]
|
||
lats = points[:, 1]
|
||
|
||
# 使用球面三角形面积公式
|
||
R = 6378137.0 # WGS84地球半径(米)
|
||
|
||
# 转换为弧度
|
||
lon_rad = np.radians(lons)
|
||
lat_rad = np.radians(lats)
|
||
|
||
# 计算球面三角形的面积
|
||
# 使用L'Huilier公式
|
||
a = self._spherical_distance(lon_rad[0], lat_rad[0], lon_rad[1], lat_rad[1])
|
||
b = self._spherical_distance(lon_rad[1], lat_rad[1], lon_rad[2], lat_rad[2])
|
||
c = self._spherical_distance(lon_rad[2], lat_rad[2], lon_rad[0], lat_rad[0])
|
||
|
||
s = (a + b + c) / 2
|
||
|
||
# 防止数值误差
|
||
tan_e2 = np.tan(s/2) * np.tan((s-a)/2) * np.tan((s-b)/2) * np.tan((s-c)/2)
|
||
tan_e2 = max(tan_e2, 0) # 避免负值
|
||
|
||
if tan_e2 > 0:
|
||
E = 4 * np.arctan(np.sqrt(tan_e2))
|
||
else:
|
||
E = 0
|
||
|
||
area = R * R * E
|
||
return area
|
||
|
||
def create_grid(self, polygon_coords: List[List[float]],
|
||
resolution_m: float,
|
||
use_projection: bool = True) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
|
||
"""
|
||
创建规则格网(地面距离为单位的网格)
|
||
|
||
Args:
|
||
polygon_coords: 多边形坐标
|
||
resolution_m: 网格分辨率(米)
|
||
use_projection: 是否使用投影坐标系
|
||
|
||
Returns:
|
||
xx, yy: 网格坐标
|
||
grid_coords_geo: 网格点的地理坐标
|
||
"""
|
||
if use_projection:
|
||
# 方法1:投影到平面坐标系创建网格
|
||
return self._create_grid_projected(polygon_coords, resolution_m)
|
||
else:
|
||
# 方法2:直接在经纬度上创建近似网格(小区域可用)
|
||
return self._create_grid_geographic(polygon_coords, resolution_m)
|
||
|
||
def interpolate_grid(self, xx: np.ndarray, yy: np.ndarray,
|
||
points: np.ndarray,
|
||
method: str = 'linear',
|
||
return_geo: bool = False) -> np.ndarray:
|
||
"""
|
||
格网插值
|
||
|
||
Args:
|
||
xx, yy: 网格坐标(投影坐标系)
|
||
points: 已知点,每行是[lon, lat, elevation]或[x_proj, y_proj, elevation]
|
||
method: 插值方法 'linear' 或 'cubic'
|
||
return_geo: 是否返回地理坐标
|
||
|
||
Returns:
|
||
插值后的高程网格
|
||
"""
|
||
# 确保points是投影坐标
|
||
if points.shape[1] != 3:
|
||
raise ValueError("points应为3列: x, y, z")
|
||
|
||
# 如果输入是地理坐标,转换为投影坐标
|
||
if np.max(np.abs(points[:, 0])) > 180: # 粗略判断
|
||
# 已经是投影坐标
|
||
points_proj = points
|
||
else:
|
||
# 转换为投影坐标
|
||
x_proj, y_proj = self.transformer_to_proj.transform(
|
||
points[:, 0], points[:, 1]
|
||
)
|
||
points_proj = np.column_stack([x_proj, y_proj, points[:, 2]])
|
||
|
||
# 创建插值器
|
||
if method == 'linear':
|
||
interpolator = LinearNDInterpolator(
|
||
points_proj[:, :2],
|
||
points_proj[:, 2],
|
||
fill_value=np.nan
|
||
)
|
||
elif method == 'cubic':
|
||
interpolator = CloughTocher2DInterpolator(
|
||
points_proj[:, :2],
|
||
points_proj[:, 2],
|
||
fill_value=np.nan
|
||
)
|
||
else:
|
||
raise ValueError(f"不支持的插值方法: {method}")
|
||
|
||
# 插值
|
||
grid_points = np.column_stack([xx.ravel(), yy.ravel()])
|
||
elevations = interpolator(grid_points)
|
||
result = elevations.reshape(xx.shape)
|
||
|
||
if return_geo:
|
||
# 如果需要,将网格点转回地理坐标
|
||
lon_grid, lat_grid = self.transformer_to_geo.transform(
|
||
xx.ravel(), yy.ravel()
|
||
)
|
||
lon_grid = lon_grid.reshape(xx.shape)
|
||
lat_grid = lat_grid.reshape(xx.shape)
|
||
return result, lon_grid, lat_grid
|
||
|
||
return result
|
||
|
||
# ============ 私有方法 ============
|
||
|
||
def _ensure_closed_polygon(self, coords: List[List[float]]) -> List[List[float]]:
|
||
"""确保多边形闭合"""
|
||
if len(coords) >= 3:
|
||
# 使用 numpy 比较
|
||
if not np.array_equal(coords[0], coords[-1]):
|
||
return coords + [coords[0]]
|
||
return coords
|
||
|
||
def _spherical_distance(self, lon1_rad: float, lat1_rad: float,
|
||
lon2_rad: float, lat2_rad: float) -> float:
|
||
"""计算球面两点间角距离"""
|
||
dlon = lon2_rad - lon1_rad
|
||
dlat = lat2_rad - lat1_rad
|
||
|
||
a = np.sin(dlat/2)**2 + np.cos(lat1_rad) * np.cos(lat2_rad) * np.sin(dlon/2)**2
|
||
return 2 * np.arcsin(np.sqrt(a))
|
||
|
||
def _is_point_in_polygon_spherical(self, point: Tuple[float, float],
|
||
polygon_coords: List[List[float]]) -> bool:
|
||
"""球面射线法判断点是否在多边形内"""
|
||
lon_p, lat_p = point
|
||
closed_polygon = self._ensure_closed_polygon(polygon_coords)
|
||
|
||
# 将多边形的边转换为球面大圆弧
|
||
crossings = 0
|
||
n = len(closed_polygon) - 1
|
||
|
||
for i in range(n):
|
||
lon1, lat1 = closed_polygon[i]
|
||
lon2, lat2 = closed_polygon[i + 1]
|
||
|
||
# 检查射线是否与边相交(近似算法)
|
||
# 简化:使用平面近似,对小区域足够准确
|
||
if ((lat1 > lat_p) != (lat2 > lat_p)) and \
|
||
(lon_p < (lon2 - lon1) * (lat_p - lat1) / (lat2 - lat1) + lon1):
|
||
crossings += 1
|
||
|
||
return crossings % 2 == 1
|
||
|
||
def _is_point_in_polygon_planar(self, point: Tuple[float, float],
|
||
polygon_coords: List[List[float]]) -> bool:
|
||
"""投影到平面后判断"""
|
||
# 转换为投影坐标
|
||
point_proj = np.array(self.transformer_to_proj.transform(point[0], point[1])).reshape(1, 2)
|
||
polygon_proj = np.array([
|
||
self.transformer_to_proj.transform(lon, lat)
|
||
for lon, lat in polygon_coords
|
||
])
|
||
|
||
# 使用平面方法判断
|
||
path = Path(polygon_proj)
|
||
return path.contains_point(point_proj[0])
|
||
|
||
def _create_grid_projected(self, polygon_coords: List[List[float]],
|
||
resolution_m: float) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
|
||
"""在投影坐标系中创建网格"""
|
||
# 将多边形转换为投影坐标
|
||
polygon_proj = []
|
||
for lon, lat in polygon_coords:
|
||
x, y = self.transformer_to_proj.transform(lon, lat)
|
||
polygon_proj.append([x, y])
|
||
|
||
polygon_proj = np.array(polygon_proj)
|
||
|
||
# 计算边界框
|
||
x_min, y_min = polygon_proj.min(axis=0)
|
||
x_max, y_max = polygon_proj.max(axis=0)
|
||
|
||
# 扩展半个网格
|
||
x_min -= resolution_m / 2
|
||
x_max += resolution_m / 2
|
||
y_min -= resolution_m / 2
|
||
y_max += resolution_m / 2
|
||
|
||
# 创建网格
|
||
x_grid = np.arange(x_min, x_max + resolution_m, resolution_m)
|
||
y_grid = np.arange(y_min, y_max + resolution_m, resolution_m)
|
||
xx, yy = np.meshgrid(x_grid, y_grid)
|
||
|
||
# 将网格点转回地理坐标
|
||
grid_coords_geo = []
|
||
for x, y in zip(xx.ravel(), yy.ravel()):
|
||
lon, lat = self.transformer_to_geo.transform(x, y)
|
||
grid_coords_geo.append([lon, lat])
|
||
grid_coords_geo = np.array(grid_coords_geo).reshape(xx.shape[0], xx.shape[1], 2)
|
||
|
||
return xx, yy, grid_coords_geo
|
||
|
||
def _create_grid_geographic(self, polygon_coords: List[List[float]],
|
||
resolution_m: float) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
|
||
"""在经纬度坐标系中创建近似网格"""
|
||
# 计算中心点
|
||
lons = [coord[0] for coord in polygon_coords]
|
||
lats = [coord[1] for coord in polygon_coords]
|
||
center_lon = np.mean(lons)
|
||
center_lat = np.mean(lats)
|
||
|
||
# 计算经纬度到米的换算系数
|
||
lat_rad = np.radians(center_lat)
|
||
meters_per_degree_lon = 111319.9 * np.cos(lat_rad)
|
||
meters_per_degree_lat = 111000.0
|
||
|
||
# 计算边界框(米)
|
||
x_min_m, x_max_m, y_min_m, y_max_m = 1e9, -1e9, 1e9, -1e9
|
||
for lon, lat in polygon_coords:
|
||
x_m = (lon - center_lon) * meters_per_degree_lon
|
||
y_m = (lat - center_lat) * meters_per_degree_lat
|
||
x_min_m = min(x_min_m, x_m)
|
||
x_max_m = max(x_max_m, x_m)
|
||
y_min_m = min(y_min_m, y_m)
|
||
y_max_m = max(y_max_m, y_m)
|
||
|
||
# 扩展半个网格
|
||
x_min_m -= resolution_m / 2
|
||
x_max_m += resolution_m / 2
|
||
y_min_m -= resolution_m / 2
|
||
y_max_m += resolution_m / 2
|
||
|
||
# 创建网格(米)
|
||
x_grid_m = np.arange(x_min_m, x_max_m + resolution_m, resolution_m)
|
||
y_grid_m = np.arange(y_min_m, y_max_m + resolution_m, resolution_m)
|
||
|
||
# 转换为经纬度
|
||
x_grid_lon = center_lon + x_grid_m / meters_per_degree_lon
|
||
y_grid_lat = center_lat + y_grid_m / meters_per_degree_lat
|
||
|
||
xx, yy = np.meshgrid(x_grid_lon, y_grid_lat)
|
||
|
||
# 网格坐标(经纬度)
|
||
grid_coords_geo = np.dstack([xx, yy])
|
||
|
||
return xx, yy, grid_coords_geo
|
||
|
||
def get_polygon_bounds(self, polygon_coords: List[List[float]],
|
||
in_meters: bool = False) -> dict:
|
||
"""
|
||
获取多边形边界信息
|
||
|
||
Args:
|
||
polygon_coords: 多边形坐标
|
||
in_meters: 是否返回米为单位
|
||
|
||
Returns:
|
||
边界信息字典
|
||
"""
|
||
lons = [coord[0] for coord in polygon_coords]
|
||
lats = [coord[1] for coord in polygon_coords]
|
||
|
||
bounds = {
|
||
'min_lon': min(lons),
|
||
'max_lon': max(lons),
|
||
'min_lat': min(lats),
|
||
'max_lat': max(lats),
|
||
'center_lon': (min(lons) + max(lons)) / 2,
|
||
'center_lat': (min(lats) + max(lats)) / 2
|
||
}
|
||
|
||
if in_meters:
|
||
# 计算实际尺寸(米)
|
||
center_lat = bounds['center_lat']
|
||
lat_rad = np.radians(center_lat)
|
||
meters_per_degree_lon = 111319.9 * np.cos(lat_rad)
|
||
meters_per_degree_lat = 111000.0
|
||
|
||
bounds['width_m'] = (bounds['max_lon'] - bounds['min_lon']) * meters_per_degree_lon
|
||
bounds['height_m'] = (bounds['max_lat'] - bounds['min_lat']) * meters_per_degree_lat
|
||
bounds['area_m2'] = self.calculate_polygon_area(polygon_coords)
|
||
|
||
return bounds
|
||
|
||
class EarthworkCalculator3dTiles:
|
||
"""土方量计算器"""
|
||
|
||
def __init__(self, data_source: TerrainDataSource):
|
||
"""
|
||
初始化计算器
|
||
|
||
Args:
|
||
data_source: 地形数据源
|
||
"""
|
||
self.data_source = data_source
|
||
self.geometryUtils = GeometryUtils()
|
||
self._transformer_cache = {}
|
||
|
||
async def calculate(self,
|
||
polygon_coords: List[List[float]],
|
||
design_elevation: float,
|
||
algorithm: AlgorithmType = AlgorithmType.TIN,
|
||
resolution: float = 1.0,
|
||
target_crs: str = "EPSG:4326",
|
||
interpolation_method: str = 'linear') -> EarthworkResult3dTiles:
|
||
"""
|
||
计算土方量
|
||
|
||
Args:
|
||
polygon_coords: 多边形坐标
|
||
design_elevation: 设计高程
|
||
algorithm: 计算算法
|
||
resolution: 格网分辨率(米)
|
||
target_crs: 目标坐标系
|
||
interpolation_method: 插值方法
|
||
|
||
Returns:
|
||
EarthworkResult: 计算结果
|
||
"""
|
||
try:
|
||
# 1. 获取数据
|
||
points = await self.data_source.get_points_in_polygon(polygon_coords)
|
||
|
||
if points.size == 0:
|
||
raise ValueError("区域内没有找到顶点数据")
|
||
|
||
# 2. 坐标转换
|
||
points = await self._transform_coordinates(points, target_crs)
|
||
|
||
# 3. 执行计算
|
||
if algorithm == AlgorithmType.GRID:
|
||
result = await self._calculate_grid(points, polygon_coords,
|
||
design_elevation, resolution,
|
||
interpolation_method)
|
||
elif algorithm == AlgorithmType.TIN:
|
||
result = await self._calculate_tin(points, polygon_coords,
|
||
design_elevation)
|
||
elif algorithm == AlgorithmType.PRISM:
|
||
result = await self._calculate_prism(points, polygon_coords,
|
||
design_elevation, resolution)
|
||
else:
|
||
raise ValueError(f"不支持的算法: {algorithm}")
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
logger.error(f"土方量计算失败: {str(e)}")
|
||
raise
|
||
|
||
async def _transform_coordinates(self, points: np.ndarray,
|
||
target_crs: str) -> np.ndarray:
|
||
"""坐标转换"""
|
||
source_crs = self.data_source.get_crs()
|
||
|
||
if source_crs == target_crs:
|
||
return points
|
||
|
||
cache_key = f"{source_crs}->{target_crs}"
|
||
if cache_key not in self._transformer_cache:
|
||
self._transformer_cache[cache_key] = Transformer.from_crs(
|
||
CRS.from_string(source_crs),
|
||
CRS.from_string(target_crs),
|
||
always_xy=True
|
||
)
|
||
|
||
transformer = self._transformer_cache[cache_key]
|
||
points_2d = transformer.transform(points[:, 0], points[:, 1])
|
||
|
||
return np.column_stack([points_2d[0], points_2d[1], points[:, 2]])
|
||
|
||
async def _calculate_grid(self, points: np.ndarray,
|
||
polygon_coords: List[List[float]],
|
||
design_elevation: float,
|
||
resolution: float,
|
||
interpolation_method: str) -> EarthworkResult3dTiles:
|
||
"""格网法计算"""
|
||
polygon_np = np.array(polygon_coords)
|
||
|
||
# 创建格网
|
||
xx, yy, x_grid, y_grid = self.geometryUtils.create_grid(polygon_np, resolution)
|
||
|
||
# 插值
|
||
natural_elevations = self.geometryUtils.interpolate_grid(xx, yy, points, interpolation_method)
|
||
|
||
# 初始化挖填量
|
||
cut_volume = 0.0
|
||
fill_volume = 0.0
|
||
total_area = 0.0
|
||
|
||
# 遍历每个格网单元
|
||
for i in range(len(x_grid) - 1):
|
||
for j in range(len(y_grid) - 1):
|
||
# 格网四个角点
|
||
cell_corners = np.array([
|
||
[x_grid[i], y_grid[j]],
|
||
[x_grid[i+1], y_grid[j]],
|
||
[x_grid[i+1], y_grid[j+1]],
|
||
[x_grid[i], y_grid[j+1]]
|
||
])
|
||
|
||
# 检查格网中心点是否在多边形内
|
||
cell_center = cell_corners.mean(axis=0)
|
||
if not self.geometryUtils.is_point_in_polygon(cell_center, polygon_np):
|
||
continue
|
||
|
||
# 获取格网四个角点的高程
|
||
cell_elevations = [
|
||
natural_elevations[j, i],
|
||
natural_elevations[j, i+1],
|
||
natural_elevations[j+1, i+1],
|
||
natural_elevations[j+1, i]
|
||
]
|
||
|
||
# 检查是否有无效数据
|
||
if any(np.isnan(elev) for elev in cell_elevations):
|
||
continue
|
||
|
||
# 计算格网平均高程
|
||
avg_elevation = np.mean(cell_elevations)
|
||
cell_area = resolution * resolution
|
||
total_area += cell_area
|
||
|
||
# 计算挖填量
|
||
height_diff = design_elevation - avg_elevation
|
||
if height_diff > 0:
|
||
fill_volume += height_diff * cell_area
|
||
else:
|
||
cut_volume += abs(height_diff) * cell_area
|
||
|
||
# 计算统计信息
|
||
area = self.geometryUtils.calculate_polygon_area(polygon_coords)
|
||
mask = ~np.isnan(natural_elevations)
|
||
valid_elevations = natural_elevations[mask]
|
||
|
||
return EarthworkResult3dTiles(
|
||
cut_volume=cut_volume,
|
||
fill_volume=fill_volume,
|
||
net_volume=cut_volume - fill_volume,
|
||
area=area,
|
||
avg_elevation=np.mean(valid_elevations) if valid_elevations.size > 0 else 0,
|
||
min_elevation=np.min(valid_elevations) if valid_elevations.size > 0 else 0,
|
||
max_elevation=np.max(valid_elevations) if valid_elevations.size > 0 else 0,
|
||
points_count=points.shape[0],
|
||
bounding_box={
|
||
"min": [x_grid[0], y_grid[0]],
|
||
"max": [x_grid[-1], y_grid[-1]]
|
||
},
|
||
volume_accuracy=self._calculate_accuracy(points, resolution),
|
||
algorithm=AlgorithmType.GRID.value,
|
||
resolution=resolution
|
||
).to_dict()
|
||
|
||
async def _calculate_tin(self, points: np.ndarray,
|
||
polygon_coords: List[List[float]],
|
||
design_elevation: float) -> EarthworkResult3dTiles:
|
||
"""三角网法计算"""
|
||
polygon_np = np.array(polygon_coords)
|
||
|
||
# 创建Delaunay三角网
|
||
triangulation = Delaunay(points[:, :2])
|
||
|
||
# 筛选多边形内的三角形
|
||
cut_volume = 0.0
|
||
fill_volume = 0.0
|
||
total_area = 0.0
|
||
|
||
for simplex in triangulation.simplices:
|
||
triangle_points = points[simplex]
|
||
triangle_center = triangle_points.mean(axis=0)[:2]
|
||
|
||
# 检查三角形中心是否在多边形内
|
||
if not self.geometryUtils.is_point_in_polygon(triangle_center, polygon_np):
|
||
continue
|
||
|
||
# 计算三角形面积
|
||
area = self.geometryUtils.calculate_triangle_area(triangle_points[:, :2])
|
||
if math.isnan(area) :
|
||
continue
|
||
total_area += area
|
||
|
||
# 计算平均高程(使用三个顶点的高程)
|
||
avg_elevation = triangle_points[:, 2].mean()
|
||
|
||
# 计算挖填量
|
||
height_diff = design_elevation - avg_elevation
|
||
if height_diff > 0:
|
||
fill_volume += height_diff * area
|
||
else:
|
||
cut_volume += abs(height_diff) * area
|
||
# if math.isnan(cut_volume) :
|
||
# print("cut_volume变为nan")
|
||
# if math.isnan(fill_volume) :
|
||
# print("fill_volume变为nan")
|
||
|
||
# 计算统计信息
|
||
area = self.geometryUtils.calculate_polygon_area(polygon_coords)
|
||
|
||
return EarthworkResult3dTiles(
|
||
cut_volume=cut_volume,
|
||
fill_volume=fill_volume,
|
||
net_volume=cut_volume - fill_volume,
|
||
area=area,
|
||
avg_elevation=points[:, 2].mean(),
|
||
min_elevation=points[:, 2].min(),
|
||
max_elevation=points[:, 2].max(),
|
||
points_count=points.shape[0],
|
||
bounding_box={
|
||
"min": points.min(axis=0)[:2].tolist(),
|
||
"max": points.max(axis=0)[:2].tolist()
|
||
},
|
||
volume_accuracy=self._calculate_accuracy(points, 0),
|
||
algorithm=AlgorithmType.TIN.value,
|
||
resolution=0
|
||
).to_dict()
|
||
|
||
async def _calculate_prism(self, points: np.ndarray,
|
||
polygon_coords: List[List[float]],
|
||
design_elevation: float,
|
||
resolution: float) -> EarthworkResult3dTiles:
|
||
"""三棱柱法计算"""
|
||
# 先创建TIN
|
||
polygon_np = np.array(polygon_coords)
|
||
triangulation = Delaunay(points[:, :2])
|
||
|
||
cut_volume = 0.0
|
||
fill_volume = 0.0
|
||
total_area = 0.0
|
||
|
||
for simplex in triangulation.simplices:
|
||
triangle_points = points[simplex]
|
||
triangle_center = triangle_points.mean(axis=0)[:2]
|
||
|
||
if not self.geometryUtils.is_point_in_polygon(triangle_center, polygon_np):
|
||
continue
|
||
|
||
# 计算三角形面积
|
||
area = self.geometryUtils.calculate_triangle_area(triangle_points[:, :2])
|
||
total_area += area
|
||
|
||
# 对于每个三角形,计算三棱柱体积
|
||
# 这里简化处理,使用梯形公式
|
||
for i in range(3):
|
||
j = (i + 1) % 3
|
||
# 计算边的长度
|
||
edge_length = np.linalg.norm(triangle_points[i, :2] - triangle_points[j, :2])
|
||
|
||
# 计算边两端点的挖填高度
|
||
height_i = design_elevation - triangle_points[i, 2]
|
||
height_j = design_elevation - triangle_points[j, 2]
|
||
|
||
# 计算边的平均挖填高度
|
||
avg_height = (abs(height_i) + abs(height_j)) / 2
|
||
|
||
# 计算边的面积(假设边宽度为resolution)
|
||
edge_area = edge_length * resolution
|
||
|
||
if height_i > 0 or height_j > 0:
|
||
fill_volume += avg_height * edge_area
|
||
else:
|
||
cut_volume += avg_height * edge_area
|
||
|
||
area = self.geometryUtils.calculate_polygon_area(polygon_coords)
|
||
|
||
return EarthworkResult3dTiles(
|
||
cut_volume=cut_volume,
|
||
fill_volume=fill_volume,
|
||
net_volume=cut_volume - fill_volume,
|
||
area=area,
|
||
avg_elevation=points[:, 2].mean(),
|
||
min_elevation=points[:, 2].min(),
|
||
max_elevation=points[:, 2].max(),
|
||
points_count=points.shape[0],
|
||
bounding_box={
|
||
"min": points.min(axis=0)[:2].tolist(),
|
||
"max": points.max(axis=0)[:2].tolist()
|
||
},
|
||
volume_accuracy=self._calculate_accuracy(points, resolution),
|
||
algorithm=AlgorithmType.PRISM.value,
|
||
resolution=resolution
|
||
).to_dict()
|
||
|
||
def _calculate_accuracy(self, points: np.ndarray, resolution: float) -> float:
|
||
"""计算精度评估"""
|
||
if points.shape[0] < 10:
|
||
return 0.5
|
||
|
||
# 基于点密度和分辨率估算精度
|
||
if points.shape[0] > 0:
|
||
# 计算点密度
|
||
bounds = points[:, :2]
|
||
area = (bounds[:, 0].max() - bounds[:, 0].min()) * \
|
||
(bounds[:, 1].max() - bounds[:, 1].min())
|
||
|
||
if area > 0:
|
||
point_density = points.shape[0] / area
|
||
|
||
if point_density > 100 and resolution <= 0.5: # 高密度,高分辨率
|
||
return 0.05
|
||
elif point_density > 10 and resolution <= 1.0:
|
||
return 0.1
|
||
elif point_density > 1:
|
||
return 0.2
|
||
|
||
return 0.3
|
||
|
||
async def validate(self, polygon_coords: List[List[float]]) -> Dict:
|
||
"""验证计算参数"""
|
||
try:
|
||
points = await self.data_source.get_points_in_polygon(polygon_coords)
|
||
|
||
validation_result = {
|
||
"polygon_valid": len(polygon_coords) >= 3,
|
||
"area": self.geometryUtils.calculate_polygon_area(polygon_coords),
|
||
"points_available": points.size > 0,
|
||
"points_count": points.shape[0] if points.size > 0 else 0,
|
||
"data_quality": "good" if points.shape[0] > 100 else "poor",
|
||
"suggested_algorithm": self._suggest_algorithm(points),
|
||
"estimated_accuracy": self._calculate_accuracy(points, 1.0) if points.size > 0 else None
|
||
}
|
||
|
||
if points.size > 0:
|
||
validation_result.update({
|
||
"elevation_range": {
|
||
"min": float(points[:, 2].min()),
|
||
"max": float(points[:, 2].max()),
|
||
"avg": float(points[:, 2].mean())
|
||
},
|
||
"bounding_box": {
|
||
"min": points.min(axis=0).tolist(),
|
||
"max": points.max(axis=0).tolist()
|
||
}
|
||
})
|
||
|
||
return validation_result
|
||
|
||
except Exception as e:
|
||
logger.error(f"验证失败: {str(e)}")
|
||
return {
|
||
"polygon_valid": False,
|
||
"error": str(e)
|
||
}
|
||
|
||
def _suggest_algorithm(self, points: np.ndarray) -> str:
|
||
"""建议计算算法"""
|
||
if points.shape[0] < 100:
|
||
return AlgorithmType.TIN.value
|
||
|
||
# 计算点的规则性
|
||
bounds = points[:, :2]
|
||
x_range = bounds[:, 0].max() - bounds[:, 0].min()
|
||
y_range = bounds[:, 1].max() - bounds[:, 1].min()
|
||
|
||
# 如果点分布相对规则,建议使用格网法
|
||
if x_range > 0 and y_range > 0:
|
||
aspect_ratio = max(x_range, y_range) / min(x_range, y_range)
|
||
if aspect_ratio < 2: # 相对规则
|
||
return AlgorithmType.GRID.value
|
||
|
||
return AlgorithmType.TIN.value |