ai_project_v1/b3dm/terrain_api.py
2026-01-19 10:42:21 +08:00

419 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from sanic import Blueprint, Request, json
from pydantic import BaseModel, Field, field_validator
from typing import List, Optional, Dict, Any
import numpy as np
import logging
import time
import uuid
import urllib.parse
import threading
import os
from b3dm.terrain_calculator import TerrainCalculator
# Blueprint that carries every route defined in this module; the empty
# url_prefix means each route path is registered exactly as written.
terrain_bp = Blueprint("terrain", url_prefix="")
# Path segment placed between the URL prefix and the generated file name in
# the slopeAspect / slopeAspectTif responses, and also handed to the
# TerrainCalculator generators (used for both PNG and TIF outputs despite the
# "Png" in its value). NOTE(review): presumably a MinIO object prefix - confirm.
MINIO_SUB_PATH = "slopeAspectPng"
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 请求模型
class NormalVector(BaseModel):
    """A surface normal vector given by its three components."""

    nx: float = Field(..., description="法向量X分量")
    ny: float = Field(..., description="法向量Y分量")
    nz: float = Field(..., description="法向量Z分量")

    @field_validator('nx', 'ny', 'nz')
    def check_finite(cls, v):
        """Accept a component only when it is a finite number (no NaN/inf)."""
        if np.isfinite(v):
            return v
        raise ValueError(f"值必须是有限数字,得到: {v}")

    def to_list(self):
        """Return the components as ``[nx, ny, nz]``."""
        components = (self.nx, self.ny, self.nz)
        return list(components)
class BatchRequest(BaseModel):
    """Request body for batch processing: a list of 3-component vectors."""

    vectors: List[List[float]] = Field(..., description="法向量列表")

    @field_validator('vectors')
    @classmethod
    def validate_vectors(cls, v):
        """Validate the size and content of the vector list.

        Raises:
            ValueError: if there are more than 1000 vectors, if any vector
                does not have exactly 3 elements, or if any element is not a
                finite number.
        """
        if len(v) > 1000:
            raise ValueError("批量处理最多支持1000个向量")
        for vec in v:
            if len(vec) != 3:
                raise ValueError("每个向量必须是长度为3的列表")
            if not all(isinstance(x, (int, float)) for x in vec):
                raise ValueError("向量元素必须是数字")
            # Same finiteness rule NormalVector applies to single vectors:
            # NaN/inf would otherwise flow into the calculator and the JSON
            # response.
            if not all(np.isfinite(x) for x in vec):
                raise ValueError(f"值必须是有限数字,得到: {vec}")
        return v
class PointItem(BaseModel):
    """A single point described by its x, y and z coordinates."""

    x: float = Field(..., description="x坐标")
    y: float = Field(..., description="y坐标")
    z: float = Field(..., description="z坐标")
class PointRequest(BaseModel):
    """Request body carrying a list of points together with a URL."""

    points: List[PointItem] = Field(..., description="点列表")
    url: str = Field(..., description="URL地址")

    @field_validator('points')
    def validate_points_count(cls, v):
        """Reject requests that carry more than 10 points."""
        if len(v) <= 10:
            return v
        raise ValueError("批量处理最多支持10个点")
class PreloadRequest(BaseModel):
    """Request body for the preload endpoint: just the URL to preload."""

    url: str = Field(..., description="URL地址")
class AnalysisConfig(BaseModel):
    """Analysis options; every flag defaults to enabled."""

    classify: bool = Field(True, description="是否进行分类")
    include_percent: bool = Field(True, description="是否包含坡度百分比")
    include_direction: bool = Field(True, description="是否包含方向描述")
# 中间件:请求计时
@terrain_bp.middleware("request")
async def add_start_time(request: Request):
    """Stamp the request context with the wall-clock time of arrival."""
    arrived_at = time.time()
    request.ctx.start_time = arrived_at
@terrain_bp.middleware("response")
async def add_response_time(request: Request, response):
    """Attach an ``X-Process-Time`` header with the elapsed milliseconds.

    Does nothing when the request context carries no ``start_time``.
    """
    if not hasattr(request.ctx, "start_time"):
        return
    elapsed_seconds = time.time() - request.ctx.start_time
    process_time = elapsed_seconds * 1000
    response.headers["X-Process-Time"] = f"{process_time:.2f}ms"
@terrain_bp.post("/api/v1/calculate/slope")
async def calculate_slope(request: Request):
    """Compute the slope for one normal vector.

    The JSON body is validated with ``NormalVector`` and its components are
    passed to ``TerrainCalculator.calculate_slope`` as a three-element list.

    Returns:
        200 with ``success``, ``data`` (the calculator result) and an echo of
        the input; 400 when the body is empty, cannot be parsed as JSON,
        fails validation, or the calculator result has a truthy ``error``;
        500 for any other exception.
    """
    try:
        # A malformed body makes Sanic raise while parsing; report it as a
        # client error instead of letting the outer handler answer 500.
        try:
            data = request.json
        except Exception as e:
            return json({"error": f"输入验证失败: {str(e)}"}, status=400)
        if not data:
            return json({"error": "请求体不能为空"}, status=400)
        # Validate the input
        try:
            vector = NormalVector(**data)
        except Exception as e:
            return json({"error": f"输入验证失败: {str(e)}"}, status=400)
        # Compute the slope
        result = TerrainCalculator.calculate_slope(vector.to_list())
        # The calculator reports failures through an "error" entry
        if "error" in result and result["error"]:
            return json(result, status=400)
        return json({
            "success": True,
            "data": result,
            "request": {
                "input_vector": vector.to_list(),
                "timestamp": time.time()
            }
        })
    except Exception as e:
        # logger.exception keeps the traceback for diagnosis
        logger.exception(f"坡度计算API错误: {e}")
        return json({
            "error": f"服务器内部错误: {str(e)}",
            "success": False
        }, status=500)
@terrain_bp.post("/api/v1/calculate/aspect")
async def calculate_aspect1(request: Request):
    """Compute the aspect for one normal vector.

    The JSON body is validated with ``NormalVector`` and its components are
    passed to ``TerrainCalculator.calculate_aspect`` as a three-element list.

    Returns:
        200 with ``success``, ``data`` (the calculator result) and an echo of
        the input; 400 when the body is empty, cannot be parsed as JSON,
        fails validation, or the calculator result has a truthy ``error``;
        500 for any other exception.
    """
    try:
        # A malformed body makes Sanic raise while parsing; report it as a
        # client error instead of letting the outer handler answer 500.
        try:
            data = request.json
        except Exception as e:
            return json({"error": f"输入验证失败: {str(e)}"}, status=400)
        if not data:
            return json({"error": "请求体不能为空"}, status=400)
        # Validate the input
        try:
            vector = NormalVector(**data)
        except Exception as e:
            return json({"error": f"输入验证失败: {str(e)}"}, status=400)
        # Compute the aspect
        result = TerrainCalculator.calculate_aspect(vector.to_list())
        # The calculator reports failures through an "error" entry
        if "error" in result and result["error"]:
            return json(result, status=400)
        return json({
            "success": True,
            "data": result,
            "request": {
                "input_vector": vector.to_list(),
                "timestamp": time.time()
            }
        })
    except Exception as e:
        # logger.exception keeps the traceback for diagnosis
        logger.exception(f"坡向计算API错误: {e}")
        return json({
            "error": f"服务器内部错误: {str(e)}",
            "success": False
        }, status=500)
@terrain_bp.post("/api/v1/calculate/preload3dTiles")
async def preload_3dtiles(request: Request):
    """Start preloading 3D Tiles data for a URL in a background thread.

    The JSON body is validated with ``PreloadRequest``; its ``url`` is handed
    to ``TerrainCalculator.preload_3dtiles`` on a new thread and the response
    is sent without waiting for that thread to finish.

    Returns:
        200 with ``success``, ``data`` (the ``data_3dtiles`` path next to
        this module) and an echo of the request; 400 when the body is empty,
        cannot be parsed as JSON, or fails validation; 500 for any other
        exception.
    """
    try:
        # A malformed body makes Sanic raise while parsing; report it as a
        # client error instead of letting the outer handler answer 500.
        try:
            data = request.json
        except Exception as e:
            return json({"error": f"输入验证失败: {str(e)}"}, status=400)
        if not data:
            return json({"error": "请求体不能为空"}, status=400)
        # Validate the input
        try:
            vector = PreloadRequest(**data)
        except Exception as e:
            return json({"error": f"输入验证失败: {str(e)}"}, status=400)
        script_dir = os.path.dirname(os.path.abspath(__file__))
        # args must be a tuple: a bare string would be unpacked character by
        # character into separate positional arguments.
        thread1 = threading.Thread(
            target=TerrainCalculator.preload_3dtiles,
            args=(vector.url,),
        )
        thread1.start()
        return json({
            "success": True,
            "data": f"{script_dir}/data_3dtiles",
            "request": {
                "input_vector": vector.model_dump(),
                "timestamp": time.time()
            }
        })
    except Exception as e:
        # logger.exception keeps the traceback for diagnosis
        logger.exception(f"3dtiles预加载API错误: {e}")
        return json({
            "error": f"服务器内部错误: {str(e)}",
            "success": False
        }, status=500)
@terrain_bp.post("/api/v1/calculate/slopeAspect")
async def calculate_slopeAspect(request: Request):
    """Start generating a slope/aspect overlook PNG and return its URL.

    The JSON body is validated with ``PointRequest``. The points become a
    list of ``(x, y, z)`` tuples and a PNG name with a random 8-hex-digit
    component is generated. ``TerrainCalculator.generate_slopeAspect_3d_overlook``
    then runs on a new thread; the response is sent without waiting for it,
    so the returned URL may not be available yet.

    Returns:
        200 with ``success``, ``data`` (``<url prefix>/<MINIO_SUB_PATH>/<png
        name>``) and an echo of the request; 400 when the body is empty,
        cannot be parsed as JSON, or fails validation; 500 for any other
        exception.
    """
    try:
        # A malformed body makes Sanic raise while parsing; report it as a
        # client error instead of letting the outer handler answer 500.
        try:
            data = request.json
        except Exception as e:
            return json({"error": f"输入验证失败: {str(e)}"}, status=400)
        if not data:
            return json({"error": "请求体不能为空"}, status=400)
        # Validate the input
        try:
            vector = PointRequest(**data)
        except Exception as e:
            return json({"error": f"输入验证失败: {str(e)}"}, status=400)
        region_coords = [(point.x, point.y, point.z) for point in vector.points]
        overall_3d_png_name = f"o_dem_{uuid.uuid4().hex[:8]}_slopeAspect.png"
        # Derive the result URL prefix before launching the job, so a URL
        # that cannot be parsed fails the request without leaving a
        # background thread running.
        url_prefix = extract_and_rebuild_url(vector.url)
        thread1 = threading.Thread(
            target=TerrainCalculator.generate_slopeAspect_3d_overlook,
            args=(region_coords, vector.url, overall_3d_png_name, MINIO_SUB_PATH),
        )
        thread1.start()
        return json({
            "success": True,
            "data": f"{url_prefix}/{MINIO_SUB_PATH}/{overall_3d_png_name}",
            "request": {
                "input_vector": vector.model_dump(),
                "timestamp": time.time()
            }
        })
    except Exception as e:
        # logger.exception keeps the traceback for diagnosis
        logger.exception(f"坡向坡度俯视图API错误: {e}")
        return json({
            "error": f"服务器内部错误: {str(e)}",
            "success": False
        }, status=500)
@terrain_bp.post("/api/v1/calculate/slopeAspectTif")
async def calculate_slopeAspect_tif(request: Request):
    """Start generating a slope/aspect TIF file and return its URL.

    The JSON body is validated with ``PointRequest``. The points become a
    list of ``(x, y, z)`` tuples and a TIF name with a random 8-hex-digit
    component is generated. ``TerrainCalculator.generate_slopeAspect_tif``
    then runs on a new thread; the response is sent without waiting for it,
    so the returned URL may not be available yet.

    Returns:
        200 with ``success``, ``data`` (``<url prefix>/<MINIO_SUB_PATH>/<tif
        name>``) and an echo of the request; 400 when the body is empty,
        cannot be parsed as JSON, or fails validation; 500 for any other
        exception.
    """
    try:
        # A malformed body makes Sanic raise while parsing; report it as a
        # client error instead of letting the outer handler answer 500.
        try:
            data = request.json
        except Exception as e:
            return json({"error": f"输入验证失败: {str(e)}"}, status=400)
        if not data:
            return json({"error": "请求体不能为空"}, status=400)
        # Validate the input
        try:
            vector = PointRequest(**data)
        except Exception as e:
            return json({"error": f"输入验证失败: {str(e)}"}, status=400)
        region_coords = [(point.x, point.y, point.z) for point in vector.points]
        slope_aspect_tif_name = f"o_dem_{uuid.uuid4().hex[:8]}_slopeAspect.tif"
        # Derive the result URL prefix before launching the job, so a URL
        # that cannot be parsed fails the request without leaving a
        # background thread running.
        url_prefix = extract_and_rebuild_url(vector.url)
        thread1 = threading.Thread(
            target=TerrainCalculator.generate_slopeAspect_tif,
            args=(region_coords, vector.url, slope_aspect_tif_name, MINIO_SUB_PATH),
        )
        thread1.start()
        return json({
            "success": True,
            "data": f"{url_prefix}/{MINIO_SUB_PATH}/{slope_aspect_tif_name}",
            "request": {
                "input_vector": vector.model_dump(),
                "timestamp": time.time()
            }
        })
    except Exception as e:
        # logger.exception keeps the traceback for diagnosis
        logger.exception(f"坡向坡度tif API错误: {e}")
        return json({
            "error": f"服务器内部错误: {str(e)}",
            "success": False
        }, status=500)
@terrain_bp.post("/api/v1/calculate/both")
async def calculate_both(request: Request):
    """Compute slope and aspect together for one normal vector.

    The JSON body is validated with ``NormalVector`` and its components are
    passed to ``TerrainCalculator.calculate_slope_aspect`` as a three-element
    list.

    Returns:
        200 with ``success``, ``data`` (the calculator result) and an echo of
        the input; 400 when the body is empty, cannot be parsed as JSON,
        fails validation, or the calculator result has a truthy ``error``;
        500 for any other exception.
    """
    try:
        # A malformed body makes Sanic raise while parsing; report it as a
        # client error instead of letting the outer handler answer 500.
        try:
            data = request.json
        except Exception as e:
            return json({"error": f"输入验证失败: {str(e)}"}, status=400)
        if not data:
            return json({"error": "请求体不能为空"}, status=400)
        # Validate the input
        try:
            vector = NormalVector(**data)
        except Exception as e:
            return json({"error": f"输入验证失败: {str(e)}"}, status=400)
        # Compute slope and aspect
        result = TerrainCalculator.calculate_slope_aspect(vector.to_list())
        # The calculator reports failures through an "error" entry
        if "error" in result and result["error"]:
            return json(result, status=400)
        return json({
            "success": True,
            "data": result,
            "request": {
                "input_vector": vector.to_list(),
                "timestamp": time.time()
            }
        })
    except Exception as e:
        # logger.exception keeps the traceback for diagnosis
        logger.exception(f"综合计算API错误: {e}")
        return json({
            "error": f"服务器内部错误: {str(e)}",
            "success": False
        }, status=500)
@terrain_bp.post("/api/v1/calculate/batch")
async def batch_calculate(request: Request):
    """Run the calculator over a batch of normal vectors.

    The JSON body is validated with ``BatchRequest`` and its ``vectors`` are
    passed to ``TerrainCalculator.batch_calculate``. The call is timed and
    the duration and throughput are reported alongside the result.

    Returns:
        200 with ``success``, ``data`` (the calculator result),
        ``performance`` and ``request`` metadata; 400 when the body is empty,
        cannot be parsed as JSON, fails validation, or the calculator result
        has a truthy ``error``; 500 for any other exception.
    """
    try:
        # A malformed body makes Sanic raise while parsing; report it as a
        # client error instead of letting the outer handler answer 500.
        try:
            data = request.json
        except Exception as e:
            return json({"error": f"输入验证失败: {str(e)}"}, status=400)
        if not data:
            return json({"error": "请求体不能为空"}, status=400)
        # Validate the input
        try:
            batch_request = BatchRequest(**data)
        except Exception as e:
            return json({"error": f"输入验证失败: {str(e)}"}, status=400)
        # Time the batch with a monotonic clock so wall-clock adjustments
        # cannot produce a negative or inflated duration.
        start_time = time.perf_counter()
        result = TerrainCalculator.batch_calculate(batch_request.vectors)
        process_time = (time.perf_counter() - start_time) * 1000
        if "error" in result and result["error"]:
            return json(result, status=400)
        vector_count = len(batch_request.vectors)
        return json({
            "success": True,
            "data": result,
            "performance": {
                "process_time_ms": process_time,
                # Guard against division by zero for immeasurably fast runs
                "vectors_per_second": vector_count / (process_time / 1000) if process_time > 0 else 0
            },
            "request": {
                "vector_count": vector_count,
                "timestamp": time.time()
            }
        })
    except Exception as e:
        # logger.exception keeps the traceback for diagnosis
        logger.exception(f"批量计算API错误: {e}")
        return json({
            "error": f"服务器内部错误: {str(e)}",
            "success": False
        }, status=500)
@terrain_bp.get("/api/v1/example")
async def get_examples(request: Request):
    """Return a fixed set of sample normal vectors with their expected values."""
    samples = {
        "flat": dict(
            nx=0.0,
            ny=0.0,
            nz=1.0,
            expected_slope=0.0,
            description="完全水平面",
        ),
        "north_slope_30": dict(
            nx=0.0,
            ny=-0.5,
            nz=0.8660254,
            expected_slope=30.0,
            expected_aspect=0.0,
            description="朝北30度斜坡",
        ),
        "east_slope_45": dict(
            nx=0.7071068,
            ny=0.0,
            nz=0.7071068,
            expected_slope=45.0,
            expected_aspect=90.0,
            description="朝东45度斜坡",
        ),
        "vertical": dict(
            nx=1.0,
            ny=0.0,
            nz=0.0,
            expected_slope=90.0,
            description="垂直面",
        ),
    }
    payload = {"examples": samples, "count": len(samples)}
    return json(payload)
def extract_and_rebuild_url(url):
    """Reduce a URL to ``scheme://netloc/first-path-segment``.

    Args:
        url: The URL to reduce. It may omit the scheme (for example
            ``"10.0.0.1:9000/bucket/file"``), in which case ``http`` is used.

    Returns:
        ``"<scheme>://<netloc>/<first segment>"``, or ``"<scheme>://<netloc>"``
        when the path has no segments.
    """
    parsed = urllib.parse.urlparse(url)
    if not parsed.netloc and "://" not in url:
        # Without a scheme, urlparse leaves the host in the path (or takes
        # "host" in "host:port/..." for the scheme). Re-parse it as a
        # network-location reference so host and port land in netloc.
        parsed = urllib.parse.urlparse("//" + url.lstrip("/"))
    # Fall back to http when the URL carries no scheme
    scheme = parsed.scheme or "http"
    # First path segment; empty string when the path is empty or "/"
    first_segment = parsed.path.strip("/").split("/")[0]
    base_url = f"{scheme}://{parsed.netloc}"
    if first_segment:
        return f"{base_url}/{first_segment}"
    return base_url