diff --git a/b3dm/earthwork_api.py b/b3dm/earthwork_api.py index 2c52b60..5abfcf5 100644 --- a/b3dm/earthwork_api.py +++ b/b3dm/earthwork_api.py @@ -136,7 +136,132 @@ async def calc_earthwork(request: Request): ) # 7. 返回成功响应 - return _success_response(result) + res_dict = result.to_dict() + res_dict["calculation_details"] = None + res_dict["elevation_statistics"] = None + res_dict["volume_distribution"] = None + return _success_response(res_dict) + + except ValueError as e: + logger.warning(f"参数验证失败: {str(e)}") + return _error_response(f"参数错误: {str(e)}", 400) + except Exception as e: + logger.error(f"计算失败: {str(e)}") + return _error_response(f"服务器内部错误: {str(e)}", 500) + +# 两期对比接口-3dTiles +@earthwork_bp.post("/api/v1/calc/twoPhaseComparison") +async def two_phase_comparison(request: Request): + """ + 两期对比接口 + + 请求参数示例: + { + "polygonCoords": [ + [ + 115.70440468338526, + 30.77363140345639 + ], + [ + 115.70443054007985, + 30.773510462589584 + ], + [ + 115.70459702429197, + 30.77360789911405 + ] + ], + "designElevation": 100, + "algorithm": "grid", + "resolution": 1, + "crs": "EPSG:4326", + "interpolationMethod": "linear", + "urlA": "http://222.212.85.86:9000/300bdf2b-a150-406e-be63-d28bd29b409f/model/hbgldk/yzk/20260113/3D/terra_b3dms/tileset.json", + "urlB": "http://222.212.85.86:9000/300bdf2b-a150-406e-be63-d28bd29b409f/model/hbgldk/yzk/20260113/3D/terra_b3dms/tileset.json" + } + """ + try: + # 1. 接收前端传参 + data = request.json + if not data: + return _error_response("请求参数不能为空", 400) + + # 2. 提取参数 + polygon_coords = data.get("polygonCoords") + design_elevation = data.get("designElevation", 1000) + urlA = data.get("urlA") + urlB = data.get("urlB") + + if not polygon_coords: + return _error_response("多边形坐标不能为空", 400) + if design_elevation is None: + return _error_response("设计高程不能为空", 400) + if urlA is None or urlB is None : + return _error_response("对比地图不能为空", 400) + + # 3. 可选参数 + algorithm = data.get("algorithm", "tin") + resolution = data.get("resolution", 1.0) + crs = data.get("crs", "EPSG:4326") + interpolation_method = data.get("interpolationMethod", "linear") + + # 4. 参数验证 + if len(polygon_coords) < 3: + return _error_response("多边形至少需要3个点", 400) + + # 检查多边形是否闭合,如不闭合则自动闭合 + if polygon_coords[0] != polygon_coords[-1]: + polygon_coords.append(polygon_coords[0]) + + # 算法验证 + if algorithm not in ["grid", "tin", "prism"]: + return _error_response("算法必须是 grid, tin 或 prism", 400) + + # 分辨率验证 + if resolution <= 0 or resolution > 100: + return _error_response("分辨率必须在0-100米之间", 400) + + # 5. 确保计算器已初始化 + app_info_a = init_app(urlA) + if not app_info_a.get('data_source').tileset_path : + return _error_response(f"下载地图失败:{urlA}", 400) + calculator_3d_tiles_a = app_info_a.get("calculator_3d_tiles") + app_info_b = init_app(urlB) + if not app_info_b.get('data_source').tileset_path : + return _error_response(f"下载地图失败:{urlB}", 400) + calculator_3d_tiles_b = app_info_b.get("calculator_3d_tiles") + + # 6. 执行计算 + algorithm_type = AlgorithmType.GRID + result_a = await calculator_3d_tiles_a.calculate( + polygon_coords=polygon_coords, + design_elevation=design_elevation, + algorithm=algorithm_type, + resolution=resolution, + target_crs=crs, + interpolation_method=interpolation_method + ) + result_b = await calculator_3d_tiles_b.calculate( + polygon_coords=polygon_coords, + design_elevation=design_elevation, + algorithm=algorithm_type, + resolution=resolution, + target_crs=crs, + interpolation_method=interpolation_method + ) + + # 获取网格数据 + grids_a = result_a.calculation_details + grids_b = result_b.calculation_details + + # 比较网格数据 + comparison_result = calculator_3d_tiles_a.compare_grid_cells(grids_a, grids_b) + + # 转换为字典 + result_dict = comparison_result.to_dict() + + # 7. 返回成功响应 + return _success_response(result_dict) except ValueError as e: logger.warning(f"参数验证失败: {str(e)}") diff --git a/b3dm/earthwork_calculator_3d_tiles.py b/b3dm/earthwork_calculator_3d_tiles.py index 96e1f9d..75a1fc8 100644 --- a/b3dm/earthwork_calculator_3d_tiles.py +++ b/b3dm/earthwork_calculator_3d_tiles.py @@ -2,13 +2,15 @@ import numpy as np from pyproj import Transformer, CRS from scipy.spatial import Delaunay -from dataclasses import dataclass -from typing import List, Dict, Optional, Tuple, Union +from dataclasses import dataclass, field +from typing import List, Dict, Optional, Tuple, Union, Any import logging from enum import Enum from abc import ABC, abstractmethod import math from pyproj import Geod +from scipy.interpolate import LinearNDInterpolator, CloughTocher2DInterpolator +from matplotlib.path import Path logger = logging.getLogger(__name__) @@ -18,6 +20,168 @@ class AlgorithmType(Enum): TIN = "tin" PRISM = "prism" +@dataclass +class GridCellData: + """单个网格单元的数据""" + i: int # x方向索引 + j: int # y方向索引 + x_min: float # 网格最小x坐标 + x_max: float # 网格最大x坐标 + y_min: float # 网格最小y坐标 + y_max: float # 网格最大y坐标 + cut_volume: float # 挖方量 + fill_volume: float # 填方量 + net_volume: float # 净方量 + avg_elevation: float # 平均自然高程 + design_elevation: float # 设计高程 + height_diff: float # 设计高程与自然高程差 + area: float # 网格面积 + is_valid: bool # 是否有效 + corner_elevations: List[float] = field(default_factory=list) # 四个角点高程 + + def to_dict(self) -> Dict[str, Any]: + """转换为字典格式""" + return { + 'i': self.i, + 'j': self.j, + 'x_min': self.x_min, + 'x_max': self.x_max, + 'y_min': self.y_min, + 'y_max': self.y_max, + 'cut_volume': self.cut_volume, + 'fill_volume': self.fill_volume, + 'net_volume': self.net_volume, + 'avg_elevation': self.avg_elevation, + 'design_elevation': self.design_elevation, + 'height_diff': self.height_diff, + 'area': self.area, + 'is_valid': self.is_valid, + 'corner_elevations': self.corner_elevations + } + +@dataclass +class TriangleData: + """单个三角形单元的数据""" + triangle_id: int # 三角形ID + vertices: List[Tuple[float, float, float]] # 三个顶点坐标(x,y,z) + vertex_indices: List[int] # 顶点索引 + cut_volume: float # 挖方量 + fill_volume: float # 填方量 + net_volume: float # 净方量 + avg_elevation: float # 平均自然高程 + design_elevation: float # 设计高程 + height_diff: float # 设计高程与自然高程差 + area: float # 三角形面积 + is_valid: bool # 是否有效 + center_point: Tuple[float, float] # 三角形中心点 + + def to_dict(self) -> Dict[str, Any]: + """转换为字典格式""" + return { + 'triangle_id': self.triangle_id, + 'vertices': self.vertices, + 'vertex_indices': self.vertex_indices, + 'cut_volume': self.cut_volume, + 'fill_volume': self.fill_volume, + 'net_volume': self.net_volume, + 'avg_elevation': self.avg_elevation, + 'design_elevation': self.design_elevation, + 'height_diff': self.height_diff, + 'area': self.area, + 'is_valid': self.is_valid, + 'center_point': self.center_point + } + +@dataclass +class PrismData: + """单个三棱柱单元的数据""" + triangle_id: int # 三角形ID + vertices: List[Tuple[float, float, float]] # 三个顶点坐标(x,y,z) + vertex_indices: List[int] # 顶点索引 + cut_volume: float # 挖方量 + fill_volume: float # 填方量 + net_volume: float # 净方量 + avg_elevation: float # 平均自然高程 + design_elevation: float # 设计高程 + height_diffs: List[float] # 三个顶点的高程差 + area: float # 三角形面积 + is_valid: bool # 是否有效 + center_point: Tuple[float, float] # 三角形中心点 + edge_volumes: List[float] = field(default_factory=list) # 三个边的体积贡献 + + def to_dict(self) -> Dict[str, Any]: + """转换为字典格式""" + return { + 'triangle_id': self.triangle_id, + 'vertices': self.vertices, + 'vertex_indices': self.vertex_indices, + 'cut_volume': self.cut_volume, + 'fill_volume': self.fill_volume, + 'net_volume': self.net_volume, + 'avg_elevation': self.avg_elevation, + 'design_elevation': self.design_elevation, + 'height_diffs': self.height_diffs, + 'area': self.area, + 'is_valid': self.is_valid, + 'center_point': self.center_point, + 'edge_volumes': self.edge_volumes + } + +@dataclass +class GridCellDifference: + """网格单元差异数据""" + i: int # x方向索引 + j: int # y方向索引 + x_min: float # 网格最小x坐标 + x_max: float # 网格最大x坐标 + y_min: float # 网格最小y坐标 + y_max: float # 网格最大y坐标 + cut_volume: float # 挖方量差值 (b-a) + fill_volume: float # 填方量差值 (b-a) + net_volume: float # 净方量差值 (b-a) + avg_elevation: float # 平均自然高程差值 (b-a) + height_diff: float # 高程差变化 (b-a) + is_valid: bool # 是否有效(两个网格都有效) + + def to_dict(self) -> Dict[str, Any]: + """转换为字典格式""" + return { + 'i': self.i, + 'j': self.j, + 'x_min': self.x_min, + 'x_max': self.x_max, + 'y_min': self.y_min, + 'y_max': self.y_max, + 'cut_volume': self.cut_volume, + 'fill_volume': self.fill_volume, + 'net_volume': self.net_volume, + 'avg_elevation': self.avg_elevation, + 'height_diff': self.height_diff, + 'is_valid': self.is_valid + } + +@dataclass +class EarthworkComparisonResult: + """土方量比较结果""" + # 总体差异统计 + total_cut_volume: float # 总挖方量差值 + total_fill_volume: float # 总填方量差值 + total_net_volume: float # 总净方量差值 + + # 网格单元差异数据 + grid_differences: List[Dict[str, Any]] + + def to_dict(self) -> Dict[str, Any]: + """转换为字典""" + return { + "total_differences": { + "cut_volume": round(self.total_cut_volume, 8), + "fill_volume": round(self.total_fill_volume, 8), + "net_volume": round(self.total_net_volume, 8), + }, + "grid_differences": self.grid_differences + } + @dataclass class EarthworkResult3dTiles: """土方量计算结果""" @@ -34,9 +198,20 @@ class EarthworkResult3dTiles: algorithm: str # 使用的算法 resolution: float # 计算分辨率 + # 新增属性 - 计算单元详细数据 + calculation_details: List[Dict[str, Any]] = field(default_factory=list) # 所有计算单元的详细数据 + details_type: str = "" # 计算单元类型: "grid_cells", "triangles", "prisms" + details_dimensions: Dict[str, Any] = field(default_factory=dict) # 计算单元维度信息 + valid_unit_count: int = 0 # 有效计算单元数量 + total_unit_count: int = 0 # 总计算单元数量 + + # 新增属性 - 统计信息 + elevation_statistics: Dict[str, float] = field(default_factory=dict) # 高程统计信息 + volume_distribution: Dict[str, Any] = field(default_factory=dict) # 土方量分布信息 + def to_dict(self) -> Dict: """转换为字典""" - return { + result = { "volume": { "cut": round(self.cut_volume, 8), "fill": round(self.fill_volume, 8), @@ -64,7 +239,25 @@ class EarthworkResult3dTiles: "accuracy": self.volume_accuracy } } - + + # 添加新增的属性 + if self.calculation_details: + result["calculation_details"] = { + "type": self.details_type, + "dimensions": self.details_dimensions, + "valid_unit_count": self.valid_unit_count, + "total_unit_count": self.total_unit_count, + "data": self.calculation_details + } + + if self.elevation_statistics: + result["elevation_statistics"] = self.elevation_statistics + + if self.volume_distribution: + result["volume_distribution"] = self.volume_distribution + + return result + class TerrainDataSource(ABC): """地形数据源抽象类""" @@ -478,7 +671,230 @@ class EarthworkCalculator3dTiles: self.data_source = data_source self.geometryUtils = GeometryUtils() self._transformer_cache = {} + + def compare_grids_by_coordinate(self, + grids_a: List[Dict[str, Any]], + grids_b: List[Dict[str, Any]] + ) -> EarthworkComparisonResult: + """按坐标匹配比较网格数据""" + # 使用字典按坐标快速查找网格A + grids_a_by_coord = {} + for cell in grids_a: + key = (round(cell.get('x_min', 0), 6), round(cell.get('y_min', 0), 6)) + grids_a_by_coord[key] = cell + grid_differences_list = [] + total_cut_volume = 0.0 + total_fill_volume = 0.0 + total_net_volume = 0.0 + + # 收集有效网格的差异数据用于统计 + valid_cut_diffs = [] + valid_fill_diffs = [] + valid_net_diffs = [] + valid_elevation_diffs = [] + + matched_count = 0 + unmatched_count = 0 + + # 遍历网格B,查找匹配的网格A + for cell_b in grids_b: + key = (round(cell_b.get('x_min', 0), 6), round(cell_b.get('y_min', 0), 6)) + + if key in grids_a_by_coord: + cell_a = grids_a_by_coord[key] + matched_count += 1 + + # 计算各项差值 + cut_volume_diff = cell_b.get('cut_volume', 0) - cell_a.get('cut_volume', 0) + fill_volume_diff = cell_b.get('fill_volume', 0) - cell_a.get('fill_volume', 0) + net_volume_diff = cell_b.get('net_volume', 0) - cell_a.get('net_volume', 0) + + # 计算平均高程差值 + avg_elevation_a = cell_a.get('avg_elevation', np.nan) + avg_elevation_b = cell_b.get('avg_elevation', np.nan) + avg_elevation_diff = 0.0 + if not np.isnan(avg_elevation_a) and not np.isnan(avg_elevation_b): + avg_elevation_diff = avg_elevation_b - avg_elevation_a + + # 计算高程差变化 + height_diff_a = cell_a.get('height_diff', np.nan) + height_diff_b = cell_b.get('height_diff', np.nan) + height_diff_change = 0.0 + if not np.isnan(height_diff_a) and not np.isnan(height_diff_b): + height_diff_change = height_diff_b - height_diff_a + + # 检查单元是否有效 + is_valid_a = cell_a.get('is_valid', False) + is_valid_b = cell_b.get('is_valid', False) + is_valid = is_valid_a and is_valid_b + + # 创建差异单元对象 + diff_cell = GridCellDifference( + i=cell_b.get('i', 0), + j=cell_b.get('j', 0), + x_min=cell_b.get('x_min', 0), + x_max=cell_b.get('x_max', 0), + y_min=cell_b.get('y_min', 0), + y_max=cell_b.get('y_max', 0), + cut_volume=cut_volume_diff, + fill_volume=fill_volume_diff, + net_volume=net_volume_diff, + avg_elevation=avg_elevation_diff, + height_diff=height_diff_change, + is_valid=is_valid + ) + + grid_differences_list.append(diff_cell.to_dict()) + + # 累加有效网格的总体差异 + if is_valid: + total_cut_volume += cut_volume_diff + total_fill_volume += fill_volume_diff + total_net_volume += net_volume_diff + + # 收集统计信息 + valid_cut_diffs.append(cut_volume_diff) + valid_fill_diffs.append(fill_volume_diff) + valid_net_diffs.append(net_volume_diff) + valid_elevation_diffs.append(avg_elevation_diff) + else: + unmatched_count += 1 + # 对于未匹配的网格B,创建一个无效的差异单元 + grid_differences_list.append({ + 'i': cell_b.get('i', 0), + 'j': cell_b.get('j', 0), + 'x_min': cell_b.get('x_min', 0), + 'x_max': cell_b.get('x_max', 0), + 'y_min': cell_b.get('y_min', 0), + 'y_max': cell_b.get('y_max', 0), + 'cut_volume': 0.0, + 'fill_volume': 0.0, + 'net_volume': 0.0, + 'avg_elevation': 0.0, + 'height_diff': 0.0, + 'is_valid': False + }) + + return EarthworkComparisonResult( + total_cut_volume=total_cut_volume, + total_fill_volume=total_fill_volume, + total_net_volume=total_net_volume, + grid_differences=grid_differences_list + ) + + def compare_grids_by_index(self, + grids_a: List[Dict[str, Any]], + grids_b: List[Dict[str, Any]] + ) -> EarthworkComparisonResult: + """按索引顺序比较网格数据""" + grid_differences_list = [] + total_cut_volume = 0.0 + total_fill_volume = 0.0 + total_net_volume = 0.0 + + # 收集有效网格的差异数据用于统计 + valid_cut_diffs = [] + valid_fill_diffs = [] + valid_net_diffs = [] + valid_elevation_diffs = [] + + for i in range(len(grids_a)): + cell_a = grids_a[i] + cell_b = grids_b[i] + + # 检查是否是同一个网格单元 + if cell_a.get('i') != cell_b.get('i') or cell_a.get('j') != cell_b.get('j'): + print(f"警告: 索引{i}的网格不匹配: A({cell_a.get('i')},{cell_a.get('j')}) vs B({cell_b.get('i')},{cell_b.get('j')})") + # 尝试通过坐标匹配 + return self.compare_grids_by_coordinate(grids_a, grids_b) + + # 计算各项差值 + cut_volume_diff = cell_b.get('cut_volume', 0) - cell_a.get('cut_volume', 0) + fill_volume_diff = cell_b.get('fill_volume', 0) - cell_a.get('fill_volume', 0) + net_volume_diff = cell_b.get('net_volume', 0) - cell_a.get('net_volume', 0) + + # 计算平均高程差值 + avg_elevation_a = cell_a.get('avg_elevation', np.nan) + avg_elevation_b = cell_b.get('avg_elevation', np.nan) + avg_elevation_diff = 0.0 + if not np.isnan(avg_elevation_a) and not np.isnan(avg_elevation_b): + avg_elevation_diff = avg_elevation_b - avg_elevation_a + + # 计算高程差变化 + height_diff_a = cell_a.get('height_diff', np.nan) + height_diff_b = cell_b.get('height_diff', np.nan) + height_diff_change = 0.0 + if not np.isnan(height_diff_a) and not np.isnan(height_diff_b): + height_diff_change = height_diff_b - height_diff_a + + # 检查单元是否有效 + is_valid_a = cell_a.get('is_valid', False) + is_valid_b = cell_b.get('is_valid', False) + is_valid = is_valid_a and is_valid_b + + # 创建差异单元对象 + diff_cell = GridCellDifference( + i=cell_a.get('i', 0), + j=cell_a.get('j', 0), + x_min=cell_a.get('x_min', 0), + x_max=cell_a.get('x_max', 0), + y_min=cell_a.get('y_min', 0), + y_max=cell_a.get('y_max', 0), + cut_volume=cut_volume_diff, + fill_volume=fill_volume_diff, + net_volume=net_volume_diff, + avg_elevation=avg_elevation_diff, + height_diff=height_diff_change, + is_valid=is_valid + ) + + grid_differences_list.append(diff_cell.to_dict()) + + # 累加有效网格的总体差异 + if is_valid: + total_cut_volume += cut_volume_diff + total_fill_volume += fill_volume_diff + total_net_volume += net_volume_diff + + # 收集统计信息 + valid_cut_diffs.append(cut_volume_diff) + valid_fill_diffs.append(fill_volume_diff) + valid_net_diffs.append(net_volume_diff) + valid_elevation_diffs.append(avg_elevation_diff) + + return EarthworkComparisonResult( + total_cut_volume=total_cut_volume, + total_fill_volume=total_fill_volume, + total_net_volume=total_net_volume, + grid_differences=grid_differences_list + ) + + def compare_grid_cells( self, + grids_a: List[Dict[str, Any]], + grids_b: List[Dict[str, Any]] + ) -> EarthworkComparisonResult: + """ + 比较两个网格数据集合 + + Args: + grids_a: A结果的网格数据 + grids_b: B结果的网格数据 + + Returns: + 比较结果对象 + """ + if not grids_a or not grids_b: + raise ValueError("网格数据不能为空") + + if len(grids_a) != len(grids_b): + # 如果网格数量不同,尝试按坐标匹配 + print(f"警告: 网格数量不同 (A={len(grids_a)}, B={len(grids_b)}),尝试按坐标匹配") + return self.compare_grids_by_coordinate(grids_a, grids_b) + + # 网格数量相同,按顺序比较 + return self.compare_grids_by_index(grids_a, grids_b) + async def calculate(self, polygon_coords: List[List[float]], design_elevation: float, @@ -505,7 +921,8 @@ class EarthworkCalculator3dTiles: points = await self.data_source.get_points_in_polygon(polygon_coords) if points.size == 0: - raise ValueError("区域内没有找到顶点数据") + # raise ValueError("区域内没有找到顶点数据") + raise ValueError("区域太小,请调整区域") # 2. 坐标转换 points = await self._transform_coordinates(points, target_crs) @@ -552,77 +969,171 @@ class EarthworkCalculator3dTiles: return np.column_stack([points_2d[0], points_2d[1], points[:, 2]]) async def _calculate_grid(self, points: np.ndarray, - polygon_coords: List[List[float]], - design_elevation: float, - resolution: float, - interpolation_method: str) -> EarthworkResult3dTiles: - """格网法计算""" + polygon_coords: List[List[float]], + design_elevation: float, + resolution: float, + interpolation_method: str) -> EarthworkResult3dTiles: + """格网法计算 - 包含网格单元数据""" polygon_np = np.array(polygon_coords) # 创建格网 - xx, yy, x_grid, y_grid = self.geometryUtils.create_grid(polygon_np, resolution) + xx, yy, _ = self.geometryUtils.create_grid(polygon_np, resolution) + x_grid = xx[0, :] # 从xx矩阵提取x网格线 + y_grid = yy[:, 0] # 从yy矩阵提取y网格线 # 插值 natural_elevations = self.geometryUtils.interpolate_grid(xx, yy, points, interpolation_method) - # 初始化挖填量 - cut_volume = 0.0 - fill_volume = 0.0 + # 初始化统计信息 + cut_volume_total = 0.0 + fill_volume_total = 0.0 total_area = 0.0 + grid_cells: List[Dict[str, Any]] = [] + valid_grid_count = 0 + total_grid_count = (len(x_grid) - 1) * (len(y_grid) - 1) + + # 收集高程和体积用于统计 + valid_elevations = [] + cut_volumes_list = [] + fill_volumes_list = [] # 遍历每个格网单元 for i in range(len(x_grid) - 1): for j in range(len(y_grid) - 1): # 格网四个角点 + x_min, x_max = x_grid[i], x_grid[i+1] + y_min, y_max = y_grid[j], y_grid[j+1] + cell_corners = np.array([ - [x_grid[i], y_grid[j]], - [x_grid[i+1], y_grid[j]], - [x_grid[i+1], y_grid[j+1]], - [x_grid[i], y_grid[j+1]] + [x_min, y_min], + [x_max, y_min], + [x_max, y_max], + [x_min, y_max] ]) # 检查格网中心点是否在多边形内 cell_center = cell_corners.mean(axis=0) - if not self.geometryUtils.is_point_in_polygon(cell_center, polygon_np): - continue + is_in_polygon = self.geometryUtils.is_point_in_polygon(cell_center, polygon_np) # 获取格网四个角点的高程 - cell_elevations = [ - natural_elevations[j, i], - natural_elevations[j, i+1], - natural_elevations[j+1, i+1], - natural_elevations[j+1, i] + corner_elevations = [ + float(natural_elevations[j, i]), + float(natural_elevations[j, i+1]), + float(natural_elevations[j+1, i+1]), + float(natural_elevations[j+1, i]) ] # 检查是否有无效数据 - if any(np.isnan(elev) for elev in cell_elevations): - continue + has_valid_data = not any(np.isnan(elev) for elev in corner_elevations) - # 计算格网平均高程 - avg_elevation = np.mean(cell_elevations) - cell_area = resolution * resolution - total_area += cell_area - - # 计算挖填量 - height_diff = design_elevation - avg_elevation - if height_diff > 0: - fill_volume += height_diff * cell_area + if is_in_polygon and has_valid_data: + # 计算格网平均高程 + avg_elevation = np.mean(corner_elevations) + cell_area = resolution * resolution + total_area += cell_area + + # 计算挖填量 + height_diff = design_elevation - avg_elevation + + if height_diff > 0: + fill_volume = height_diff * cell_area + cut_volume = 0.0 + fill_volume_total += fill_volume + fill_volumes_list.append(fill_volume) + else: + cut_volume = abs(height_diff) * cell_area + fill_volume = 0.0 + cut_volume_total += cut_volume + cut_volumes_list.append(cut_volume) + + net_volume = cut_volume - fill_volume + + # 收集统计信息 + valid_elevations.append(avg_elevation) + valid_grid_count += 1 + + # 创建网格单元数据对象 + grid_cell = { + 'i': i, + 'j': j, + 'x_min': x_min, + 'x_max': x_max, + 'y_min': y_min, + 'y_max': y_max, + 'cut_volume': float(cut_volume), + 'fill_volume': float(fill_volume), + 'net_volume': float(net_volume), + 'avg_elevation': float(avg_elevation), + 'design_elevation': design_elevation, + 'height_diff': float(height_diff), + 'area': cell_area, + 'is_valid': True, + 'corner_elevations': corner_elevations + } else: - cut_volume += abs(height_diff) * cell_area + # 创建无效网格单元数据对象 + grid_cell = { + 'i': i, + 'j': j, + 'x_min': x_min, + 'x_max': x_max, + 'y_min': y_min, + 'y_max': y_max, + 'cut_volume': 0.0, + 'fill_volume': 0.0, + 'net_volume': 0.0, + 'avg_elevation': np.nan, + 'design_elevation': design_elevation, + 'height_diff': np.nan, + 'area': resolution * resolution, + 'is_valid': False, + 'corner_elevations': corner_elevations + } + + grid_cells.append(grid_cell) # 计算统计信息 area = self.geometryUtils.calculate_polygon_area(polygon_coords) mask = ~np.isnan(natural_elevations) - valid_elevations = natural_elevations[mask] + valid_elevations_grid = natural_elevations[mask] + + # 计算高程统计信息 + if valid_elevations: + elevation_stats = { + "std": float(np.std(valid_elevations)), + "median": float(np.median(valid_elevations)), + "q1": float(np.percentile(valid_elevations, 25)), + "q3": float(np.percentile(valid_elevations, 75)) + } + else: + elevation_stats = {} + + # 计算体积分布信息 + volume_distribution = {} + if cut_volumes_list: + volume_distribution["cut"] = { + "max": float(np.max(cut_volumes_list)), + "min": float(np.min(cut_volumes_list)), + "mean": float(np.mean(cut_volumes_list)), + "total_cells": len(cut_volumes_list) + } + + if fill_volumes_list: + volume_distribution["fill"] = { + "max": float(np.max(fill_volumes_list)), + "min": float(np.min(fill_volumes_list)), + "mean": float(np.mean(fill_volumes_list)), + "total_cells": len(fill_volumes_list) + } return EarthworkResult3dTiles( - cut_volume=cut_volume, - fill_volume=fill_volume, - net_volume=cut_volume - fill_volume, + cut_volume=cut_volume_total, + fill_volume=fill_volume_total, + net_volume=cut_volume_total - fill_volume_total, area=area, - avg_elevation=np.mean(valid_elevations) if valid_elevations.size > 0 else 0, - min_elevation=np.min(valid_elevations) if valid_elevations.size > 0 else 0, - max_elevation=np.max(valid_elevations) if valid_elevations.size > 0 else 0, + avg_elevation=np.mean(valid_elevations_grid) if valid_elevations_grid.size > 0 else 0, + min_elevation=np.min(valid_elevations_grid) if valid_elevations_grid.size > 0 else 0, + max_elevation=np.max(valid_elevations_grid) if valid_elevations_grid.size > 0 else 0, points_count=points.shape[0], bounding_box={ "min": [x_grid[0], y_grid[0]], @@ -630,58 +1141,183 @@ class EarthworkCalculator3dTiles: }, volume_accuracy=self._calculate_accuracy(points, resolution), algorithm=AlgorithmType.GRID.value, - resolution=resolution - ).to_dict() + resolution=resolution, + calculation_details=grid_cells, + details_type="grid_cells", + details_dimensions={ + "rows": len(y_grid) - 1, + "cols": len(x_grid) - 1, + "cell_size": resolution + }, + valid_unit_count=valid_grid_count, + total_unit_count=total_grid_count, + elevation_statistics=elevation_stats, + volume_distribution=volume_distribution + ) async def _calculate_tin(self, points: np.ndarray, - polygon_coords: List[List[float]], - design_elevation: float) -> EarthworkResult3dTiles: - """三角网法计算""" + polygon_coords: List[List[float]], + design_elevation: float) -> EarthworkResult3dTiles: + """三角网法计算 - 包含三角形单元数据""" polygon_np = np.array(polygon_coords) # 创建Delaunay三角网 triangulation = Delaunay(points[:, :2]) - # 筛选多边形内的三角形 - cut_volume = 0.0 - fill_volume = 0.0 + # 初始化统计信息 + cut_volume_total = 0.0 + fill_volume_total = 0.0 total_area = 0.0 + triangles: List[Dict[str, Any]] = [] + triangle_id = 0 + valid_triangle_count = 0 + total_triangle_count = len(triangulation.simplices) + # 收集高程和体积用于统计 + valid_elevations = [] + cut_volumes_list = [] + fill_volumes_list = [] + triangle_areas = [] + + # 筛选多边形内的三角形 for simplex in triangulation.simplices: triangle_points = points[simplex] triangle_center = triangle_points.mean(axis=0)[:2] # 检查三角形中心是否在多边形内 - if not self.geometryUtils.is_point_in_polygon(triangle_center, polygon_np): - continue + is_in_polygon = self.geometryUtils.is_point_in_polygon(triangle_center, polygon_np) - # 计算三角形面积 - area = self.geometryUtils.calculate_triangle_area(triangle_points[:, :2]) - if math.isnan(area) : - continue - total_area += area - - # 计算平均高程(使用三个顶点的高程) - avg_elevation = triangle_points[:, 2].mean() - - # 计算挖填量 - height_diff = design_elevation - avg_elevation - if height_diff > 0: - fill_volume += height_diff * area + if is_in_polygon: + # 计算三角形面积 + area = self.geometryUtils.calculate_triangle_area(triangle_points[:, :2]) + if math.isnan(area): + # 跳过无效的三角形 + triangles.append({ + 'triangle_id': triangle_id, + 'vertices': [tuple(point) for point in triangle_points], + 'vertex_indices': simplex.tolist(), + 'cut_volume': 0.0, + 'fill_volume': 0.0, + 'net_volume': 0.0, + 'avg_elevation': float(triangle_points[:, 2].mean()), + 'design_elevation': design_elevation, + 'height_diff': np.nan, + 'area': 0.0, + 'is_valid': False, + 'center_point': tuple(triangle_center) + }) + triangle_id += 1 + continue + + total_area += area + triangle_areas.append(area) + + # 计算平均高程(使用三个顶点的高程) + avg_elevation = triangle_points[:, 2].mean() + height_diff = design_elevation - avg_elevation + + # 计算挖填量 + if height_diff > 0: + fill_volume = height_diff * area + cut_volume = 0.0 + fill_volume_total += fill_volume + fill_volumes_list.append(fill_volume) + else: + cut_volume = abs(height_diff) * area + fill_volume = 0.0 + cut_volume_total += cut_volume + cut_volumes_list.append(cut_volume) + + net_volume = cut_volume - fill_volume + + # 收集统计信息 + valid_elevations.extend(triangle_points[:, 2].tolist()) + valid_triangle_count += 1 + + # 创建三角形数据对象 + triangle = { + 'triangle_id': triangle_id, + 'vertices': [tuple(point) for point in triangle_points], + 'vertex_indices': simplex.tolist(), + 'cut_volume': float(cut_volume), + 'fill_volume': float(fill_volume), + 'net_volume': float(net_volume), + 'avg_elevation': float(avg_elevation), + 'design_elevation': design_elevation, + 'height_diff': float(height_diff), + 'area': float(area), + 'is_valid': True, + 'center_point': tuple(triangle_center) + } else: - cut_volume += abs(height_diff) * area - # if math.isnan(cut_volume) : - # print("cut_volume变为nan") - # if math.isnan(fill_volume) : - # print("fill_volume变为nan") - + # 创建无效三角形数据对象 + area = self.geometryUtils.calculate_triangle_area(triangle_points[:, :2]) + if math.isnan(area): + area = 0.0 + + triangle = { + 'triangle_id': triangle_id, + 'vertices': [tuple(point) for point in triangle_points], + 'vertex_indices': simplex.tolist(), + 'cut_volume': 0.0, + 'fill_volume': 0.0, + 'net_volume': 0.0, + 'avg_elevation': float(triangle_points[:, 2].mean()), + 'design_elevation': design_elevation, + 'height_diff': np.nan, + 'area': float(area), + 'is_valid': False, + 'center_point': tuple(triangle_center) + } + + triangles.append(triangle) + triangle_id += 1 + # 计算统计信息 area = self.geometryUtils.calculate_polygon_area(polygon_coords) + # 计算高程统计信息 + if valid_elevations: + elevation_stats = { + "std": float(np.std(valid_elevations)), + "median": float(np.median(valid_elevations)), + "q1": float(np.percentile(valid_elevations, 25)), + "q3": float(np.percentile(valid_elevations, 75)) + } + else: + elevation_stats = {} + + # 计算体积分布信息 + volume_distribution = {} + if cut_volumes_list: + volume_distribution["cut"] = { + "max": float(np.max(cut_volumes_list)), + "min": float(np.min(cut_volumes_list)), + "mean": float(np.mean(cut_volumes_list)), + "total_triangles": len(cut_volumes_list) + } + + if fill_volumes_list: + volume_distribution["fill"] = { + "max": float(np.max(fill_volumes_list)), + "min": float(np.min(fill_volumes_list)), + "mean": float(np.mean(fill_volumes_list)), + "total_triangles": len(fill_volumes_list) + } + + # 计算三角形面积统计 + if triangle_areas: + volume_distribution["area"] = { + "max": float(np.max(triangle_areas)), + "min": float(np.min(triangle_areas)), + "mean": float(np.mean(triangle_areas)), + "total_area": float(np.sum(triangle_areas)) + } + return EarthworkResult3dTiles( - cut_volume=cut_volume, - fill_volume=fill_volume, - net_volume=cut_volume - fill_volume, + cut_volume=cut_volume_total, + fill_volume=fill_volume_total, + net_volume=cut_volume_total - fill_volume_total, area=area, avg_elevation=points[:, 2].mean(), min_elevation=points[:, 2].min(), @@ -693,61 +1329,218 @@ class EarthworkCalculator3dTiles: }, volume_accuracy=self._calculate_accuracy(points, 0), algorithm=AlgorithmType.TIN.value, - resolution=0 - ).to_dict() - + resolution=0, + calculation_details=triangles, + details_type="triangles", + details_dimensions={ + "triangle_count": total_triangle_count, + "valid_triangle_count": valid_triangle_count, + "vertex_count": points.shape[0] + }, + valid_unit_count=valid_triangle_count, + total_unit_count=total_triangle_count, + elevation_statistics=elevation_stats, + volume_distribution=volume_distribution + ) + async def _calculate_prism(self, points: np.ndarray, - polygon_coords: List[List[float]], - design_elevation: float, - resolution: float) -> EarthworkResult3dTiles: - """三棱柱法计算""" + polygon_coords: List[List[float]], + design_elevation: float, + resolution: float) -> EarthworkResult3dTiles: + """三棱柱法计算 - 包含三棱柱单元数据""" # 先创建TIN polygon_np = np.array(polygon_coords) triangulation = Delaunay(points[:, :2]) - cut_volume = 0.0 - fill_volume = 0.0 + cut_volume_total = 0.0 + fill_volume_total = 0.0 total_area = 0.0 + prisms: List[Dict[str, Any]] = [] + triangle_id = 0 + valid_prism_count = 0 + total_prism_count = len(triangulation.simplices) + + # 收集统计信息 + valid_elevations = [] + cut_volumes_list = [] + fill_volumes_list = [] + triangle_areas = [] + height_diffs_list = [] for simplex in triangulation.simplices: triangle_points = points[simplex] triangle_center = triangle_points.mean(axis=0)[:2] - if not self.geometryUtils.is_point_in_polygon(triangle_center, polygon_np): - continue + is_in_polygon = self.geometryUtils.is_point_in_polygon(triangle_center, polygon_np) - # 计算三角形面积 - area = self.geometryUtils.calculate_triangle_area(triangle_points[:, :2]) - total_area += area + if is_in_polygon: + # 计算三角形面积 + area = self.geometryUtils.calculate_triangle_area(triangle_points[:, :2]) + if math.isnan(area): + # 跳过无效的三角形 + prisms.append({ + 'triangle_id': triangle_id, + 'vertices': [tuple(point) for point in triangle_points], + 'vertex_indices': simplex.tolist(), + 'cut_volume': 0.0, + 'fill_volume': 0.0, + 'net_volume': 0.0, + 'avg_elevation': float(triangle_points[:, 2].mean()), + 'design_elevation': design_elevation, + 'height_diffs': [float(design_elevation - triangle_points[i, 2]) for i in range(3)], + 'area': 0.0, + 'is_valid': False, + 'center_point': tuple(triangle_center), + 'edge_volumes': [0.0, 0.0, 0.0] + }) + triangle_id += 1 + continue + + total_area += area + triangle_areas.append(area) + avg_elevation = triangle_points[:, 2].mean() + + # 计算三棱柱体积 + triangle_cut_volume = 0.0 + triangle_fill_volume = 0.0 + edge_volumes = [] + height_diffs = [] + + for i in range(3): + j = (i + 1) % 3 + # 计算边的长度 + edge_length = np.linalg.norm(triangle_points[i, :2] - triangle_points[j, :2]) + + # 计算边两端点的挖填高度 + height_i = design_elevation - triangle_points[i, 2] + height_j = design_elevation - triangle_points[j, 2] + height_diffs.extend([float(height_i), float(height_j)]) + height_diffs_list.extend([float(height_i), float(height_j)]) + + # 计算边的平均挖填高度 + avg_height = (abs(height_i) + abs(height_j)) / 2 + + # 计算边的面积(假设边宽度为resolution) + edge_area = edge_length * resolution + edge_volume = avg_height * edge_area + edge_volumes.append(float(edge_volume)) + + if height_i > 0 or height_j > 0: + triangle_fill_volume += edge_volume + fill_volume_total += edge_volume + fill_volumes_list.append(edge_volume) + else: + triangle_cut_volume += edge_volume + cut_volume_total += edge_volume + cut_volumes_list.append(edge_volume) + + # 去重高度差 + unique_height_diffs = list(set([height_diffs[0], height_diffs[2], height_diffs[4]])) + + # 收集统计信息 + valid_elevations.extend(triangle_points[:, 2].tolist()) + valid_prism_count += 1 + + # 创建三棱柱数据对象 + prism = { + 'triangle_id': triangle_id, + 'vertices': [tuple(point) for point in triangle_points], + 'vertex_indices': simplex.tolist(), + 'cut_volume': float(triangle_cut_volume), + 'fill_volume': float(triangle_fill_volume), + 'net_volume': float(triangle_cut_volume - triangle_fill_volume), + 'avg_elevation': float(avg_elevation), + 'design_elevation': design_elevation, + 'height_diffs': unique_height_diffs, + 'area': float(area), + 'is_valid': True, + 'center_point': tuple(triangle_center), + 'edge_volumes': edge_volumes + } + else: + # 创建无效三棱柱数据对象 + area = self.geometryUtils.calculate_triangle_area(triangle_points[:, :2]) + if math.isnan(area): + area = 0.0 + + # 计算高度差 + height_diffs = [] + for i in range(3): + height_i = design_elevation - triangle_points[i, 2] + height_diffs.append(float(height_i)) + + prism = { + 'triangle_id': triangle_id, + 'vertices': [tuple(point) for point in triangle_points], + 'vertex_indices': simplex.tolist(), + 'cut_volume': 0.0, + 'fill_volume': 0.0, + 'net_volume': 0.0, + 'avg_elevation': float(triangle_points[:, 2].mean()), + 'design_elevation': design_elevation, + 'height_diffs': height_diffs, + 'area': float(area), + 'is_valid': False, + 'center_point': tuple(triangle_center), + 'edge_volumes': [0.0, 0.0, 0.0] + } - # 对于每个三角形,计算三棱柱体积 - # 这里简化处理,使用梯形公式 - for i in range(3): - j = (i + 1) % 3 - # 计算边的长度 - edge_length = np.linalg.norm(triangle_points[i, :2] - triangle_points[j, :2]) - - # 计算边两端点的挖填高度 - height_i = design_elevation - triangle_points[i, 2] - height_j = design_elevation - triangle_points[j, 2] - - # 计算边的平均挖填高度 - avg_height = (abs(height_i) + abs(height_j)) / 2 - - # 计算边的面积(假设边宽度为resolution) - edge_area = edge_length * resolution - - if height_i > 0 or height_j > 0: - fill_volume += avg_height * edge_area - else: - cut_volume += avg_height * edge_area + prisms.append(prism) + triangle_id += 1 area = self.geometryUtils.calculate_polygon_area(polygon_coords) + # 计算高程统计信息 + if valid_elevations: + elevation_stats = { + "std": float(np.std(valid_elevations)), + "median": float(np.median(valid_elevations)), + "q1": float(np.percentile(valid_elevations, 25)), + "q3": float(np.percentile(valid_elevations, 75)) + } + else: + elevation_stats = {} + + # 计算高度差统计信息 + if height_diffs_list: + elevation_stats["height_diff"] = { + "max": float(np.max(height_diffs_list)), + "min": float(np.min(height_diffs_list)), + "mean": float(np.mean(height_diffs_list)), + "std": float(np.std(height_diffs_list)) + } + + # 计算体积分布信息 + volume_distribution = {} + if cut_volumes_list: + volume_distribution["cut"] = { + "max": float(np.max(cut_volumes_list)), + "min": float(np.min(cut_volumes_list)), + "mean": float(np.mean(cut_volumes_list)), + "total_edges": len(cut_volumes_list) + } + + if fill_volumes_list: + volume_distribution["fill"] = { + "max": float(np.max(fill_volumes_list)), + "min": float(np.min(fill_volumes_list)), + "mean": float(np.mean(fill_volumes_list)), + "total_edges": len(fill_volumes_list) + } + + # 计算三角形面积统计 + if triangle_areas: + volume_distribution["area"] = { + "max": float(np.max(triangle_areas)), + "min": float(np.min(triangle_areas)), + "mean": float(np.mean(triangle_areas)), + "total_area": float(np.sum(triangle_areas)) + } + return EarthworkResult3dTiles( - cut_volume=cut_volume, - fill_volume=fill_volume, - net_volume=cut_volume - fill_volume, + cut_volume=cut_volume_total, + fill_volume=fill_volume_total, + net_volume=cut_volume_total - fill_volume_total, area=area, avg_elevation=points[:, 2].mean(), min_elevation=points[:, 2].min(), @@ -759,9 +1552,21 @@ class EarthworkCalculator3dTiles: }, volume_accuracy=self._calculate_accuracy(points, resolution), algorithm=AlgorithmType.PRISM.value, - resolution=resolution - ).to_dict() - + resolution=resolution, + calculation_details=prisms, + details_type="prisms", + details_dimensions={ + "prism_count": total_prism_count, + "valid_prism_count": valid_prism_count, + "vertex_count": points.shape[0], + "edge_resolution": resolution + }, + valid_unit_count=valid_prism_count, + total_unit_count=total_prism_count, + elevation_statistics=elevation_stats, + volume_distribution=volume_distribution + ) + def _calculate_accuracy(self, points: np.ndarray, resolution: float) -> float: """计算精度评估""" if points.shape[0] < 10: