# pip install fastapi uvicorn pdal pyvista numpy from sanic import Blueprint, Request, json from sanic.response import text from typing import Dict, Any import logging from b3dm.earthwork_calculator_point_cloud import EarthworkCalculatorPointCloud # 导入计算模块 from b3dm.earthwork_calculator_3d_tiles import EarthworkCalculator3dTiles, AlgorithmType, EarthworkResult3dTiles from b3dm.tileset_data_source import TilesetDataSource earthwork_bp = Blueprint("earthwork", url_prefix="") # 配置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # 全局变量 _calculator_point_cloud = None _calculator_3d_tiles = None _data_source_3d_tiles = None # 初始化函数 async def init_app(): """初始化应用""" global _data_source_3d_tiles, _calculator_3d_tiles, _calculator_point_cloud try: # 配置数据源 tileset_path = "./data/3dtiles/tileset.json" # 初始化数据源 _data_source_3d_tiles = TilesetDataSource(tileset_path) await _data_source_3d_tiles.initialize() # 初始化计算器-3dTiles _calculator_3d_tiles = EarthworkCalculator3dTiles(_data_source_3d_tiles) # 初始化计算器-点云 point_cloud_path = "./data/pointCloud/simulated_points.laz" _calculator_point_cloud = EarthworkCalculatorPointCloud(point_cloud_path) logger.info("土方量计算器初始化完成") except ImportError as e: logger.error(f"依赖库缺失: {str(e)}") raise except Exception as e: logger.error(f"初始化失败: {str(e)}") raise # 土方量计算接口-3dTiles @earthwork_bp.post("/api/v1/calc/earthwork3dTiles") async def calc_earthwork(request: Request): """ 土方量计算接口 请求参数示例: { "polygonCoords": [[120.1, 30.1], [120.2, 30.1], [120.2, 30.2], [120.1, 30.2]], "designElevation": 50.0, "algorithm": "tin", "resolution": 1.0, "crs": "EPSG:4326", "interpolationMethod": "linear" } """ try: # 1. 接收前端传参 data = request.json if not data: return _error_response("请求参数不能为空", 400) # 2. 提取参数 polygon_coords = data.get("polygonCoords") design_elevation = data.get("designElevation") if not polygon_coords: return _error_response("多边形坐标不能为空", 400) if design_elevation is None: return _error_response("设计高程不能为空", 400) # 3. 可选参数 algorithm = data.get("algorithm", "tin") resolution = data.get("resolution", 1.0) crs = data.get("crs", "EPSG:4326") interpolation_method = data.get("interpolationMethod", "linear") # 4. 参数验证 if len(polygon_coords) < 3: return _error_response("多边形至少需要3个点", 400) # 检查多边形是否闭合,如不闭合则自动闭合 if polygon_coords[0] != polygon_coords[-1]: polygon_coords.append(polygon_coords[0]) # 算法验证 if algorithm not in ["grid", "tin", "prism"]: return _error_response("算法必须是 grid, tin 或 prism", 400) # 分辨率验证 if resolution <= 0 or resolution > 100: return _error_response("分辨率必须在0-100米之间", 400) # 5. 确保计算器已初始化 if _calculator_3d_tiles is None: await init_app() # 6. 执行计算 algorithm_type = AlgorithmType(algorithm) result = await _calculator_3d_tiles.calculate( polygon_coords=polygon_coords, design_elevation=design_elevation, algorithm=algorithm_type, resolution=resolution, target_crs=crs, interpolation_method=interpolation_method ) # 7. 返回成功响应 return _success_response(result) except ValueError as e: logger.warning(f"参数验证失败: {str(e)}") return _error_response(f"参数错误: {str(e)}", 400) except Exception as e: logger.error(f"计算失败: {str(e)}") return _error_response(f"服务器内部错误: {str(e)}", 500) # 验证接口 @earthwork_bp.post("/api/v1/calc/earthwork3dTiles/validate") async def validate_earthwork(request: Request): """验证计算参数接口""" try: # 1. 接收前端传参 data = request.json if not data: return _error_response("请求参数不能为空", 400) # 2. 提取参数 polygon_coords = data.get("polygonCoords") if not polygon_coords: return _error_response("多边形坐标不能为空", 400) # 3. 参数验证 if len(polygon_coords) < 3: return _error_response("多边形至少需要3个点", 400) # 检查多边形是否闭合,如不闭合则自动闭合 if polygon_coords[0] != polygon_coords[-1]: polygon_coords.append(polygon_coords[0]) # 4. 确保计算器已初始化 if _calculator_3d_tiles is None: await init_app() # 5. 执行验证 validation_result = await _calculator_3d_tiles.validate(polygon_coords) # 6. 返回结果 return _success_response(validation_result) except Exception as e: logger.error(f"验证失败: {str(e)}") return _error_response(f"验证失败: {str(e)}", 400) # 获取算法列表接口 @earthwork_bp.get("/api/v1/calc/earthwork3dTiles/algorithms") async def get_algorithms(request: Request): """获取支持的算法列表接口""" try: algorithms = [ { "id": "grid", "name": "格网法", "description": "将计算区域划分为规则格网,通过插值计算每个格网的高程变化,适合平坦或规则地形", "accuracy": "中等", "performance": "快速", "parameters": { "resolution": { "name": "格网分辨率", "description": "格网大小(米),影响计算精度和性能", "default": 1.0, "range": [0.1, 10.0] } } }, { "id": "tin", "name": "三角网法", "description": "基于不规则三角网(TIN)构建地形表面,计算每个三角形的体积变化,适合复杂地形", "accuracy": "高", "performance": "中等", "parameters": { "resolution": { "name": "不适用", "description": "三角网法不使用固定的分辨率参数", "default": None } } }, { "id": "prism", "name": "三棱柱法", "description": "结合三角网和垂直棱柱的高精度算法,计算每个三棱柱的体积,精度最高", "accuracy": "最高", "performance": "较慢", "parameters": { "resolution": { "name": "棱柱宽度", "description": "棱柱宽度(米),影响计算精度", "default": 1.0, "range": [0.1, 5.0] } } } ] return _success_response(algorithms) except Exception as e: logger.error(f"获取算法列表失败: {str(e)}") return _error_response(f"获取算法列表失败: {str(e)}", 500) # 批量计算接口 @earthwork_bp.post("/api/v1/calc/earthwork3dTiles/batch") async def batch_calc_earthwork(request: Request): """批量土方量计算接口""" try: # 1. 接收前端传参 data = request.json if not data: return _error_response("请求参数不能为空", 400) calculations = data.get("calculations", []) if not calculations: return _error_response("计算任务列表不能为空", 400) if len(calculations) > 100: return _error_response("批量计算数量超过限制(最多100个)", 400) # 2. 确保计算器已初始化 if _calculator_3d_tiles is None: await init_app() # 3. 执行批量计算 results = [] errors = [] for i, calc_data in enumerate(calculations): try: # 提取参数 polygon_coords = calc_data.get("polygonCoords") design_elevation = calc_data.get("designElevation") if not polygon_coords or design_elevation is None: errors.append({ "index": i, "error": "缺少必要参数" }) continue # 参数验证 if len(polygon_coords) < 3: errors.append({ "index": i, "error": "多边形至少需要3个点" }) continue # 检查多边形是否闭合 if polygon_coords[0] != polygon_coords[-1]: polygon_coords.append(polygon_coords[0]) # 可选参数 algorithm = calc_data.get("algorithm", "tin") resolution = calc_data.get("resolution", 1.0) crs = calc_data.get("crs", "EPSG:4326") interpolation_method = calc_data.get("interpolationMethod", "linear") # 执行计算 algorithm_type = AlgorithmType(algorithm) result = await _calculator_3d_tiles.calculate( polygon_coords=polygon_coords, design_elevation=design_elevation, algorithm=algorithm_type, resolution=resolution, target_crs=crs, interpolation_method=interpolation_method ) results.append(result) except Exception as e: errors.append({ "index": i, "error": str(e), "polygon": polygon_coords if 'polygon_coords' in locals() else None }) continue # 4. 返回结果 batch_result = { "results": results, "errors": errors, "summary": { "total": len(calculations), "success": len(results), "failed": len(errors), "successRate": f"{(len(results)/len(calculations)*100):.1f}%" if calculations else "0%" } } message = f"批量计算完成,成功 {len(results)} 个,失败 {len(errors)} 个" return _success_response(batch_result) except Exception as e: logger.error(f"批量计算失败: {str(e)}") return _error_response(f"批量计算失败: {str(e)}", 500) # 核心接口:土方量计算-点云 @earthwork_bp.post("/api/v1/calc/earthworkPointCloud") async def calc_earthwork_point_cloud(request: Request): try: # 1. 接收前端传参 data = request.json if not data: return json({ "code": 400, "msg": "请求参数不能为空", "data": None }, status=400) polygon_coords = data.get("polygonCoords") # 计算区域多边形坐标 design_elev = data.get("designElevation") # 设计高程 crs = data.get("crs", "EPSG:4326") # 坐标系,默认WGS84 # 2. 确保计算器已初始化 if _calculator_point_cloud is None: await init_app() result = _calculator_point_cloud.calculate_earthwork(polygon_coords=polygon_coords, design_elev=design_elev, crs=crs) # 3. 处理结果 if not result["success"]: return _error_response(result["error"], 400) # 4. 格式化结果 formatted_result = _calculator_point_cloud.format_result(result) # 5. 返回成功响应 return _success_response(formatted_result) except Exception as e: logger.error(f"服务器错误: {str(e)}") return _error_response(f"服务器内部错误: {str(e)}", 500) def _success_response(data: Dict[str, Any]) -> json: """成功响应""" return json({ "code": 200, "msg": "计算成功", "data": data }) def _error_response(message: str, status_code: int = 400) -> json: """错误响应""" return json({ "code": status_code, "msg": message, "data": None }, status=status_code)