ai_project_v1/b3dm/earthwork_calculator_3d_tiles.py

1647 lines
64 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# earthwork_calculator.py
import numpy as np
from pyproj import Transformer, CRS
from scipy.spatial import Delaunay
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple, Union, Any
import logging
from enum import Enum
from abc import ABC, abstractmethod
import math
from pyproj import Geod
from scipy.interpolate import LinearNDInterpolator, CloughTocher2DInterpolator
from matplotlib.path import Path
logger = logging.getLogger(__name__)
class AlgorithmType(Enum):
"""计算算法类型"""
GRID = "grid"
TIN = "tin"
PRISM = "prism"
@dataclass
class GridCellData:
"""单个网格单元的数据"""
i: int # x方向索引
j: int # y方向索引
x_min: float # 网格最小x坐标
x_max: float # 网格最大x坐标
y_min: float # 网格最小y坐标
y_max: float # 网格最大y坐标
cut_volume: float # 挖方量
fill_volume: float # 填方量
net_volume: float # 净方量
avg_elevation: float # 平均自然高程
design_elevation: float # 设计高程
height_diff: float # 设计高程与自然高程差
area: float # 网格面积
is_valid: bool # 是否有效
corner_elevations: List[float] = field(default_factory=list) # 四个角点高程
def to_dict(self) -> Dict[str, Any]:
"""转换为字典格式"""
return {
'i': self.i,
'j': self.j,
'x_min': self.x_min,
'x_max': self.x_max,
'y_min': self.y_min,
'y_max': self.y_max,
'cut_volume': self.cut_volume,
'fill_volume': self.fill_volume,
'net_volume': self.net_volume,
'avg_elevation': self.avg_elevation,
'design_elevation': self.design_elevation,
'height_diff': self.height_diff,
'area': self.area,
'is_valid': self.is_valid,
'corner_elevations': self.corner_elevations
}
@dataclass
class TriangleData:
"""单个三角形单元的数据"""
triangle_id: int # 三角形ID
vertices: List[Tuple[float, float, float]] # 三个顶点坐标(x,y,z)
vertex_indices: List[int] # 顶点索引
cut_volume: float # 挖方量
fill_volume: float # 填方量
net_volume: float # 净方量
avg_elevation: float # 平均自然高程
design_elevation: float # 设计高程
height_diff: float # 设计高程与自然高程差
area: float # 三角形面积
is_valid: bool # 是否有效
center_point: Tuple[float, float] # 三角形中心点
def to_dict(self) -> Dict[str, Any]:
"""转换为字典格式"""
return {
'triangle_id': self.triangle_id,
'vertices': self.vertices,
'vertex_indices': self.vertex_indices,
'cut_volume': self.cut_volume,
'fill_volume': self.fill_volume,
'net_volume': self.net_volume,
'avg_elevation': self.avg_elevation,
'design_elevation': self.design_elevation,
'height_diff': self.height_diff,
'area': self.area,
'is_valid': self.is_valid,
'center_point': self.center_point
}
@dataclass
class PrismData:
"""单个三棱柱单元的数据"""
triangle_id: int # 三角形ID
vertices: List[Tuple[float, float, float]] # 三个顶点坐标(x,y,z)
vertex_indices: List[int] # 顶点索引
cut_volume: float # 挖方量
fill_volume: float # 填方量
net_volume: float # 净方量
avg_elevation: float # 平均自然高程
design_elevation: float # 设计高程
height_diffs: List[float] # 三个顶点的高程差
area: float # 三角形面积
is_valid: bool # 是否有效
center_point: Tuple[float, float] # 三角形中心点
edge_volumes: List[float] = field(default_factory=list) # 三个边的体积贡献
def to_dict(self) -> Dict[str, Any]:
"""转换为字典格式"""
return {
'triangle_id': self.triangle_id,
'vertices': self.vertices,
'vertex_indices': self.vertex_indices,
'cut_volume': self.cut_volume,
'fill_volume': self.fill_volume,
'net_volume': self.net_volume,
'avg_elevation': self.avg_elevation,
'design_elevation': self.design_elevation,
'height_diffs': self.height_diffs,
'area': self.area,
'is_valid': self.is_valid,
'center_point': self.center_point,
'edge_volumes': self.edge_volumes
}
@dataclass
class GridCellDifference:
"""网格单元差异数据"""
i: int # x方向索引
j: int # y方向索引
x_min: float # 网格最小x坐标
x_max: float # 网格最大x坐标
y_min: float # 网格最小y坐标
y_max: float # 网格最大y坐标
cut_volume: float # 挖方量差值 (b-a)
fill_volume: float # 填方量差值 (b-a)
net_volume: float # 净方量差值 (b-a)
avg_elevation: float # 平均自然高程差值 (b-a)
height_diff: float # 高程差变化 (b-a)
is_valid: bool # 是否有效(两个网格都有效)
def to_dict(self) -> Dict[str, Any]:
"""转换为字典格式"""
return {
'i': self.i,
'j': self.j,
'x_min': self.x_min,
'x_max': self.x_max,
'y_min': self.y_min,
'y_max': self.y_max,
'cut_volume': self.cut_volume,
'fill_volume': self.fill_volume,
'net_volume': self.net_volume,
'avg_elevation': self.avg_elevation,
'height_diff': self.height_diff,
'is_valid': self.is_valid
}
@dataclass
class EarthworkComparisonResult:
"""土方量比较结果"""
# 总体差异统计
total_cut_volume: float # 总挖方量差值
total_fill_volume: float # 总填方量差值
total_net_volume: float # 总净方量差值
# 网格单元差异数据
grid_differences: List[Dict[str, Any]]
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
"total_differences": {
"cut_volume": round(self.total_cut_volume, 8),
"fill_volume": round(self.total_fill_volume, 8),
"net_volume": round(self.total_net_volume, 8),
},
"grid_differences": self.grid_differences
}
@dataclass
class EarthworkResult3dTiles:
"""土方量计算结果"""
cut_volume: float # 挖方量 (m³)
fill_volume: float # 填方量 (m³)
net_volume: float # 净方量 (m³)
area: float # 计算区域面积 (m²)
avg_elevation: float # 平均高程
min_elevation: float # 最低高程
max_elevation: float # 最高高程
points_count: int # 使用的点数
bounding_box: Dict[str, List[float]] # 边界框
volume_accuracy: float # 计算精度
algorithm: str # 使用的算法
resolution: float # 计算分辨率
# 新增属性 - 计算单元详细数据
calculation_details: List[Dict[str, Any]] = field(default_factory=list) # 所有计算单元的详细数据
details_type: str = "" # 计算单元类型: "grid_cells", "triangles", "prisms"
details_dimensions: Dict[str, Any] = field(default_factory=dict) # 计算单元维度信息
valid_unit_count: int = 0 # 有效计算单元数量
total_unit_count: int = 0 # 总计算单元数量
# 新增属性 - 统计信息
elevation_statistics: Dict[str, float] = field(default_factory=dict) # 高程统计信息
volume_distribution: Dict[str, Any] = field(default_factory=dict) # 土方量分布信息
def to_dict(self) -> Dict:
"""转换为字典"""
result = {
"volume": {
"cut": round(self.cut_volume, 8),
"fill": round(self.fill_volume, 8),
"net": round(self.net_volume, 8),
"unit": ""
},
"area": {
"value": round(self.area, 8),
"unit": ""
},
"elevation": {
"average": round(self.avg_elevation, 3),
"min": round(self.min_elevation, 3),
"max": round(self.max_elevation, 3),
"unit": "m"
},
"statistics": {
"points_count": self.points_count,
"accuracy": round(self.volume_accuracy, 3),
"algorithm": self.algorithm
},
"bounding_box": self.bounding_box,
"calculation_params": {
"resolution": self.resolution,
"accuracy": self.volume_accuracy
}
}
# 添加新增的属性
if self.calculation_details:
result["calculation_details"] = {
"type": self.details_type,
"dimensions": self.details_dimensions,
"valid_unit_count": self.valid_unit_count,
"total_unit_count": self.total_unit_count,
"data": self.calculation_details
}
if self.elevation_statistics:
result["elevation_statistics"] = self.elevation_statistics
if self.volume_distribution:
result["volume_distribution"] = self.volume_distribution
return result
class TerrainDataSource(ABC):
"""地形数据源抽象类"""
@abstractmethod
async def get_points_in_polygon(self,
polygon_coords: List[List[float]],
z_range: Optional[Tuple[float, float]] = None) -> np.ndarray:
"""
获取多边形区域内的点云数据
Args:
polygon_coords: 多边形坐标 [[x1,y1], [x2,y2], ...]
z_range: 高程范围 (min_z, max_z)
Returns:
Nx3的numpy数组 [x, y, z]
"""
pass
@abstractmethod
async def get_data_bounds(self) -> Dict[str, List[float]]:
"""获取数据范围"""
pass
@abstractmethod
def get_crs(self) -> str:
"""获取数据坐标系"""
pass
class GeometryUtils:
"""地理空间几何计算工具类(支持经纬度坐标)"""
def __init__(self, source_crs: str = "EPSG:4326", target_crs: str = "EPSG:3857"):
"""
初始化
Args:
source_crs: 源坐标系通常是EPSG:4326
target_crs: 目标投影坐标系(用于平面计算)
"""
self.source_crs = source_crs
self.target_crs = target_crs
self.geod = Geod(ellps="WGS84")
# 创建坐标转换器
self.transformer_to_proj = Transformer.from_crs(
source_crs, target_crs, always_xy=True
)
self.transformer_to_geo = Transformer.from_crs(
target_crs, source_crs, always_xy=True
)
def calculate_polygon_area(self, polygon_coords: List[List[float]]) -> float:
"""
计算多边形的地面实际面积(平方米)
Args:
polygon_coords: 经纬度坐标列表 [[lon1, lat1], ...]
Returns:
面积(平方米)
"""
if len(polygon_coords) < 3:
return 0.0
# 确保多边形闭合
closed_coords = self._ensure_closed_polygon(polygon_coords)
# 提取经纬度
lons = [coord[0] for coord in closed_coords]
lats = [coord[1] for coord in closed_coords]
# 使用测地线计算面积
area, _ = self.geod.polygon_area_perimeter(lons, lats)
return abs(area)
def is_point_in_polygon(self, point: Tuple[float, float],
polygon_coords: List[List[float]],
use_spherical: bool = True) -> bool:
"""
判断点是否在多边形内(支持地球表面判断)
Args:
point: 点坐标 (lon, lat)
polygon_coords: 多边形顶点坐标
use_spherical: 是否使用球面算法
Returns:
是否在多边形内
"""
if len(polygon_coords) < 3:
return False
if use_spherical:
# 方法1球面射线法更准确
return self._is_point_in_polygon_spherical(point, polygon_coords)
else:
# 方法2投影到平面后判断更快
return self._is_point_in_polygon_planar(point, polygon_coords)
def calculate_triangle_area(self, points: np.ndarray) -> float:
"""
计算三角形的地面面积(平方米)
Args:
points: 3×2数组每行是[lon, lat]
Returns:
三角形地面面积(平方米)
"""
if points.shape != (3, 2):
raise ValueError("需要3个点的坐标")
# 转换为球面坐标计算
lons = points[:, 0]
lats = points[:, 1]
# 使用球面三角形面积公式
R = 6378137.0 # WGS84地球半径
# 转换为弧度
lon_rad = np.radians(lons)
lat_rad = np.radians(lats)
# 计算球面三角形的面积
# 使用L'Huilier公式
a = self._spherical_distance(lon_rad[0], lat_rad[0], lon_rad[1], lat_rad[1])
b = self._spherical_distance(lon_rad[1], lat_rad[1], lon_rad[2], lat_rad[2])
c = self._spherical_distance(lon_rad[2], lat_rad[2], lon_rad[0], lat_rad[0])
s = (a + b + c) / 2
# 防止数值误差
tan_e2 = np.tan(s/2) * np.tan((s-a)/2) * np.tan((s-b)/2) * np.tan((s-c)/2)
tan_e2 = max(tan_e2, 0) # 避免负值
if tan_e2 > 0:
E = 4 * np.arctan(np.sqrt(tan_e2))
else:
E = 0
area = R * R * E
return area
def create_grid(self, polygon_coords: List[List[float]],
resolution_m: float,
use_projection: bool = True) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
创建规则格网(地面距离为单位的网格)
Args:
polygon_coords: 多边形坐标
resolution_m: 网格分辨率(米)
use_projection: 是否使用投影坐标系
Returns:
xx, yy: 网格坐标
grid_coords_geo: 网格点的地理坐标
"""
if use_projection:
# 方法1投影到平面坐标系创建网格
return self._create_grid_projected(polygon_coords, resolution_m)
else:
# 方法2直接在经纬度上创建近似网格小区域可用
return self._create_grid_geographic(polygon_coords, resolution_m)
def interpolate_grid(self, xx: np.ndarray, yy: np.ndarray,
points: np.ndarray,
method: str = 'linear',
return_geo: bool = False) -> np.ndarray:
"""
格网插值
Args:
xx, yy: 网格坐标(投影坐标系)
points: 已知点,每行是[lon, lat, elevation]或[x_proj, y_proj, elevation]
method: 插值方法 'linear''cubic'
return_geo: 是否返回地理坐标
Returns:
插值后的高程网格
"""
# 确保points是投影坐标
if points.shape[1] != 3:
raise ValueError("points应为3列: x, y, z")
# 如果输入是地理坐标,转换为投影坐标
if np.max(np.abs(points[:, 0])) > 180: # 粗略判断
# 已经是投影坐标
points_proj = points
else:
# 转换为投影坐标
x_proj, y_proj = self.transformer_to_proj.transform(
points[:, 0], points[:, 1]
)
points_proj = np.column_stack([x_proj, y_proj, points[:, 2]])
# 创建插值器
if method == 'linear':
interpolator = LinearNDInterpolator(
points_proj[:, :2],
points_proj[:, 2],
fill_value=np.nan
)
elif method == 'cubic':
interpolator = CloughTocher2DInterpolator(
points_proj[:, :2],
points_proj[:, 2],
fill_value=np.nan
)
else:
raise ValueError(f"不支持的插值方法: {method}")
# 插值
grid_points = np.column_stack([xx.ravel(), yy.ravel()])
elevations = interpolator(grid_points)
result = elevations.reshape(xx.shape)
if return_geo:
# 如果需要,将网格点转回地理坐标
lon_grid, lat_grid = self.transformer_to_geo.transform(
xx.ravel(), yy.ravel()
)
lon_grid = lon_grid.reshape(xx.shape)
lat_grid = lat_grid.reshape(xx.shape)
return result, lon_grid, lat_grid
return result
# ============ 私有方法 ============
def _ensure_closed_polygon(self, coords: List[List[float]]) -> List[List[float]]:
"""确保多边形闭合"""
if len(coords) >= 3:
# 使用 numpy 比较
if not np.array_equal(coords[0], coords[-1]):
return coords + [coords[0]]
return coords
def _spherical_distance(self, lon1_rad: float, lat1_rad: float,
lon2_rad: float, lat2_rad: float) -> float:
"""计算球面两点间角距离"""
dlon = lon2_rad - lon1_rad
dlat = lat2_rad - lat1_rad
a = np.sin(dlat/2)**2 + np.cos(lat1_rad) * np.cos(lat2_rad) * np.sin(dlon/2)**2
return 2 * np.arcsin(np.sqrt(a))
def _is_point_in_polygon_spherical(self, point: Tuple[float, float],
polygon_coords: List[List[float]]) -> bool:
"""球面射线法判断点是否在多边形内"""
lon_p, lat_p = point
closed_polygon = self._ensure_closed_polygon(polygon_coords)
# 将多边形的边转换为球面大圆弧
crossings = 0
n = len(closed_polygon) - 1
for i in range(n):
lon1, lat1 = closed_polygon[i]
lon2, lat2 = closed_polygon[i + 1]
# 检查射线是否与边相交(近似算法)
# 简化:使用平面近似,对小区域足够准确
if ((lat1 > lat_p) != (lat2 > lat_p)) and \
(lon_p < (lon2 - lon1) * (lat_p - lat1) / (lat2 - lat1) + lon1):
crossings += 1
return crossings % 2 == 1
def _is_point_in_polygon_planar(self, point: Tuple[float, float],
polygon_coords: List[List[float]]) -> bool:
"""投影到平面后判断"""
# 转换为投影坐标
point_proj = np.array(self.transformer_to_proj.transform(point[0], point[1])).reshape(1, 2)
polygon_proj = np.array([
self.transformer_to_proj.transform(lon, lat)
for lon, lat in polygon_coords
])
# 使用平面方法判断
path = Path(polygon_proj)
return path.contains_point(point_proj[0])
def _create_grid_projected(self, polygon_coords: List[List[float]],
resolution_m: float) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""在投影坐标系中创建网格"""
# 将多边形转换为投影坐标
polygon_proj = []
for lon, lat in polygon_coords:
x, y = self.transformer_to_proj.transform(lon, lat)
polygon_proj.append([x, y])
polygon_proj = np.array(polygon_proj)
# 计算边界框
x_min, y_min = polygon_proj.min(axis=0)
x_max, y_max = polygon_proj.max(axis=0)
# 扩展半个网格
x_min -= resolution_m / 2
x_max += resolution_m / 2
y_min -= resolution_m / 2
y_max += resolution_m / 2
# 创建网格
x_grid = np.arange(x_min, x_max + resolution_m, resolution_m)
y_grid = np.arange(y_min, y_max + resolution_m, resolution_m)
xx, yy = np.meshgrid(x_grid, y_grid)
# 将网格点转回地理坐标
grid_coords_geo = []
for x, y in zip(xx.ravel(), yy.ravel()):
lon, lat = self.transformer_to_geo.transform(x, y)
grid_coords_geo.append([lon, lat])
grid_coords_geo = np.array(grid_coords_geo).reshape(xx.shape[0], xx.shape[1], 2)
return xx, yy, grid_coords_geo
def _create_grid_geographic(self, polygon_coords: List[List[float]],
resolution_m: float) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""在经纬度坐标系中创建近似网格"""
# 计算中心点
lons = [coord[0] for coord in polygon_coords]
lats = [coord[1] for coord in polygon_coords]
center_lon = np.mean(lons)
center_lat = np.mean(lats)
# 计算经纬度到米的换算系数
lat_rad = np.radians(center_lat)
meters_per_degree_lon = 111319.9 * np.cos(lat_rad)
meters_per_degree_lat = 111000.0
# 计算边界框(米)
x_min_m, x_max_m, y_min_m, y_max_m = 1e9, -1e9, 1e9, -1e9
for lon, lat in polygon_coords:
x_m = (lon - center_lon) * meters_per_degree_lon
y_m = (lat - center_lat) * meters_per_degree_lat
x_min_m = min(x_min_m, x_m)
x_max_m = max(x_max_m, x_m)
y_min_m = min(y_min_m, y_m)
y_max_m = max(y_max_m, y_m)
# 扩展半个网格
x_min_m -= resolution_m / 2
x_max_m += resolution_m / 2
y_min_m -= resolution_m / 2
y_max_m += resolution_m / 2
# 创建网格(米)
x_grid_m = np.arange(x_min_m, x_max_m + resolution_m, resolution_m)
y_grid_m = np.arange(y_min_m, y_max_m + resolution_m, resolution_m)
# 转换为经纬度
x_grid_lon = center_lon + x_grid_m / meters_per_degree_lon
y_grid_lat = center_lat + y_grid_m / meters_per_degree_lat
xx, yy = np.meshgrid(x_grid_lon, y_grid_lat)
# 网格坐标(经纬度)
grid_coords_geo = np.dstack([xx, yy])
return xx, yy, grid_coords_geo
def get_polygon_bounds(self, polygon_coords: List[List[float]],
in_meters: bool = False) -> dict:
"""
获取多边形边界信息
Args:
polygon_coords: 多边形坐标
in_meters: 是否返回米为单位
Returns:
边界信息字典
"""
lons = [coord[0] for coord in polygon_coords]
lats = [coord[1] for coord in polygon_coords]
bounds = {
'min_lon': min(lons),
'max_lon': max(lons),
'min_lat': min(lats),
'max_lat': max(lats),
'center_lon': (min(lons) + max(lons)) / 2,
'center_lat': (min(lats) + max(lats)) / 2
}
if in_meters:
# 计算实际尺寸(米)
center_lat = bounds['center_lat']
lat_rad = np.radians(center_lat)
meters_per_degree_lon = 111319.9 * np.cos(lat_rad)
meters_per_degree_lat = 111000.0
bounds['width_m'] = (bounds['max_lon'] - bounds['min_lon']) * meters_per_degree_lon
bounds['height_m'] = (bounds['max_lat'] - bounds['min_lat']) * meters_per_degree_lat
bounds['area_m2'] = self.calculate_polygon_area(polygon_coords)
return bounds
class EarthworkCalculator3dTiles:
"""土方量计算器"""
def __init__(self, data_source: TerrainDataSource):
"""
初始化计算器
Args:
data_source: 地形数据源
"""
self.data_source = data_source
self.geometryUtils = GeometryUtils()
self._transformer_cache = {}
def compare_grids_by_coordinate(self,
grids_a: List[Dict[str, Any]],
grids_b: List[Dict[str, Any]]
) -> EarthworkComparisonResult:
"""按坐标匹配比较网格数据"""
# 使用字典按坐标快速查找网格A
grids_a_by_coord = {}
for cell in grids_a:
key = (round(cell.get('x_min', 0), 6), round(cell.get('y_min', 0), 6))
grids_a_by_coord[key] = cell
grid_differences_list = []
total_cut_volume = 0.0
total_fill_volume = 0.0
total_net_volume = 0.0
# 收集有效网格的差异数据用于统计
valid_cut_diffs = []
valid_fill_diffs = []
valid_net_diffs = []
valid_elevation_diffs = []
matched_count = 0
unmatched_count = 0
# 遍历网格B查找匹配的网格A
for cell_b in grids_b:
key = (round(cell_b.get('x_min', 0), 6), round(cell_b.get('y_min', 0), 6))
if key in grids_a_by_coord:
cell_a = grids_a_by_coord[key]
matched_count += 1
# 计算各项差值
cut_volume_diff = cell_b.get('cut_volume', 0) - cell_a.get('cut_volume', 0)
fill_volume_diff = cell_b.get('fill_volume', 0) - cell_a.get('fill_volume', 0)
net_volume_diff = cell_b.get('net_volume', 0) - cell_a.get('net_volume', 0)
# 计算平均高程差值
avg_elevation_a = cell_a.get('avg_elevation', np.nan)
avg_elevation_b = cell_b.get('avg_elevation', np.nan)
avg_elevation_diff = 0.0
if not np.isnan(avg_elevation_a) and not np.isnan(avg_elevation_b):
avg_elevation_diff = avg_elevation_b - avg_elevation_a
# 计算高程差变化
height_diff_a = cell_a.get('height_diff', np.nan)
height_diff_b = cell_b.get('height_diff', np.nan)
height_diff_change = 0.0
if not np.isnan(height_diff_a) and not np.isnan(height_diff_b):
height_diff_change = height_diff_b - height_diff_a
# 检查单元是否有效
is_valid_a = cell_a.get('is_valid', False)
is_valid_b = cell_b.get('is_valid', False)
is_valid = is_valid_a and is_valid_b
# 创建差异单元对象
diff_cell = GridCellDifference(
i=cell_b.get('i', 0),
j=cell_b.get('j', 0),
x_min=cell_b.get('x_min', 0),
x_max=cell_b.get('x_max', 0),
y_min=cell_b.get('y_min', 0),
y_max=cell_b.get('y_max', 0),
cut_volume=cut_volume_diff,
fill_volume=fill_volume_diff,
net_volume=net_volume_diff,
avg_elevation=avg_elevation_diff,
height_diff=height_diff_change,
is_valid=is_valid
)
grid_differences_list.append(diff_cell.to_dict())
# 累加有效网格的总体差异
if is_valid:
total_cut_volume += cut_volume_diff
total_fill_volume += fill_volume_diff
total_net_volume += net_volume_diff
# 收集统计信息
valid_cut_diffs.append(cut_volume_diff)
valid_fill_diffs.append(fill_volume_diff)
valid_net_diffs.append(net_volume_diff)
valid_elevation_diffs.append(avg_elevation_diff)
else:
unmatched_count += 1
# 对于未匹配的网格B创建一个无效的差异单元
grid_differences_list.append({
'i': cell_b.get('i', 0),
'j': cell_b.get('j', 0),
'x_min': cell_b.get('x_min', 0),
'x_max': cell_b.get('x_max', 0),
'y_min': cell_b.get('y_min', 0),
'y_max': cell_b.get('y_max', 0),
'cut_volume': 0.0,
'fill_volume': 0.0,
'net_volume': 0.0,
'avg_elevation': 0.0,
'height_diff': 0.0,
'is_valid': False
})
return EarthworkComparisonResult(
total_cut_volume=total_cut_volume,
total_fill_volume=total_fill_volume,
total_net_volume=total_net_volume,
grid_differences=grid_differences_list
)
def compare_grids_by_index(self,
grids_a: List[Dict[str, Any]],
grids_b: List[Dict[str, Any]]
) -> EarthworkComparisonResult:
"""按索引顺序比较网格数据"""
grid_differences_list = []
total_cut_volume = 0.0
total_fill_volume = 0.0
total_net_volume = 0.0
# 收集有效网格的差异数据用于统计
valid_cut_diffs = []
valid_fill_diffs = []
valid_net_diffs = []
valid_elevation_diffs = []
for i in range(len(grids_a)):
cell_a = grids_a[i]
cell_b = grids_b[i]
# 检查是否是同一个网格单元
if cell_a.get('i') != cell_b.get('i') or cell_a.get('j') != cell_b.get('j'):
print(f"警告: 索引{i}的网格不匹配: A({cell_a.get('i')},{cell_a.get('j')}) vs B({cell_b.get('i')},{cell_b.get('j')})")
# 尝试通过坐标匹配
return self.compare_grids_by_coordinate(grids_a, grids_b)
# 计算各项差值
cut_volume_diff = cell_b.get('cut_volume', 0) - cell_a.get('cut_volume', 0)
fill_volume_diff = cell_b.get('fill_volume', 0) - cell_a.get('fill_volume', 0)
net_volume_diff = cell_b.get('net_volume', 0) - cell_a.get('net_volume', 0)
# 计算平均高程差值
avg_elevation_a = cell_a.get('avg_elevation', np.nan)
avg_elevation_b = cell_b.get('avg_elevation', np.nan)
avg_elevation_diff = 0.0
if not np.isnan(avg_elevation_a) and not np.isnan(avg_elevation_b):
avg_elevation_diff = avg_elevation_b - avg_elevation_a
# 计算高程差变化
height_diff_a = cell_a.get('height_diff', np.nan)
height_diff_b = cell_b.get('height_diff', np.nan)
height_diff_change = 0.0
if not np.isnan(height_diff_a) and not np.isnan(height_diff_b):
height_diff_change = height_diff_b - height_diff_a
# 检查单元是否有效
is_valid_a = cell_a.get('is_valid', False)
is_valid_b = cell_b.get('is_valid', False)
is_valid = is_valid_a and is_valid_b
# 创建差异单元对象
diff_cell = GridCellDifference(
i=cell_a.get('i', 0),
j=cell_a.get('j', 0),
x_min=cell_a.get('x_min', 0),
x_max=cell_a.get('x_max', 0),
y_min=cell_a.get('y_min', 0),
y_max=cell_a.get('y_max', 0),
cut_volume=cut_volume_diff,
fill_volume=fill_volume_diff,
net_volume=net_volume_diff,
avg_elevation=avg_elevation_diff,
height_diff=height_diff_change,
is_valid=is_valid
)
grid_differences_list.append(diff_cell.to_dict())
# 累加有效网格的总体差异
if is_valid:
total_cut_volume += cut_volume_diff
total_fill_volume += fill_volume_diff
total_net_volume += net_volume_diff
# 收集统计信息
valid_cut_diffs.append(cut_volume_diff)
valid_fill_diffs.append(fill_volume_diff)
valid_net_diffs.append(net_volume_diff)
valid_elevation_diffs.append(avg_elevation_diff)
return EarthworkComparisonResult(
total_cut_volume=total_cut_volume,
total_fill_volume=total_fill_volume,
total_net_volume=total_net_volume,
grid_differences=grid_differences_list
)
def compare_grid_cells( self,
grids_a: List[Dict[str, Any]],
grids_b: List[Dict[str, Any]]
) -> EarthworkComparisonResult:
"""
比较两个网格数据集合
Args:
grids_a: A结果的网格数据
grids_b: B结果的网格数据
Returns:
比较结果对象
"""
if not grids_a or not grids_b:
raise ValueError("网格数据不能为空")
if len(grids_a) != len(grids_b):
# 如果网格数量不同,尝试按坐标匹配
print(f"警告: 网格数量不同 (A={len(grids_a)}, B={len(grids_b)}),尝试按坐标匹配")
return self.compare_grids_by_coordinate(grids_a, grids_b)
# 网格数量相同,按顺序比较
return self.compare_grids_by_index(grids_a, grids_b)
async def calculate(self,
polygon_coords: List[List[float]],
design_elevation: float,
algorithm: AlgorithmType = AlgorithmType.TIN,
resolution: float = 1.0,
target_crs: str = "EPSG:4326",
interpolation_method: str = 'linear') -> EarthworkResult3dTiles:
"""
计算土方量
Args:
polygon_coords: 多边形坐标
design_elevation: 设计高程
algorithm: 计算算法
resolution: 格网分辨率(米)
target_crs: 目标坐标系
interpolation_method: 插值方法
Returns:
EarthworkResult: 计算结果
"""
try:
# 1. 获取数据
points = await self.data_source.get_points_in_polygon(polygon_coords)
if points.size == 0:
# raise ValueError("区域内没有找到顶点数据")
raise ValueError("区域太小,请调整区域")
# 2. 坐标转换
points = await self._transform_coordinates(points, target_crs)
# 3. 执行计算
if algorithm == AlgorithmType.GRID:
result = await self._calculate_grid(points, polygon_coords,
design_elevation, resolution,
interpolation_method)
elif algorithm == AlgorithmType.TIN:
result = await self._calculate_tin(points, polygon_coords,
design_elevation)
elif algorithm == AlgorithmType.PRISM:
result = await self._calculate_prism(points, polygon_coords,
design_elevation, resolution)
else:
raise ValueError(f"不支持的算法: {algorithm}")
return result
except Exception as e:
logger.error(f"土方量计算失败: {str(e)}")
raise
async def _transform_coordinates(self, points: np.ndarray,
target_crs: str) -> np.ndarray:
"""坐标转换"""
source_crs = self.data_source.get_crs()
if source_crs == target_crs:
return points
cache_key = f"{source_crs}->{target_crs}"
if cache_key not in self._transformer_cache:
self._transformer_cache[cache_key] = Transformer.from_crs(
CRS.from_string(source_crs),
CRS.from_string(target_crs),
always_xy=True
)
transformer = self._transformer_cache[cache_key]
points_2d = transformer.transform(points[:, 0], points[:, 1])
return np.column_stack([points_2d[0], points_2d[1], points[:, 2]])
async def _calculate_grid(self, points: np.ndarray,
polygon_coords: List[List[float]],
design_elevation: float,
resolution: float,
interpolation_method: str) -> EarthworkResult3dTiles:
"""格网法计算 - 包含网格单元数据"""
polygon_np = np.array(polygon_coords)
# 创建格网
xx, yy, _ = self.geometryUtils.create_grid(polygon_np, resolution)
x_grid = xx[0, :] # 从xx矩阵提取x网格线
y_grid = yy[:, 0] # 从yy矩阵提取y网格线
# 插值
natural_elevations = self.geometryUtils.interpolate_grid(xx, yy, points, interpolation_method)
# 初始化统计信息
cut_volume_total = 0.0
fill_volume_total = 0.0
total_area = 0.0
grid_cells: List[Dict[str, Any]] = []
valid_grid_count = 0
total_grid_count = (len(x_grid) - 1) * (len(y_grid) - 1)
# 收集高程和体积用于统计
valid_elevations = []
cut_volumes_list = []
fill_volumes_list = []
# 遍历每个格网单元
for i in range(len(x_grid) - 1):
for j in range(len(y_grid) - 1):
# 格网四个角点
x_min, x_max = x_grid[i], x_grid[i+1]
y_min, y_max = y_grid[j], y_grid[j+1]
cell_corners = np.array([
[x_min, y_min],
[x_max, y_min],
[x_max, y_max],
[x_min, y_max]
])
# 检查格网中心点是否在多边形内
cell_center = cell_corners.mean(axis=0)
is_in_polygon = self.geometryUtils.is_point_in_polygon(cell_center, polygon_np)
# 获取格网四个角点的高程
corner_elevations = [
float(natural_elevations[j, i]),
float(natural_elevations[j, i+1]),
float(natural_elevations[j+1, i+1]),
float(natural_elevations[j+1, i])
]
# 检查是否有无效数据
has_valid_data = not any(np.isnan(elev) for elev in corner_elevations)
if is_in_polygon and has_valid_data:
# 计算格网平均高程
avg_elevation = np.mean(corner_elevations)
cell_area = resolution * resolution
total_area += cell_area
# 计算挖填量
height_diff = design_elevation - avg_elevation
if height_diff > 0:
fill_volume = height_diff * cell_area
cut_volume = 0.0
fill_volume_total += fill_volume
fill_volumes_list.append(fill_volume)
else:
cut_volume = abs(height_diff) * cell_area
fill_volume = 0.0
cut_volume_total += cut_volume
cut_volumes_list.append(cut_volume)
net_volume = cut_volume - fill_volume
# 收集统计信息
valid_elevations.append(avg_elevation)
valid_grid_count += 1
# 创建网格单元数据对象
grid_cell = {
'i': i,
'j': j,
'x_min': x_min,
'x_max': x_max,
'y_min': y_min,
'y_max': y_max,
'cut_volume': float(cut_volume),
'fill_volume': float(fill_volume),
'net_volume': float(net_volume),
'avg_elevation': float(avg_elevation),
'design_elevation': design_elevation,
'height_diff': float(height_diff),
'area': cell_area,
'is_valid': True,
'corner_elevations': corner_elevations
}
else:
# 创建无效网格单元数据对象
grid_cell = {
'i': i,
'j': j,
'x_min': x_min,
'x_max': x_max,
'y_min': y_min,
'y_max': y_max,
'cut_volume': 0.0,
'fill_volume': 0.0,
'net_volume': 0.0,
'avg_elevation': np.nan,
'design_elevation': design_elevation,
'height_diff': np.nan,
'area': resolution * resolution,
'is_valid': False,
'corner_elevations': corner_elevations
}
grid_cells.append(grid_cell)
# 计算统计信息
area = self.geometryUtils.calculate_polygon_area(polygon_coords)
mask = ~np.isnan(natural_elevations)
valid_elevations_grid = natural_elevations[mask]
# 计算高程统计信息
if valid_elevations:
elevation_stats = {
"std": float(np.std(valid_elevations)),
"median": float(np.median(valid_elevations)),
"q1": float(np.percentile(valid_elevations, 25)),
"q3": float(np.percentile(valid_elevations, 75))
}
else:
elevation_stats = {}
# 计算体积分布信息
volume_distribution = {}
if cut_volumes_list:
volume_distribution["cut"] = {
"max": float(np.max(cut_volumes_list)),
"min": float(np.min(cut_volumes_list)),
"mean": float(np.mean(cut_volumes_list)),
"total_cells": len(cut_volumes_list)
}
if fill_volumes_list:
volume_distribution["fill"] = {
"max": float(np.max(fill_volumes_list)),
"min": float(np.min(fill_volumes_list)),
"mean": float(np.mean(fill_volumes_list)),
"total_cells": len(fill_volumes_list)
}
return EarthworkResult3dTiles(
cut_volume=cut_volume_total,
fill_volume=fill_volume_total,
net_volume=cut_volume_total - fill_volume_total,
area=area,
avg_elevation=np.mean(valid_elevations_grid) if valid_elevations_grid.size > 0 else 0,
min_elevation=np.min(valid_elevations_grid) if valid_elevations_grid.size > 0 else 0,
max_elevation=np.max(valid_elevations_grid) if valid_elevations_grid.size > 0 else 0,
points_count=points.shape[0],
bounding_box={
"min": [x_grid[0], y_grid[0]],
"max": [x_grid[-1], y_grid[-1]]
},
volume_accuracy=self._calculate_accuracy(points, resolution),
algorithm=AlgorithmType.GRID.value,
resolution=resolution,
calculation_details=grid_cells,
details_type="grid_cells",
details_dimensions={
"rows": len(y_grid) - 1,
"cols": len(x_grid) - 1,
"cell_size": resolution
},
valid_unit_count=valid_grid_count,
total_unit_count=total_grid_count,
elevation_statistics=elevation_stats,
volume_distribution=volume_distribution
)
async def _calculate_tin(self, points: np.ndarray,
polygon_coords: List[List[float]],
design_elevation: float) -> EarthworkResult3dTiles:
"""三角网法计算 - 包含三角形单元数据"""
polygon_np = np.array(polygon_coords)
# 创建Delaunay三角网
triangulation = Delaunay(points[:, :2])
# 初始化统计信息
cut_volume_total = 0.0
fill_volume_total = 0.0
total_area = 0.0
triangles: List[Dict[str, Any]] = []
triangle_id = 0
valid_triangle_count = 0
total_triangle_count = len(triangulation.simplices)
# 收集高程和体积用于统计
valid_elevations = []
cut_volumes_list = []
fill_volumes_list = []
triangle_areas = []
# 筛选多边形内的三角形
for simplex in triangulation.simplices:
triangle_points = points[simplex]
triangle_center = triangle_points.mean(axis=0)[:2]
# 检查三角形中心是否在多边形内
is_in_polygon = self.geometryUtils.is_point_in_polygon(triangle_center, polygon_np)
if is_in_polygon:
# 计算三角形面积
area = self.geometryUtils.calculate_triangle_area(triangle_points[:, :2])
if math.isnan(area):
# 跳过无效的三角形
triangles.append({
'triangle_id': triangle_id,
'vertices': [tuple(point) for point in triangle_points],
'vertex_indices': simplex.tolist(),
'cut_volume': 0.0,
'fill_volume': 0.0,
'net_volume': 0.0,
'avg_elevation': float(triangle_points[:, 2].mean()),
'design_elevation': design_elevation,
'height_diff': np.nan,
'area': 0.0,
'is_valid': False,
'center_point': tuple(triangle_center)
})
triangle_id += 1
continue
total_area += area
triangle_areas.append(area)
# 计算平均高程(使用三个顶点的高程)
avg_elevation = triangle_points[:, 2].mean()
height_diff = design_elevation - avg_elevation
# 计算挖填量
if height_diff > 0:
fill_volume = height_diff * area
cut_volume = 0.0
fill_volume_total += fill_volume
fill_volumes_list.append(fill_volume)
else:
cut_volume = abs(height_diff) * area
fill_volume = 0.0
cut_volume_total += cut_volume
cut_volumes_list.append(cut_volume)
net_volume = cut_volume - fill_volume
# 收集统计信息
valid_elevations.extend(triangle_points[:, 2].tolist())
valid_triangle_count += 1
# 创建三角形数据对象
triangle = {
'triangle_id': triangle_id,
'vertices': [tuple(point) for point in triangle_points],
'vertex_indices': simplex.tolist(),
'cut_volume': float(cut_volume),
'fill_volume': float(fill_volume),
'net_volume': float(net_volume),
'avg_elevation': float(avg_elevation),
'design_elevation': design_elevation,
'height_diff': float(height_diff),
'area': float(area),
'is_valid': True,
'center_point': tuple(triangle_center)
}
else:
# 创建无效三角形数据对象
area = self.geometryUtils.calculate_triangle_area(triangle_points[:, :2])
if math.isnan(area):
area = 0.0
triangle = {
'triangle_id': triangle_id,
'vertices': [tuple(point) for point in triangle_points],
'vertex_indices': simplex.tolist(),
'cut_volume': 0.0,
'fill_volume': 0.0,
'net_volume': 0.0,
'avg_elevation': float(triangle_points[:, 2].mean()),
'design_elevation': design_elevation,
'height_diff': np.nan,
'area': float(area),
'is_valid': False,
'center_point': tuple(triangle_center)
}
triangles.append(triangle)
triangle_id += 1
# 计算统计信息
area = self.geometryUtils.calculate_polygon_area(polygon_coords)
# 计算高程统计信息
if valid_elevations:
elevation_stats = {
"std": float(np.std(valid_elevations)),
"median": float(np.median(valid_elevations)),
"q1": float(np.percentile(valid_elevations, 25)),
"q3": float(np.percentile(valid_elevations, 75))
}
else:
elevation_stats = {}
# 计算体积分布信息
volume_distribution = {}
if cut_volumes_list:
volume_distribution["cut"] = {
"max": float(np.max(cut_volumes_list)),
"min": float(np.min(cut_volumes_list)),
"mean": float(np.mean(cut_volumes_list)),
"total_triangles": len(cut_volumes_list)
}
if fill_volumes_list:
volume_distribution["fill"] = {
"max": float(np.max(fill_volumes_list)),
"min": float(np.min(fill_volumes_list)),
"mean": float(np.mean(fill_volumes_list)),
"total_triangles": len(fill_volumes_list)
}
# 计算三角形面积统计
if triangle_areas:
volume_distribution["area"] = {
"max": float(np.max(triangle_areas)),
"min": float(np.min(triangle_areas)),
"mean": float(np.mean(triangle_areas)),
"total_area": float(np.sum(triangle_areas))
}
return EarthworkResult3dTiles(
cut_volume=cut_volume_total,
fill_volume=fill_volume_total,
net_volume=cut_volume_total - fill_volume_total,
area=area,
avg_elevation=points[:, 2].mean(),
min_elevation=points[:, 2].min(),
max_elevation=points[:, 2].max(),
points_count=points.shape[0],
bounding_box={
"min": points.min(axis=0)[:2].tolist(),
"max": points.max(axis=0)[:2].tolist()
},
volume_accuracy=self._calculate_accuracy(points, 0),
algorithm=AlgorithmType.TIN.value,
resolution=0,
calculation_details=triangles,
details_type="triangles",
details_dimensions={
"triangle_count": total_triangle_count,
"valid_triangle_count": valid_triangle_count,
"vertex_count": points.shape[0]
},
valid_unit_count=valid_triangle_count,
total_unit_count=total_triangle_count,
elevation_statistics=elevation_stats,
volume_distribution=volume_distribution
)
async def _calculate_prism(self, points: np.ndarray,
polygon_coords: List[List[float]],
design_elevation: float,
resolution: float) -> EarthworkResult3dTiles:
"""三棱柱法计算 - 包含三棱柱单元数据"""
# 先创建TIN
polygon_np = np.array(polygon_coords)
triangulation = Delaunay(points[:, :2])
cut_volume_total = 0.0
fill_volume_total = 0.0
total_area = 0.0
prisms: List[Dict[str, Any]] = []
triangle_id = 0
valid_prism_count = 0
total_prism_count = len(triangulation.simplices)
# 收集统计信息
valid_elevations = []
cut_volumes_list = []
fill_volumes_list = []
triangle_areas = []
height_diffs_list = []
for simplex in triangulation.simplices:
triangle_points = points[simplex]
triangle_center = triangle_points.mean(axis=0)[:2]
is_in_polygon = self.geometryUtils.is_point_in_polygon(triangle_center, polygon_np)
if is_in_polygon:
# 计算三角形面积
area = self.geometryUtils.calculate_triangle_area(triangle_points[:, :2])
if math.isnan(area):
# 跳过无效的三角形
prisms.append({
'triangle_id': triangle_id,
'vertices': [tuple(point) for point in triangle_points],
'vertex_indices': simplex.tolist(),
'cut_volume': 0.0,
'fill_volume': 0.0,
'net_volume': 0.0,
'avg_elevation': float(triangle_points[:, 2].mean()),
'design_elevation': design_elevation,
'height_diffs': [float(design_elevation - triangle_points[i, 2]) for i in range(3)],
'area': 0.0,
'is_valid': False,
'center_point': tuple(triangle_center),
'edge_volumes': [0.0, 0.0, 0.0]
})
triangle_id += 1
continue
total_area += area
triangle_areas.append(area)
avg_elevation = triangle_points[:, 2].mean()
# 计算三棱柱体积
triangle_cut_volume = 0.0
triangle_fill_volume = 0.0
edge_volumes = []
height_diffs = []
for i in range(3):
j = (i + 1) % 3
# 计算边的长度
edge_length = np.linalg.norm(triangle_points[i, :2] - triangle_points[j, :2])
# 计算边两端点的挖填高度
height_i = design_elevation - triangle_points[i, 2]
height_j = design_elevation - triangle_points[j, 2]
height_diffs.extend([float(height_i), float(height_j)])
height_diffs_list.extend([float(height_i), float(height_j)])
# 计算边的平均挖填高度
avg_height = (abs(height_i) + abs(height_j)) / 2
# 计算边的面积(假设边宽度为resolution)
edge_area = edge_length * resolution
edge_volume = avg_height * edge_area
edge_volumes.append(float(edge_volume))
if height_i > 0 or height_j > 0:
triangle_fill_volume += edge_volume
fill_volume_total += edge_volume
fill_volumes_list.append(edge_volume)
else:
triangle_cut_volume += edge_volume
cut_volume_total += edge_volume
cut_volumes_list.append(edge_volume)
# 去重高度差
unique_height_diffs = list(set([height_diffs[0], height_diffs[2], height_diffs[4]]))
# 收集统计信息
valid_elevations.extend(triangle_points[:, 2].tolist())
valid_prism_count += 1
# 创建三棱柱数据对象
prism = {
'triangle_id': triangle_id,
'vertices': [tuple(point) for point in triangle_points],
'vertex_indices': simplex.tolist(),
'cut_volume': float(triangle_cut_volume),
'fill_volume': float(triangle_fill_volume),
'net_volume': float(triangle_cut_volume - triangle_fill_volume),
'avg_elevation': float(avg_elevation),
'design_elevation': design_elevation,
'height_diffs': unique_height_diffs,
'area': float(area),
'is_valid': True,
'center_point': tuple(triangle_center),
'edge_volumes': edge_volumes
}
else:
# 创建无效三棱柱数据对象
area = self.geometryUtils.calculate_triangle_area(triangle_points[:, :2])
if math.isnan(area):
area = 0.0
# 计算高度差
height_diffs = []
for i in range(3):
height_i = design_elevation - triangle_points[i, 2]
height_diffs.append(float(height_i))
prism = {
'triangle_id': triangle_id,
'vertices': [tuple(point) for point in triangle_points],
'vertex_indices': simplex.tolist(),
'cut_volume': 0.0,
'fill_volume': 0.0,
'net_volume': 0.0,
'avg_elevation': float(triangle_points[:, 2].mean()),
'design_elevation': design_elevation,
'height_diffs': height_diffs,
'area': float(area),
'is_valid': False,
'center_point': tuple(triangle_center),
'edge_volumes': [0.0, 0.0, 0.0]
}
prisms.append(prism)
triangle_id += 1
area = self.geometryUtils.calculate_polygon_area(polygon_coords)
# 计算高程统计信息
if valid_elevations:
elevation_stats = {
"std": float(np.std(valid_elevations)),
"median": float(np.median(valid_elevations)),
"q1": float(np.percentile(valid_elevations, 25)),
"q3": float(np.percentile(valid_elevations, 75))
}
else:
elevation_stats = {}
# 计算高度差统计信息
if height_diffs_list:
elevation_stats["height_diff"] = {
"max": float(np.max(height_diffs_list)),
"min": float(np.min(height_diffs_list)),
"mean": float(np.mean(height_diffs_list)),
"std": float(np.std(height_diffs_list))
}
# 计算体积分布信息
volume_distribution = {}
if cut_volumes_list:
volume_distribution["cut"] = {
"max": float(np.max(cut_volumes_list)),
"min": float(np.min(cut_volumes_list)),
"mean": float(np.mean(cut_volumes_list)),
"total_edges": len(cut_volumes_list)
}
if fill_volumes_list:
volume_distribution["fill"] = {
"max": float(np.max(fill_volumes_list)),
"min": float(np.min(fill_volumes_list)),
"mean": float(np.mean(fill_volumes_list)),
"total_edges": len(fill_volumes_list)
}
# 计算三角形面积统计
if triangle_areas:
volume_distribution["area"] = {
"max": float(np.max(triangle_areas)),
"min": float(np.min(triangle_areas)),
"mean": float(np.mean(triangle_areas)),
"total_area": float(np.sum(triangle_areas))
}
return EarthworkResult3dTiles(
cut_volume=cut_volume_total,
fill_volume=fill_volume_total,
net_volume=cut_volume_total - fill_volume_total,
area=area,
avg_elevation=points[:, 2].mean(),
min_elevation=points[:, 2].min(),
max_elevation=points[:, 2].max(),
points_count=points.shape[0],
bounding_box={
"min": points.min(axis=0)[:2].tolist(),
"max": points.max(axis=0)[:2].tolist()
},
volume_accuracy=self._calculate_accuracy(points, resolution),
algorithm=AlgorithmType.PRISM.value,
resolution=resolution,
calculation_details=prisms,
details_type="prisms",
details_dimensions={
"prism_count": total_prism_count,
"valid_prism_count": valid_prism_count,
"vertex_count": points.shape[0],
"edge_resolution": resolution
},
valid_unit_count=valid_prism_count,
total_unit_count=total_prism_count,
elevation_statistics=elevation_stats,
volume_distribution=volume_distribution
)
def _calculate_accuracy(self, points: np.ndarray, resolution: float) -> float:
"""计算精度评估"""
if points.shape[0] < 10:
return 0.5
# 基于点密度和分辨率估算精度
if points.shape[0] > 0:
# 计算点密度
bounds = points[:, :2]
area = (bounds[:, 0].max() - bounds[:, 0].min()) * \
(bounds[:, 1].max() - bounds[:, 1].min())
if area > 0:
point_density = points.shape[0] / area
if point_density > 100 and resolution <= 0.5: # 高密度,高分辨率
return 0.05
elif point_density > 10 and resolution <= 1.0:
return 0.1
elif point_density > 1:
return 0.2
return 0.3
async def validate(self, polygon_coords: List[List[float]]) -> Dict:
"""验证计算参数"""
try:
points = await self.data_source.get_points_in_polygon(polygon_coords)
validation_result = {
"polygon_valid": len(polygon_coords) >= 3,
"area": self.geometryUtils.calculate_polygon_area(polygon_coords),
"points_available": points.size > 0,
"points_count": points.shape[0] if points.size > 0 else 0,
"data_quality": "good" if points.shape[0] > 100 else "poor",
"suggested_algorithm": self._suggest_algorithm(points),
"estimated_accuracy": self._calculate_accuracy(points, 1.0) if points.size > 0 else None
}
if points.size > 0:
validation_result.update({
"elevation_range": {
"min": float(points[:, 2].min()),
"max": float(points[:, 2].max()),
"avg": float(points[:, 2].mean())
},
"bounding_box": {
"min": points.min(axis=0).tolist(),
"max": points.max(axis=0).tolist()
}
})
return validation_result
except Exception as e:
logger.error(f"验证失败: {str(e)}")
return {
"polygon_valid": False,
"error": str(e)
}
def _suggest_algorithm(self, points: np.ndarray) -> str:
"""建议计算算法"""
if points.shape[0] < 100:
return AlgorithmType.TIN.value
# 计算点的规则性
bounds = points[:, :2]
x_range = bounds[:, 0].max() - bounds[:, 0].min()
y_range = bounds[:, 1].max() - bounds[:, 1].min()
# 如果点分布相对规则,建议使用格网法
if x_range > 0 and y_range > 0:
aspect_ratio = max(x_range, y_range) / min(x_range, y_range)
if aspect_ratio < 2: # 相对规则
return AlgorithmType.GRID.value
return AlgorithmType.TIN.value