ai_project_v1/b3dm/earthwork_api.py

377 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# pip install fastapi uvicorn pdal pyvista numpy
from sanic import Blueprint, Request, json
from sanic.response import text
from typing import Dict, Any
import logging
from b3dm.earthwork_calculator_point_cloud import EarthworkCalculatorPointCloud
# 导入计算模块
from b3dm.earthwork_calculator_3d_tiles import EarthworkCalculator3dTiles, AlgorithmType, EarthworkResult3dTiles
from b3dm.tileset_data_source import TilesetDataSource
earthwork_bp = Blueprint("earthwork", url_prefix="")
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 全局变量
_calculator_point_cloud = None
_calculator_3d_tiles = None
_data_source_3d_tiles = None
# 初始化函数
async def init_app():
"""初始化应用"""
global _data_source_3d_tiles, _calculator_3d_tiles, _calculator_point_cloud
try:
# 配置数据源
tileset_path = "./data/3dtiles/tileset.json"
# 初始化数据源
_data_source_3d_tiles = TilesetDataSource(tileset_path)
await _data_source_3d_tiles.initialize()
# 初始化计算器-3dTiles
_calculator_3d_tiles = EarthworkCalculator3dTiles(_data_source_3d_tiles)
# 初始化计算器-点云
point_cloud_path = "./data/pointCloud/simulated_points.laz"
_calculator_point_cloud = EarthworkCalculatorPointCloud(point_cloud_path)
logger.info("土方量计算器初始化完成")
except ImportError as e:
logger.error(f"依赖库缺失: {str(e)}")
raise
except Exception as e:
logger.error(f"初始化失败: {str(e)}")
raise
# 土方量计算接口-3dTiles
@earthwork_bp.post("/api/v1/calc/earthwork3dTiles")
async def calc_earthwork(request: Request):
"""
土方量计算接口
请求参数示例:
{
"polygonCoords": [[120.1, 30.1], [120.2, 30.1], [120.2, 30.2], [120.1, 30.2]],
"designElevation": 50.0,
"algorithm": "tin",
"resolution": 1.0,
"crs": "EPSG:4326",
"interpolationMethod": "linear"
}
"""
try:
# 1. 接收前端传参
data = request.json
if not data:
return _error_response("请求参数不能为空", 400)
# 2. 提取参数
polygon_coords = data.get("polygonCoords")
design_elevation = data.get("designElevation")
if not polygon_coords:
return _error_response("多边形坐标不能为空", 400)
if design_elevation is None:
return _error_response("设计高程不能为空", 400)
# 3. 可选参数
algorithm = data.get("algorithm", "tin")
resolution = data.get("resolution", 1.0)
crs = data.get("crs", "EPSG:4326")
interpolation_method = data.get("interpolationMethod", "linear")
# 4. 参数验证
if len(polygon_coords) < 3:
return _error_response("多边形至少需要3个点", 400)
# 检查多边形是否闭合,如不闭合则自动闭合
if polygon_coords[0] != polygon_coords[-1]:
polygon_coords.append(polygon_coords[0])
# 算法验证
if algorithm not in ["grid", "tin", "prism"]:
return _error_response("算法必须是 grid, tin 或 prism", 400)
# 分辨率验证
if resolution <= 0 or resolution > 100:
return _error_response("分辨率必须在0-100米之间", 400)
# 5. 确保计算器已初始化
if _calculator_3d_tiles is None:
await init_app()
# 6. 执行计算
algorithm_type = AlgorithmType(algorithm)
result = await _calculator_3d_tiles.calculate(
polygon_coords=polygon_coords,
design_elevation=design_elevation,
algorithm=algorithm_type,
resolution=resolution,
target_crs=crs,
interpolation_method=interpolation_method
)
# 7. 返回成功响应
return _success_response(result)
except ValueError as e:
logger.warning(f"参数验证失败: {str(e)}")
return _error_response(f"参数错误: {str(e)}", 400)
except Exception as e:
logger.error(f"计算失败: {str(e)}")
return _error_response(f"服务器内部错误: {str(e)}", 500)
# 验证接口
@earthwork_bp.post("/api/v1/calc/earthwork3dTiles/validate")
async def validate_earthwork(request: Request):
"""验证计算参数接口"""
try:
# 1. 接收前端传参
data = request.json
if not data:
return _error_response("请求参数不能为空", 400)
# 2. 提取参数
polygon_coords = data.get("polygonCoords")
if not polygon_coords:
return _error_response("多边形坐标不能为空", 400)
# 3. 参数验证
if len(polygon_coords) < 3:
return _error_response("多边形至少需要3个点", 400)
# 检查多边形是否闭合,如不闭合则自动闭合
if polygon_coords[0] != polygon_coords[-1]:
polygon_coords.append(polygon_coords[0])
# 4. 确保计算器已初始化
if _calculator_3d_tiles is None:
await init_app()
# 5. 执行验证
validation_result = await _calculator_3d_tiles.validate(polygon_coords)
# 6. 返回结果
return _success_response(validation_result)
except Exception as e:
logger.error(f"验证失败: {str(e)}")
return _error_response(f"验证失败: {str(e)}", 400)
# 获取算法列表接口
@earthwork_bp.get("/api/v1/calc/earthwork3dTiles/algorithms")
async def get_algorithms(request: Request):
"""获取支持的算法列表接口"""
try:
algorithms = [
{
"id": "grid",
"name": "格网法",
"description": "将计算区域划分为规则格网,通过插值计算每个格网的高程变化,适合平坦或规则地形",
"accuracy": "中等",
"performance": "快速",
"parameters": {
"resolution": {
"name": "格网分辨率",
"description": "格网大小(米),影响计算精度和性能",
"default": 1.0,
"range": [0.1, 10.0]
}
}
},
{
"id": "tin",
"name": "三角网法",
"description": "基于不规则三角网TIN构建地形表面计算每个三角形的体积变化适合复杂地形",
"accuracy": "",
"performance": "中等",
"parameters": {
"resolution": {
"name": "不适用",
"description": "三角网法不使用固定的分辨率参数",
"default": None
}
}
},
{
"id": "prism",
"name": "三棱柱法",
"description": "结合三角网和垂直棱柱的高精度算法,计算每个三棱柱的体积,精度最高",
"accuracy": "最高",
"performance": "较慢",
"parameters": {
"resolution": {
"name": "棱柱宽度",
"description": "棱柱宽度(米),影响计算精度",
"default": 1.0,
"range": [0.1, 5.0]
}
}
}
]
return _success_response(algorithms)
except Exception as e:
logger.error(f"获取算法列表失败: {str(e)}")
return _error_response(f"获取算法列表失败: {str(e)}", 500)
# 批量计算接口
@earthwork_bp.post("/api/v1/calc/earthwork3dTiles/batch")
async def batch_calc_earthwork(request: Request):
"""批量土方量计算接口"""
try:
# 1. 接收前端传参
data = request.json
if not data:
return _error_response("请求参数不能为空", 400)
calculations = data.get("calculations", [])
if not calculations:
return _error_response("计算任务列表不能为空", 400)
if len(calculations) > 100:
return _error_response("批量计算数量超过限制最多100个", 400)
# 2. 确保计算器已初始化
if _calculator_3d_tiles is None:
await init_app()
# 3. 执行批量计算
results = []
errors = []
for i, calc_data in enumerate(calculations):
try:
# 提取参数
polygon_coords = calc_data.get("polygonCoords")
design_elevation = calc_data.get("designElevation")
if not polygon_coords or design_elevation is None:
errors.append({
"index": i,
"error": "缺少必要参数"
})
continue
# 参数验证
if len(polygon_coords) < 3:
errors.append({
"index": i,
"error": "多边形至少需要3个点"
})
continue
# 检查多边形是否闭合
if polygon_coords[0] != polygon_coords[-1]:
polygon_coords.append(polygon_coords[0])
# 可选参数
algorithm = calc_data.get("algorithm", "tin")
resolution = calc_data.get("resolution", 1.0)
crs = calc_data.get("crs", "EPSG:4326")
interpolation_method = calc_data.get("interpolationMethod", "linear")
# 执行计算
algorithm_type = AlgorithmType(algorithm)
result = await _calculator_3d_tiles.calculate(
polygon_coords=polygon_coords,
design_elevation=design_elevation,
algorithm=algorithm_type,
resolution=resolution,
target_crs=crs,
interpolation_method=interpolation_method
)
results.append(result)
except Exception as e:
errors.append({
"index": i,
"error": str(e),
"polygon": polygon_coords if 'polygon_coords' in locals() else None
})
continue
# 4. 返回结果
batch_result = {
"results": results,
"errors": errors,
"summary": {
"total": len(calculations),
"success": len(results),
"failed": len(errors),
"successRate": f"{(len(results)/len(calculations)*100):.1f}%" if calculations else "0%"
}
}
message = f"批量计算完成,成功 {len(results)} 个,失败 {len(errors)}"
return _success_response(batch_result)
except Exception as e:
logger.error(f"批量计算失败: {str(e)}")
return _error_response(f"批量计算失败: {str(e)}", 500)
# 核心接口:土方量计算-点云
@earthwork_bp.post("/api/v1/calc/earthworkPointCloud")
async def calc_earthwork_point_cloud(request: Request):
try:
# 1. 接收前端传参
data = request.json
if not data:
return json({
"code": 400,
"msg": "请求参数不能为空",
"data": None
}, status=400)
polygon_coords = data.get("polygonCoords") # 计算区域多边形坐标
design_elev = data.get("designElevation") # 设计高程
crs = data.get("crs", "EPSG:4326") # 坐标系默认WGS84
# 2. 确保计算器已初始化
if _calculator_point_cloud is None:
await init_app()
result = _calculator_point_cloud.calculate_earthwork(polygon_coords=polygon_coords, design_elev=design_elev, crs=crs)
# 3. 处理结果
if not result["success"]:
return _error_response(result["error"], 400)
# 4. 格式化结果
formatted_result = _calculator_point_cloud.format_result(result)
# 5. 返回成功响应
return _success_response(formatted_result)
except Exception as e:
logger.error(f"服务器错误: {str(e)}")
return _error_response(f"服务器内部错误: {str(e)}", 500)
def _success_response(data: Dict[str, Any]) -> json:
"""成功响应"""
return json({
"code": 200,
"msg": "计算成功",
"data": data
})
def _error_response(message: str, status_code: int = 400) -> json:
"""错误响应"""
return json({
"code": status_code,
"msg": message,
"data": None
}, status=status_code)