# earthwork_calculator.py import numpy as np from pyproj import Transformer, CRS from scipy.spatial import Delaunay from dataclasses import dataclass from typing import List, Dict, Optional, Tuple, Union import logging from enum import Enum from abc import ABC, abstractmethod import math logger = logging.getLogger(__name__) class AlgorithmType(Enum): """计算算法类型""" GRID = "grid" TIN = "tin" PRISM = "prism" @dataclass class EarthworkResult3dTiles: """土方量计算结果""" cut_volume: float # 挖方量 (m³) fill_volume: float # 填方量 (m³) net_volume: float # 净方量 (m³) area: float # 计算区域面积 (m²) avg_elevation: float # 平均高程 min_elevation: float # 最低高程 max_elevation: float # 最高高程 points_count: int # 使用的点数 bounding_box: Dict[str, List[float]] # 边界框 volume_accuracy: float # 计算精度 algorithm: str # 使用的算法 resolution: float # 计算分辨率 def to_dict(self) -> Dict: """转换为字典""" return { "volume": { "cut": round(self.cut_volume, 3), "fill": round(self.fill_volume, 3), "net": round(self.net_volume, 3), "unit": "m³" }, "area": { "value": round(self.area, 3), "unit": "m²" }, "elevation": { "average": round(self.avg_elevation, 3), "min": round(self.min_elevation, 3), "max": round(self.max_elevation, 3), "unit": "m" }, "statistics": { "points_count": self.points_count, "accuracy": round(self.volume_accuracy, 3), "algorithm": self.algorithm }, "bounding_box": self.bounding_box, "calculation_params": { "resolution": self.resolution, "accuracy": self.volume_accuracy } } class TerrainDataSource(ABC): """地形数据源抽象类""" @abstractmethod async def get_points_in_polygon(self, polygon_coords: List[List[float]], z_range: Optional[Tuple[float, float]] = None) -> np.ndarray: """ 获取多边形区域内的点云数据 Args: polygon_coords: 多边形坐标 [[x1,y1], [x2,y2], ...] z_range: 高程范围 (min_z, max_z) Returns: Nx3的numpy数组 [x, y, z] """ pass @abstractmethod async def get_data_bounds(self) -> Dict[str, List[float]]: """获取数据范围""" pass @abstractmethod def get_crs(self) -> str: """获取数据坐标系""" pass class GeometryUtils: """几何计算工具类""" @staticmethod def calculate_polygon_area(polygon_coords: List[List[float]]) -> float: """计算多边形面积(平面面积)""" polygon_np = np.array(polygon_coords) x = polygon_np[:, 0] y = polygon_np[:, 1] return 0.5 * np.abs(np.dot(x, np.roll(y, 1)) - np.dot(y, np.roll(x, 1))) @staticmethod def is_point_in_polygon(point: np.ndarray, polygon: np.ndarray) -> bool: """判断点是否在多边形内""" from matplotlib.path import Path path = Path(polygon) return path.contains_point(point) @staticmethod def calculate_triangle_area(points: np.ndarray) -> float: """计算三角形面积""" a = np.linalg.norm(points[0] - points[1]) b = np.linalg.norm(points[1] - points[2]) c = np.linalg.norm(points[2] - points[0]) s = (a + b + c) / 2 return np.sqrt(s * (s - a) * (s - b) * (s - c)) @staticmethod def create_grid(polygon: np.ndarray, resolution: float) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """创建规则格网""" x_min, y_min = polygon.min(axis=0) x_max, y_max = polygon.max(axis=0) # 扩展一个格网单元 x_min -= resolution x_max += resolution y_min -= resolution y_max += resolution x_grid = np.arange(x_min, x_max + resolution, resolution) y_grid = np.arange(y_min, y_max + resolution, resolution) xx, yy = np.meshgrid(x_grid, y_grid) return xx, yy, x_grid, y_grid @staticmethod def interpolate_grid(xx: np.ndarray, yy: np.ndarray, points: np.ndarray, method: str = 'linear') -> np.ndarray: """格网插值""" from scipy.interpolate import LinearNDInterpolator, CloughTocher2DInterpolator grid_points = np.column_stack([xx.ravel(), yy.ravel()]) if method == 'linear': interpolator = LinearNDInterpolator(points[:, :2], points[:, 2]) elif method == 'cubic': interpolator = CloughTocher2DInterpolator(points[:, :2], points[:, 2]) else: raise ValueError(f"不支持的插值方法: {method}") elevations = interpolator(grid_points) return elevations.reshape(xx.shape) class EarthworkCalculator3dTiles: """土方量计算器""" def __init__(self, data_source: TerrainDataSource): """ 初始化计算器 Args: data_source: 地形数据源 """ self.data_source = data_source self._transformer_cache = {} async def calculate(self, polygon_coords: List[List[float]], design_elevation: float, algorithm: AlgorithmType = AlgorithmType.TIN, resolution: float = 1.0, target_crs: str = "EPSG:4326", interpolation_method: str = 'linear') -> EarthworkResult3dTiles: """ 计算土方量 Args: polygon_coords: 多边形坐标 design_elevation: 设计高程 algorithm: 计算算法 resolution: 格网分辨率(米) target_crs: 目标坐标系 interpolation_method: 插值方法 Returns: EarthworkResult: 计算结果 """ try: # 1. 获取数据 points = await self.data_source.get_points_in_polygon(polygon_coords) if points.size == 0: raise ValueError("区域内没有找到高程数据") # 2. 坐标转换 points = await self._transform_coordinates(points, target_crs) # 3. 执行计算 if algorithm == AlgorithmType.GRID: result = await self._calculate_grid(points, polygon_coords, design_elevation, resolution, interpolation_method) elif algorithm == AlgorithmType.TIN: result = await self._calculate_tin(points, polygon_coords, design_elevation) elif algorithm == AlgorithmType.PRISM: result = await self._calculate_prism(points, polygon_coords, design_elevation, resolution) else: raise ValueError(f"不支持的算法: {algorithm}") return result except Exception as e: logger.error(f"土方量计算失败: {str(e)}") raise async def _transform_coordinates(self, points: np.ndarray, target_crs: str) -> np.ndarray: """坐标转换""" source_crs = self.data_source.get_crs() if source_crs == target_crs: return points cache_key = f"{source_crs}->{target_crs}" if cache_key not in self._transformer_cache: self._transformer_cache[cache_key] = Transformer.from_crs( CRS.from_string(source_crs), CRS.from_string(target_crs), always_xy=True ) transformer = self._transformer_cache[cache_key] points_2d = transformer.transform(points[:, 0], points[:, 1]) return np.column_stack([points_2d[0], points_2d[1], points[:, 2]]) async def _calculate_grid(self, points: np.ndarray, polygon_coords: List[List[float]], design_elevation: float, resolution: float, interpolation_method: str) -> EarthworkResult3dTiles: """格网法计算""" polygon_np = np.array(polygon_coords) # 创建格网 xx, yy, x_grid, y_grid = GeometryUtils.create_grid(polygon_np, resolution) # 插值 natural_elevations = GeometryUtils.interpolate_grid(xx, yy, points, interpolation_method) # 初始化挖填量 cut_volume = 0.0 fill_volume = 0.0 total_area = 0.0 # 遍历每个格网单元 for i in range(len(x_grid) - 1): for j in range(len(y_grid) - 1): # 格网四个角点 cell_corners = np.array([ [x_grid[i], y_grid[j]], [x_grid[i+1], y_grid[j]], [x_grid[i+1], y_grid[j+1]], [x_grid[i], y_grid[j+1]] ]) # 检查格网中心点是否在多边形内 cell_center = cell_corners.mean(axis=0) if not GeometryUtils.is_point_in_polygon(cell_center, polygon_np): continue # 获取格网四个角点的高程 cell_elevations = [ natural_elevations[j, i], natural_elevations[j, i+1], natural_elevations[j+1, i+1], natural_elevations[j+1, i] ] # 检查是否有无效数据 if any(np.isnan(elev) for elev in cell_elevations): continue # 计算格网平均高程 avg_elevation = np.mean(cell_elevations) cell_area = resolution * resolution total_area += cell_area # 计算挖填量 height_diff = design_elevation - avg_elevation if height_diff > 0: fill_volume += height_diff * cell_area else: cut_volume += abs(height_diff) * cell_area # 计算统计信息 area = GeometryUtils.calculate_polygon_area(polygon_coords) mask = ~np.isnan(natural_elevations) valid_elevations = natural_elevations[mask] return EarthworkResult3dTiles( cut_volume=cut_volume, fill_volume=fill_volume, net_volume=cut_volume - fill_volume, area=area, avg_elevation=np.mean(valid_elevations) if valid_elevations.size > 0 else 0, min_elevation=np.min(valid_elevations) if valid_elevations.size > 0 else 0, max_elevation=np.max(valid_elevations) if valid_elevations.size > 0 else 0, points_count=points.shape[0], bounding_box={ "min": [x_grid[0], y_grid[0]], "max": [x_grid[-1], y_grid[-1]] }, volume_accuracy=self._calculate_accuracy(points, resolution), algorithm=AlgorithmType.GRID.value, resolution=resolution ).to_dict() async def _calculate_tin(self, points: np.ndarray, polygon_coords: List[List[float]], design_elevation: float) -> EarthworkResult3dTiles: """三角网法计算""" polygon_np = np.array(polygon_coords) # 创建Delaunay三角网 triangulation = Delaunay(points[:, :2]) # 筛选多边形内的三角形 cut_volume = 0.0 fill_volume = 0.0 total_area = 0.0 for simplex in triangulation.simplices: triangle_points = points[simplex] triangle_center = triangle_points.mean(axis=0)[:2] # 检查三角形中心是否在多边形内 if not GeometryUtils.is_point_in_polygon(triangle_center, polygon_np): continue # 计算三角形面积 area = GeometryUtils.calculate_triangle_area(triangle_points[:, :2]) total_area += area # 计算平均高程(使用三个顶点的高程) avg_elevation = triangle_points[:, 2].mean() # 计算挖填量 height_diff = design_elevation - avg_elevation if height_diff > 0: fill_volume += height_diff * area else: cut_volume += abs(height_diff) * area # 计算统计信息 area = GeometryUtils.calculate_polygon_area(polygon_coords) return EarthworkResult3dTiles( cut_volume=cut_volume, fill_volume=fill_volume, net_volume=cut_volume - fill_volume, area=area, avg_elevation=points[:, 2].mean(), min_elevation=points[:, 2].min(), max_elevation=points[:, 2].max(), points_count=points.shape[0], bounding_box={ "min": points.min(axis=0)[:2].tolist(), "max": points.max(axis=0)[:2].tolist() }, volume_accuracy=self._calculate_accuracy(points, 0), algorithm=AlgorithmType.TIN.value, resolution=0 ).to_dict() async def _calculate_prism(self, points: np.ndarray, polygon_coords: List[List[float]], design_elevation: float, resolution: float) -> EarthworkResult3dTiles: """三棱柱法计算""" # 先创建TIN polygon_np = np.array(polygon_coords) triangulation = Delaunay(points[:, :2]) cut_volume = 0.0 fill_volume = 0.0 total_area = 0.0 for simplex in triangulation.simplices: triangle_points = points[simplex] triangle_center = triangle_points.mean(axis=0)[:2] if not GeometryUtils.is_point_in_polygon(triangle_center, polygon_np): continue # 计算三角形面积 area = GeometryUtils.calculate_triangle_area(triangle_points[:, :2]) total_area += area # 对于每个三角形,计算三棱柱体积 # 这里简化处理,使用梯形公式 for i in range(3): j = (i + 1) % 3 # 计算边的长度 edge_length = np.linalg.norm(triangle_points[i, :2] - triangle_points[j, :2]) # 计算边两端点的挖填高度 height_i = design_elevation - triangle_points[i, 2] height_j = design_elevation - triangle_points[j, 2] # 计算边的平均挖填高度 avg_height = (abs(height_i) + abs(height_j)) / 2 # 计算边的面积(假设边宽度为resolution) edge_area = edge_length * resolution if height_i > 0 or height_j > 0: fill_volume += avg_height * edge_area else: cut_volume += avg_height * edge_area area = GeometryUtils.calculate_polygon_area(polygon_coords) return EarthworkResult3dTiles( cut_volume=cut_volume, fill_volume=fill_volume, net_volume=cut_volume - fill_volume, area=area, avg_elevation=points[:, 2].mean(), min_elevation=points[:, 2].min(), max_elevation=points[:, 2].max(), points_count=points.shape[0], bounding_box={ "min": points.min(axis=0)[:2].tolist(), "max": points.max(axis=0)[:2].tolist() }, volume_accuracy=self._calculate_accuracy(points, resolution), algorithm=AlgorithmType.PRISM.value, resolution=resolution ).to_dict() def _calculate_accuracy(self, points: np.ndarray, resolution: float) -> float: """计算精度评估""" if points.shape[0] < 10: return 0.5 # 基于点密度和分辨率估算精度 if points.shape[0] > 0: # 计算点密度 bounds = points[:, :2] area = (bounds[:, 0].max() - bounds[:, 0].min()) * \ (bounds[:, 1].max() - bounds[:, 1].min()) if area > 0: point_density = points.shape[0] / area if point_density > 100 and resolution <= 0.5: # 高密度,高分辨率 return 0.05 elif point_density > 10 and resolution <= 1.0: return 0.1 elif point_density > 1: return 0.2 return 0.3 async def validate(self, polygon_coords: List[List[float]]) -> Dict: """验证计算参数""" try: points = await self.data_source.get_points_in_polygon(polygon_coords) validation_result = { "polygon_valid": len(polygon_coords) >= 3, "area": GeometryUtils.calculate_polygon_area(polygon_coords), "points_available": points.size > 0, "points_count": points.shape[0] if points.size > 0 else 0, "data_quality": "good" if points.shape[0] > 100 else "poor", "suggested_algorithm": self._suggest_algorithm(points), "estimated_accuracy": self._calculate_accuracy(points, 1.0) if points.size > 0 else None } if points.size > 0: validation_result.update({ "elevation_range": { "min": float(points[:, 2].min()), "max": float(points[:, 2].max()), "avg": float(points[:, 2].mean()) }, "bounding_box": { "min": points.min(axis=0).tolist(), "max": points.max(axis=0).tolist() } }) return validation_result except Exception as e: logger.error(f"验证失败: {str(e)}") return { "polygon_valid": False, "error": str(e) } def _suggest_algorithm(self, points: np.ndarray) -> str: """建议计算算法""" if points.shape[0] < 100: return AlgorithmType.TIN.value # 计算点的规则性 bounds = points[:, :2] x_range = bounds[:, 0].max() - bounds[:, 0].min() y_range = bounds[:, 1].max() - bounds[:, 1].min() # 如果点分布相对规则,建议使用格网法 if x_range > 0 and y_range > 0: aspect_ratio = max(x_range, y_range) / min(x_range, y_range) if aspect_ratio < 2: # 相对规则 return AlgorithmType.GRID.value return AlgorithmType.TIN.value