ai_project_v1/b3dm/terrain_calculator.py
2026-01-29 11:51:20 +08:00

392 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import numpy as np
from typing import List, Tuple, Dict, Any
import logging
import os
import uuid
import b3dm.data_3dtiles_to_dem as data_3dtiles_to_dem
import b3dm.slope_aspect_img as slope_aspect_img
import b3dm.slope_aspect_tif as slope_aspect_tif
from b3dm.tileset_data_source import TilesetDataSource
logger = logging.getLogger(__name__)
_data_source = None
class TerrainCalculator:
"""地形坡度和坡向计算器"""
def preload_3dtiles(url: str) :
# 下载3dtiles地图数据
_data_source = TilesetDataSource(url)
_data_source.dowload_map_data(url)
if not _data_source.tileset_path :
logger.info(f"下载地图数据失败: {url}")
return "下载地图数据失败", None
def generate_slopeAspect_3d_overlook(region_coords, url, overall_3d_png_name, minio_sub_path) :
# 下载3dtiles地图数据
_data_source = TilesetDataSource(url)
_data_source.dowload_map_data(url)
if not _data_source.tileset_path :
logger.info(f"下载地图数据失败: {url},{region_coords}")
return "下载地图数据失败", None
tileset_path = _data_source.tileset_path
script_dir = os.path.dirname(os.path.abspath(__file__))
dem_path = os.path.join(script_dir, f"o_dem_{uuid.uuid4().hex[:8]}.tif")
data_3dtiles_to_dem.generate_dem(tileset_path, dem_path, region_coords)
if not os.path.exists(dem_path) :
logger.info(f"生成坡度坡向俯视图失败: {url},{region_coords}")
return "生成坡度坡向俯视图失败", None
overall_3d_png_path = os.path.join(script_dir, overall_3d_png_name)
slope_aspect_img.read_slope_aspect_by_dem(dem_path, overall_3d_png_path)
logger.info(f"生成成功: {url},{region_coords},{overall_3d_png_path}")
entry_bucket, _ = _data_source.parse_minio_url(url);
success, minio_path = _data_source.upload_file(entry_bucket, f"{minio_sub_path}/{overall_3d_png_name}", overall_3d_png_path)
if success :
return "生成成功", minio_path
else :
return "生成失败", None
def generate_slopeAspect_tif(region_coords, url, slope_aspect_tif_name, minio_sub_path) :
# 下载3dtiles地图数据
_data_source = TilesetDataSource(url)
_data_source.dowload_map_data(url)
if not _data_source.tileset_path :
logger.info(f"下载地图数据失败: {url},{region_coords}")
return "下载地图数据失败", None
tileset_path = _data_source.tileset_path
script_dir = os.path.dirname(os.path.abspath(__file__))
dem_path = os.path.join(script_dir, f"o_dem_{uuid.uuid4().hex[:8]}.tif")
data_3dtiles_to_dem.generate_dem(tileset_path, dem_path, region_coords)
if not os.path.exists(dem_path) :
logger.info(f"生成坡度坡向tif失败: {url},{region_coords}")
return "生成坡度坡向tif失败", None
slope_aspect_tif_path = os.path.join(script_dir, slope_aspect_tif_name)
slope_aspect_tif.create_slope_aspect(dem_path, 'combined', slope_aspect_tif_path)
logger.info(f"生成成功: {url},{region_coords},{slope_aspect_tif_path}")
entry_bucket, _ = _data_source.parse_minio_url(url);
success, minio_path = _data_source.upload_file(entry_bucket, f"{minio_sub_path}/{slope_aspect_tif_name}", slope_aspect_tif_path)
if success :
return "生成成功", minio_path
else :
return "生成失败", None
@staticmethod
def validate_vector(vector: List[float]) -> bool:
"""验证输入向量是否有效"""
if len(vector) != 3:
return False
if not all(isinstance(v, (int, float)) for v in vector):
return False
norm = np.linalg.norm(vector)
return norm > 1e-10 # 避免零向量
@staticmethod
def normalize_vector(vector: List[float]) -> np.ndarray:
"""向量归一化"""
arr = np.array(vector, dtype=np.float64)
norm = np.linalg.norm(arr)
return arr / norm if norm > 0 else arr
@staticmethod
def calculate_slope(normal_vector: List[float]) -> Dict[str, Any]:
"""
计算坡度
Args:
normal_vector: 法向量 [nx, ny, nz]
Returns:
dict: 包含坡度(度)和相关信息
"""
try:
# 验证输入
if not TerrainCalculator.validate_vector(normal_vector):
return {
"error": "无效的法向量,必须是长度为3的数值列表且不能为零向量",
"slope_deg": None
}
# 归一化
n = TerrainCalculator.normalize_vector(normal_vector)
# 计算坡度使用arccos法
nz_abs = abs(n[2])
# 处理数值误差
if nz_abs > 1.0:
nz_abs = 1.0
elif nz_abs < 0.0:
nz_abs = 0.0
# 计算坡度(弧度)
if abs(nz_abs - 1.0) < 1e-10: # 完全水平
slope_rad = 0.0
elif abs(nz_abs) < 1e-10: # 完全垂直
slope_rad = np.pi / 2
else:
slope_rad = np.arccos(nz_abs)
# 转换为度
slope_deg = np.degrees(slope_rad)
# 计算坡度百分比
slope_percent = np.tan(slope_rad) * 100 if slope_rad < np.pi/2 else float('inf')
return {
"slope_deg": float(slope_deg),
"slope_rad": float(slope_rad),
"slope_percent": float(slope_percent),
"normalized_vector": n.tolist(),
"classification": TerrainCalculator.classify_slope(slope_deg)
}
except Exception as e:
logger.error(f"坡度计算错误: {e}")
return {
"error": f"计算失败: {str(e)}",
"slope_deg": None
}
@staticmethod
def calculate_aspect(normal_vector: List[float]) -> Dict[str, Any]:
"""
计算坡向
Args:
normal_vector: 法向量 [nx, ny, nz]
Returns:
dict: 包含坡向(度)和相关信息
"""
try:
# 验证输入
if not TerrainCalculator.validate_vector(normal_vector):
return {
"error": "无效的法向量,必须是长度为3的数值列表且不能为零向量",
"aspect_deg": None
}
# 归一化
n = TerrainCalculator.normalize_vector(normal_vector)
# 检查是否为水平面
nx, ny, nz = n
horizontal_magnitude = np.sqrt(nx*nx + ny*ny)
if horizontal_magnitude < 1e-10: # 水平面,坡向无定义
return {
"aspect_deg": None,
"aspect_rad": None,
"is_flat": True,
"message": "水平面,坡向无定义",
"normalized_vector": n.tolist()
}
# 计算原始坡向(四象限反正切)
# 注意arctan2(nx, ny) 不是 arctan2(ny, nx)
raw_angle_rad = np.arctan2(nx, ny)
# 转换为坡向(下坡方向 = 法向量方向 + 180°
aspect_rad = raw_angle_rad + np.pi
# 转换为度
aspect_deg = np.degrees(aspect_rad)
# 归一化到 [0, 360) 范围
aspect_deg = aspect_deg % 360.0
# 转换为八方向
direction = TerrainCalculator.aspect_to_direction(aspect_deg)
return {
"aspect_deg": float(aspect_deg),
"aspect_rad": float(aspect_rad % (2*np.pi)),
"direction": direction,
"is_flat": False,
"normalized_vector": n.tolist(),
"raw_angle_deg": float(np.degrees(raw_angle_rad))
}
except Exception as e:
logger.error(f"坡向计算错误: {e}")
return {
"error": f"计算失败: {str(e)}",
"aspect_deg": None
}
@staticmethod
def calculate_slope_aspect(normal_vector: List[float]) -> Dict[str, Any]:
"""
同时计算坡度和坡向
Args:
normal_vector: 法向量 [nx, ny, nz]
Returns:
dict: 包含坡度和坡向的综合结果
"""
try:
slope_result = TerrainCalculator.calculate_slope(normal_vector)
aspect_result = TerrainCalculator.calculate_aspect(normal_vector)
result = {
"slope": slope_result,
"aspect": aspect_result,
"input_vector": normal_vector,
"calculation_time": None # 可在调用处添加时间戳
}
# 如果有错误,合并错误信息
errors = []
if "error" in slope_result and slope_result["error"]:
errors.append(f"坡度: {slope_result['error']}")
if "error" in aspect_result and aspect_result["error"]:
errors.append(f"坡向: {aspect_result['error']}")
if errors:
result["errors"] = errors
return result
except Exception as e:
logger.error(f"综合计算错误: {e}")
return {
"error": f"综合计算失败: {str(e)}"
}
@staticmethod
def classify_slope(slope_deg: float) -> Dict[str, Any]:
"""坡度分类"""
if slope_deg < 2:
return {"category": "平坦", "level": 0, "description": "基本平坦"}
elif slope_deg < 5:
return {"category": "缓坡", "level": 1, "description": "适合农业"}
elif slope_deg < 15:
return {"category": "斜坡", "level": 2, "description": "适合建设"}
elif slope_deg < 30:
return {"category": "陡坡", "level": 3, "description": "需要工程措施"}
elif slope_deg < 45:
return {"category": "急陡坡", "level": 4, "description": "高风险区域"}
else:
return {"category": "峭壁", "level": 5, "description": "危险区域"}
@staticmethod
def aspect_to_direction(aspect_deg: float) -> Dict[str, Any]:
"""将坡向转换为八方向"""
directions = ["", "东北", "", "东南", "", "西南", "西", "西北"]
# 计算方向索引 (45°一个区间)
index = int((aspect_deg + 22.5) % 360 / 45)
return {
"chinese": directions[index],
"english": ["N", "NE", "E", "SE", "S", "SW", "W", "NW"][index],
"degree_range": {
"min": (index * 45 - 22.5) % 360,
"max": (index * 45 + 22.5) % 360
}
}
@staticmethod
def batch_calculate(vectors: List[List[float]]) -> Dict[str, Any]:
"""批量计算多个法向量"""
try:
results = []
errors = []
for i, vec in enumerate(vectors):
try:
result = TerrainCalculator.calculate_slope_aspect(vec)
result["index"] = i
results.append(result)
except Exception as e:
errors.append({
"index": i,
"vector": vec,
"error": str(e)
})
return {
"total": len(vectors),
"successful": len(results),
"failed": len(errors),
"results": results,
"errors": errors,
"statistics": TerrainCalculator.calculate_statistics(results)
}
except Exception as e:
logger.error(f"批量计算错误: {e}")
return {"error": f"批量计算失败: {str(e)}"}
@staticmethod
def calculate_statistics(results: List[Dict]) -> Dict[str, Any]:
"""计算统计信息"""
if not results:
return {}
slope_values = []
aspect_values = []
for r in results:
if "slope" in r and "slope_deg" in r["slope"] and r["slope"]["slope_deg"] is not None:
slope_values.append(r["slope"]["slope_deg"])
if "aspect" in r and "aspect_deg" in r["aspect"] and r["aspect"]["aspect_deg"] is not None:
aspect_values.append(r["aspect"]["aspect_deg"])
if slope_values:
slope_arr = np.array(slope_values)
aspect_arr = np.array(aspect_values) if aspect_values else np.array([])
stats = {
"slope": {
"count": len(slope_values),
"mean": float(np.mean(slope_arr)),
"std": float(np.std(slope_arr)),
"min": float(np.min(slope_arr)),
"max": float(np.max(slope_arr)),
"median": float(np.median(slope_arr))
}
}
if aspect_values:
# 坡向统计需要循环统计
stats["aspect"] = {
"count": len(aspect_values),
"mean_vector": TerrainCalculator.circular_mean(aspect_arr),
"concentration": TerrainCalculator.circular_concentration(aspect_arr)
}
return stats
return {}
@staticmethod
def circular_mean(angles_deg: np.ndarray) -> float:
"""计算循环数据的平均值(角度)"""
angles_rad = np.radians(angles_deg)
x = np.mean(np.cos(angles_rad))
y = np.mean(np.sin(angles_rad))
mean_rad = np.arctan2(y, x)
return np.degrees(mean_rad) % 360
@staticmethod
def circular_concentration(angles_deg: np.ndarray) -> float:
"""计算角度数据的集中度 (0-1)"""
angles_rad = np.radians(angles_deg)
x = np.mean(np.cos(angles_rad))
y = np.mean(np.sin(angles_rad))
return np.sqrt(x*x + y*y)