import numpy as np from typing import List, Tuple, Dict, Any import logging import os import uuid from data_3dtiles_manager import MinIO3DTilesManager import data_3dtiles_to_dem import terrain3d_analyzer_color logger = logging.getLogger(__name__) class TerrainCalculator: """地形坡度和坡向计算器""" def generate_slopeAspect_3d_overlook(region_coords, url, overall_3d_png_name, minio_sub_path) : # 下载3dtiles地图数据 ENDPOINT_URL = "222.212.85.86:9000" ACCESS_KEY = "WuRenJi" SECRET_KEY = "WRJ@2024" manager = MinIO3DTilesManager( endpoint_url=ENDPOINT_URL, access_key=ACCESS_KEY, secret_key=SECRET_KEY, secure=False ) script_dir = os.path.dirname(os.path.abspath(__file__)) success, entry_local_path = manager.download_full_tileset( tileset_url=url, save_dir=f"{script_dir}/data_3dtiles", region_filter=None ) if not success : logger.info(f"下载地图数据失败: {url},{region_coords}") return "下载地图数据失败", None tileset_path = entry_local_path dem_path = os.path.join(script_dir, f"o_dem_{uuid.uuid4().hex[:8]}.tif") data_3dtiles_to_dem.generate_dem(region_coords, tileset_path, dem_path) if not os.path.exists(dem_path) : logger.info(f"生成坡度坡向俯视图失败: {url},{region_coords}") return "生成坡度坡向俯视图失败", None overall_3d_png_path = os.path.join(script_dir, overall_3d_png_name) terrain3d_analyzer_color.read_slope_aspect_by_dem(dem_path, overall_3d_png_path) logger.info(f"生成成功: {url},{region_coords},{overall_3d_png_path}") entry_bucket, _ = manager.parse_minio_url(url); success, minio_path = manager.upload_file(entry_bucket, f"{minio_sub_path}/{overall_3d_png_name}", overall_3d_png_path) if success : return "生成成功", minio_path else : return "生成失败", None @staticmethod def validate_vector(vector: List[float]) -> bool: """验证输入向量是否有效""" if len(vector) != 3: return False if not all(isinstance(v, (int, float)) for v in vector): return False norm = np.linalg.norm(vector) return norm > 1e-10 # 避免零向量 @staticmethod def normalize_vector(vector: List[float]) -> np.ndarray: """向量归一化""" arr = np.array(vector, dtype=np.float64) norm = np.linalg.norm(arr) return arr / norm if norm > 0 else arr @staticmethod def calculate_slope(normal_vector: List[float]) -> Dict[str, Any]: """ 计算坡度 Args: normal_vector: 法向量 [nx, ny, nz] Returns: dict: 包含坡度(度)和相关信息 """ try: # 验证输入 if not TerrainCalculator.validate_vector(normal_vector): return { "error": "无效的法向量,必须是长度为3的数值列表且不能为零向量", "slope_deg": None } # 归一化 n = TerrainCalculator.normalize_vector(normal_vector) # 计算坡度(使用arccos法) nz_abs = abs(n[2]) # 处理数值误差 if nz_abs > 1.0: nz_abs = 1.0 elif nz_abs < 0.0: nz_abs = 0.0 # 计算坡度(弧度) if abs(nz_abs - 1.0) < 1e-10: # 完全水平 slope_rad = 0.0 elif abs(nz_abs) < 1e-10: # 完全垂直 slope_rad = np.pi / 2 else: slope_rad = np.arccos(nz_abs) # 转换为度 slope_deg = np.degrees(slope_rad) # 计算坡度百分比 slope_percent = np.tan(slope_rad) * 100 if slope_rad < np.pi/2 else float('inf') return { "slope_deg": float(slope_deg), "slope_rad": float(slope_rad), "slope_percent": float(slope_percent), "normalized_vector": n.tolist(), "classification": TerrainCalculator.classify_slope(slope_deg) } except Exception as e: logger.error(f"坡度计算错误: {e}") return { "error": f"计算失败: {str(e)}", "slope_deg": None } @staticmethod def calculate_aspect(normal_vector: List[float]) -> Dict[str, Any]: """ 计算坡向 Args: normal_vector: 法向量 [nx, ny, nz] Returns: dict: 包含坡向(度)和相关信息 """ try: # 验证输入 if not TerrainCalculator.validate_vector(normal_vector): return { "error": "无效的法向量,必须是长度为3的数值列表且不能为零向量", "aspect_deg": None } # 归一化 n = TerrainCalculator.normalize_vector(normal_vector) # 检查是否为水平面 nx, ny, nz = n horizontal_magnitude = np.sqrt(nx*nx + ny*ny) if horizontal_magnitude < 1e-10: # 水平面,坡向无定义 return { "aspect_deg": None, "aspect_rad": None, "is_flat": True, "message": "水平面,坡向无定义", "normalized_vector": n.tolist() } # 计算原始坡向(四象限反正切) # 注意:arctan2(nx, ny) 不是 arctan2(ny, nx) raw_angle_rad = np.arctan2(nx, ny) # 转换为坡向(下坡方向 = 法向量方向 + 180°) aspect_rad = raw_angle_rad + np.pi # 转换为度 aspect_deg = np.degrees(aspect_rad) # 归一化到 [0, 360) 范围 aspect_deg = aspect_deg % 360.0 # 转换为八方向 direction = TerrainCalculator.aspect_to_direction(aspect_deg) return { "aspect_deg": float(aspect_deg), "aspect_rad": float(aspect_rad % (2*np.pi)), "direction": direction, "is_flat": False, "normalized_vector": n.tolist(), "raw_angle_deg": float(np.degrees(raw_angle_rad)) } except Exception as e: logger.error(f"坡向计算错误: {e}") return { "error": f"计算失败: {str(e)}", "aspect_deg": None } @staticmethod def calculate_slope_aspect(normal_vector: List[float]) -> Dict[str, Any]: """ 同时计算坡度和坡向 Args: normal_vector: 法向量 [nx, ny, nz] Returns: dict: 包含坡度和坡向的综合结果 """ try: slope_result = TerrainCalculator.calculate_slope(normal_vector) aspect_result = TerrainCalculator.calculate_aspect(normal_vector) result = { "slope": slope_result, "aspect": aspect_result, "input_vector": normal_vector, "calculation_time": None # 可在调用处添加时间戳 } # 如果有错误,合并错误信息 errors = [] if "error" in slope_result and slope_result["error"]: errors.append(f"坡度: {slope_result['error']}") if "error" in aspect_result and aspect_result["error"]: errors.append(f"坡向: {aspect_result['error']}") if errors: result["errors"] = errors return result except Exception as e: logger.error(f"综合计算错误: {e}") return { "error": f"综合计算失败: {str(e)}" } @staticmethod def classify_slope(slope_deg: float) -> Dict[str, Any]: """坡度分类""" if slope_deg < 2: return {"category": "平坦", "level": 0, "description": "基本平坦"} elif slope_deg < 5: return {"category": "缓坡", "level": 1, "description": "适合农业"} elif slope_deg < 15: return {"category": "斜坡", "level": 2, "description": "适合建设"} elif slope_deg < 30: return {"category": "陡坡", "level": 3, "description": "需要工程措施"} elif slope_deg < 45: return {"category": "急陡坡", "level": 4, "description": "高风险区域"} else: return {"category": "峭壁", "level": 5, "description": "危险区域"} @staticmethod def aspect_to_direction(aspect_deg: float) -> Dict[str, Any]: """将坡向转换为八方向""" directions = ["北", "东北", "东", "东南", "南", "西南", "西", "西北"] # 计算方向索引 (45°一个区间) index = int((aspect_deg + 22.5) % 360 / 45) return { "chinese": directions[index], "english": ["N", "NE", "E", "SE", "S", "SW", "W", "NW"][index], "degree_range": { "min": (index * 45 - 22.5) % 360, "max": (index * 45 + 22.5) % 360 } } @staticmethod def batch_calculate(vectors: List[List[float]]) -> Dict[str, Any]: """批量计算多个法向量""" try: results = [] errors = [] for i, vec in enumerate(vectors): try: result = TerrainCalculator.calculate_slope_aspect(vec) result["index"] = i results.append(result) except Exception as e: errors.append({ "index": i, "vector": vec, "error": str(e) }) return { "total": len(vectors), "successful": len(results), "failed": len(errors), "results": results, "errors": errors, "statistics": TerrainCalculator.calculate_statistics(results) } except Exception as e: logger.error(f"批量计算错误: {e}") return {"error": f"批量计算失败: {str(e)}"} @staticmethod def calculate_statistics(results: List[Dict]) -> Dict[str, Any]: """计算统计信息""" if not results: return {} slope_values = [] aspect_values = [] for r in results: if "slope" in r and "slope_deg" in r["slope"] and r["slope"]["slope_deg"] is not None: slope_values.append(r["slope"]["slope_deg"]) if "aspect" in r and "aspect_deg" in r["aspect"] and r["aspect"]["aspect_deg"] is not None: aspect_values.append(r["aspect"]["aspect_deg"]) if slope_values: slope_arr = np.array(slope_values) aspect_arr = np.array(aspect_values) if aspect_values else np.array([]) stats = { "slope": { "count": len(slope_values), "mean": float(np.mean(slope_arr)), "std": float(np.std(slope_arr)), "min": float(np.min(slope_arr)), "max": float(np.max(slope_arr)), "median": float(np.median(slope_arr)) } } if aspect_values: # 坡向统计需要循环统计 stats["aspect"] = { "count": len(aspect_values), "mean_vector": TerrainCalculator.circular_mean(aspect_arr), "concentration": TerrainCalculator.circular_concentration(aspect_arr) } return stats return {} @staticmethod def circular_mean(angles_deg: np.ndarray) -> float: """计算循环数据的平均值(角度)""" angles_rad = np.radians(angles_deg) x = np.mean(np.cos(angles_rad)) y = np.mean(np.sin(angles_rad)) mean_rad = np.arctan2(y, x) return np.degrees(mean_rad) % 360 @staticmethod def circular_concentration(angles_deg: np.ndarray) -> float: """计算角度数据的集中度 (0-1)""" angles_rad = np.radians(angles_deg) x = np.mean(np.cos(angles_rad)) y = np.mean(np.sin(angles_rad)) return np.sqrt(x*x + y*y)