import numpy as np from osgeo import gdal import math import os from typing import Optional, Tuple, Dict, Any class SlopeAspectGenerator: """ 坡度和坡向生成器 支持生成单独的坡度、坡向文件,或包含两者的合并文件 """ def __init__(self, dem_path: str): """ 初始化坡度和坡向生成器 Args: dem_path: 输入DEM文件路径 """ self.dem_path = dem_path self.dataset = gdal.Open(dem_path, gdal.GA_ReadOnly) if not self.dataset: raise FileNotFoundError(f"无法打开DEM文件: {dem_path}") # 获取地理信息 self.geotransform = self.dataset.GetGeoTransform() self.projection = self.dataset.GetProjection() self.band = self.dataset.GetRasterBand(1) self.data = self.band.ReadAsArray() # 获取栅格信息 self.cols = self.dataset.RasterXSize self.rows = self.dataset.RasterYSize self.no_data = self.band.GetNoDataValue() # 如果DEM没有无效值设置,使用-9999 if self.no_data is None: self.no_data = -9999.0 print(f"警告: DEM未设置无效值,将使用 {self.no_data} 作为无效值") # 像元大小(考虑非正方形像元) self.cell_size_x = abs(self.geotransform[1]) self.cell_size_y = abs(self.geotransform[5]) print(f"DEM信息: {self.rows}行 x {self.cols}列") print(f"X方向像元大小: {self.cell_size_x:.4f}") print(f"Y方向像元大小: {self.cell_size_y:.4f}") print(f"无效值: {self.no_data}") def calculate_slope(self, z_factor: float = 1.0, algorithm: str = 'Horn') -> np.ndarray: """ 计算坡度 Args: z_factor: 垂直单位转换因子 algorithm: 算法选项 'Horn' 或 'ZevenbergenThorne' Returns: 坡度数组(单位:度) """ print(f"正在计算坡度 (算法: {algorithm})...") # 创建输出数组 slope = np.full((self.rows, self.cols), self.no_data, dtype=np.float32) # 创建填充后的数据(用于边界处理) padded_data = np.pad( self.data, pad_width=1, mode='constant', constant_values=self.no_data ) # 创建有效掩码 valid_mask = (self.data != self.no_data) padded_valid = np.pad(valid_mask, pad_width=1, mode='constant', constant_values=False) for i in range(self.rows): for j in range(self.cols): if not valid_mask[i, j]: continue i_pad, j_pad = i + 1, j + 1 # 获取3x3窗口 window = padded_data[i_pad-1:i_pad+2, j_pad-1:j_pad+2] window_valid = padded_valid[i_pad-1:i_pad+2, j_pad-1:j_pad+2] # 如果窗口内有无效值,跳过 if not np.all(window_valid): continue if algorithm.upper() == 'ZEVERBERGENTHORNE': # Zevenbergen & Thorne 算法 dz_dx = (window[1, 2] - window[1, 0]) / (2 * self.cell_size_x) dz_dy = (window[0, 1] - window[2, 1]) / (2 * self.cell_size_y) else: # Horn 算法(默认) dz_dx = ((window[0, 2] + 2 * window[1, 2] + window[2, 2]) - (window[0, 0] + 2 * window[1, 0] + window[2, 0])) / (8 * self.cell_size_x) dz_dy = ((window[2, 0] + 2 * window[2, 1] + window[2, 2]) - (window[0, 0] + 2 * window[0, 1] + window[0, 2])) / (8 * self.cell_size_y) # 计算坡度(弧度转角度) slope_rad = math.atan(z_factor * math.sqrt(dz_dx**2 + dz_dy**2)) slope_deg = math.degrees(slope_rad) slope[i, j] = slope_deg # 边界处理:将边缘设为无效值 slope[0, :] = self.no_data slope[-1, :] = self.no_data slope[:, 0] = self.no_data slope[:, -1] = self.no_data valid_slope = slope[slope != self.no_data] if len(valid_slope) > 0: print(f"坡度范围: {np.min(valid_slope):.2f} - {np.max(valid_slope):.2f} 度") return slope def calculate_aspect(self, algorithm: str = 'Horn') -> np.ndarray: """ 计算坡向 Args: algorithm: 算法选项 'Horn' 或 'ZevenbergenThorne' Returns: 坡向数组(单位:度,0-360,北为0度,顺时针增加) """ print(f"正在计算坡向 (算法: {algorithm})...") # 创建输出数组 aspect = np.full((self.rows, self.cols), -1.0, dtype=np.float32) # 平地用-1表示 # 创建填充后的数据 padded_data = np.pad( self.data, pad_width=1, mode='constant', constant_values=self.no_data ) # 创建有效掩码 valid_mask = (self.data != self.no_data) padded_valid = np.pad(valid_mask, pad_width=1, mode='constant', constant_values=False) for i in range(self.rows): for j in range(self.cols): if not valid_mask[i, j]: continue i_pad, j_pad = i + 1, j + 1 # 获取3x3窗口 window = padded_data[i_pad-1:i_pad+2, j_pad-1:j_pad+2] window_valid = padded_valid[i_pad-1:i_pad+2, j_pad-1:j_pad+2] # 如果窗口内有无效值,跳过 if not np.all(window_valid): continue if algorithm.upper() == 'ZEVERBERGENTHORNE': # Zevenbergen & Thorne 算法 dz_dx = (window[1, 2] - window[1, 0]) / (2 * self.cell_size_x) dz_dy = (window[0, 1] - window[2, 1]) / (2 * self.cell_size_y) else: # Horn 算法(默认) dz_dx = ((window[0, 2] + 2 * window[1, 2] + window[2, 2]) - (window[0, 0] + 2 * window[1, 0] + window[2, 0])) / (8 * self.cell_size_x) dz_dy = ((window[2, 0] + 2 * window[2, 1] + window[2, 2]) - (window[0, 0] + 2 * window[0, 1] + window[0, 2])) / (8 * self.cell_size_y) # 计算坡向 if abs(dz_dx) < 1e-10 and abs(dz_dy) < 1e-10: # 平地 aspect[i, j] = -1.0 else: aspect_rad = math.atan2(dz_dy, -dz_dx) # atan2(y, x) aspect_deg = math.degrees(aspect_rad) # 转换为0-360度(北为0度,顺时针) if aspect_deg < 0: aspect_deg += 360.0 aspect[i, j] = aspect_deg # 边界处理:将边缘设为-1 aspect[0, :] = -1.0 aspect[-1, :] = -1.0 aspect[:, 0] = -1.0 aspect[:, -1] = -1.0 valid_aspect = aspect[aspect != -1.0] if len(valid_aspect) > 0: print(f"坡向范围: {np.min(valid_aspect):.2f} - {np.max(valid_aspect):.2f} 度") return aspect def calculate_slope_aspect(self, z_factor: float = 1.0, algorithm: str = 'Horn') -> Tuple[np.ndarray, np.ndarray]: """ 同时计算坡度和坡向(优化版本,减少重复计算) Args: z_factor: 垂直单位转换因子 algorithm: 算法选项 Returns: (slope_array, aspect_array) """ print(f"同时计算坡度和坡向 (算法: {algorithm})...") # 创建输出数组 slope = np.full((self.rows, self.cols), self.no_data, dtype=np.float32) aspect = np.full((self.rows, self.cols), -1.0, dtype=np.float32) # 创建填充后的数据 padded_data = np.pad( self.data, pad_width=1, mode='constant', constant_values=self.no_data ) # 创建有效掩码 valid_mask = (self.data != self.no_data) padded_valid = np.pad(valid_mask, pad_width=1, mode='constant', constant_values=False) for i in range(self.rows): for j in range(self.cols): if not valid_mask[i, j]: continue i_pad, j_pad = i + 1, j + 1 # 获取3x3窗口 window = padded_data[i_pad-1:i_pad+2, j_pad-1:j_pad+2] window_valid = padded_valid[i_pad-1:i_pad+2, j_pad-1:j_pad+2] # 如果窗口内有无效值,跳过 if not np.all(window_valid): continue # 计算导数(一次计算,两个都用) if algorithm.upper() == 'ZEVERBERGENTHORNE': dz_dx = (window[1, 2] - window[1, 0]) / (2 * self.cell_size_x) dz_dy = (window[0, 1] - window[2, 1]) / (2 * self.cell_size_y) else: dz_dx = ((window[0, 2] + 2 * window[1, 2] + window[2, 2]) - (window[0, 0] + 2 * window[1, 0] + window[2, 0])) / (8 * self.cell_size_x) dz_dy = ((window[2, 0] + 2 * window[2, 1] + window[2, 2]) - (window[0, 0] + 2 * window[0, 1] + window[0, 2])) / (8 * self.cell_size_y) # 计算坡度 slope_rad = math.atan(z_factor * math.sqrt(dz_dx**2 + dz_dy**2)) slope_deg = math.degrees(slope_rad) slope[i, j] = slope_deg # 计算坡向 if abs(dz_dx) < 1e-10 and abs(dz_dy) < 1e-10: aspect[i, j] = -1.0 else: aspect_rad = math.atan2(dz_dy, -dz_dx) aspect_deg = math.degrees(aspect_rad) if aspect_deg < 0: aspect_deg += 360.0 aspect[i, j] = aspect_deg # 边界处理 slope[0, :] = self.no_data slope[-1, :] = self.no_data slope[:, 0] = self.no_data slope[:, -1] = self.no_data aspect[0, :] = -1.0 aspect[-1, :] = -1.0 aspect[:, 0] = -1.0 aspect[:, -1] = -1.0 # 显示统计信息 valid_slope = slope[slope != self.no_data] valid_aspect = aspect[aspect != -1.0] if len(valid_slope) > 0: print(f"坡度范围: {np.min(valid_slope):.2f} - {np.max(valid_slope):.2f} 度") if len(valid_aspect) > 0: print(f"坡向范围: {np.min(valid_aspect):.2f} - {np.max(valid_aspect):.2f} 度") return slope, aspect def save_raster(self, data: np.ndarray, output_path: str, band_name: str = '', data_type: int = gdal.GDT_Float32, no_data_value: Optional[float] = None) -> None: """ 保存栅格数据为TIFF文件 Args: data: 栅格数据数组 output_path: 输出文件路径 band_name: 波段名称 data_type: GDAL数据类型 no_data_value: 无效值 """ driver = gdal.GetDriverByName('GTiff') out_dataset = driver.Create( output_path, self.cols, self.rows, 1, # 单波段 data_type, options=['COMPRESS=LZW', 'PREDICTOR=2', 'TILED=YES'] ) out_dataset.SetGeoTransform(self.geotransform) out_dataset.SetProjection(self.projection) out_band = out_dataset.GetRasterBand(1) out_band.WriteArray(data) if no_data_value is not None: out_band.SetNoDataValue(no_data_value) elif band_name == 'Slope': out_band.SetNoDataValue(self.no_data) elif band_name == 'Aspect': out_band.SetNoDataValue(-1.0) if band_name: out_band.SetDescription(band_name) # 设置统计信息(加速QGIS等软件加载) out_band.ComputeStatistics(False) out_dataset.FlushCache() out_dataset = None print(f"文件已保存: {output_path}") if band_name: print(f" 波段: {band_name}") def save_combined_raster(self, slope_data: np.ndarray, aspect_data: np.ndarray, output_path: str) -> None: """ 保存包含坡度和坡向两个波段的TIFF文件 Args: slope_data: 坡度数据 aspect_data: 坡向数据 output_path: 输出文件路径 """ driver = gdal.GetDriverByName('GTiff') # 创建包含2个波段的数据集 out_dataset = driver.Create( output_path, self.cols, self.rows, 2, # 2个波段 gdal.GDT_Float32, options=['COMPRESS=LZW', 'PREDICTOR=2', 'TILED=YES'] ) # 设置地理信息 out_dataset.SetGeoTransform(self.geotransform) out_dataset.SetProjection(self.projection) # 写入坡度波段(波段1) slope_band = out_dataset.GetRasterBand(1) slope_band.WriteArray(slope_data) slope_band.SetNoDataValue(self.no_data) slope_band.SetDescription('Slope') slope_band.SetMetadataItem('UNITS', 'degrees') slope_band.SetMetadataItem('RANGE', '0-90') slope_band.ComputeStatistics(False) # 写入坡向波段(波段2) aspect_band = out_dataset.GetRasterBand(2) aspect_band.WriteArray(aspect_data) aspect_band.SetNoDataValue(-1.0) aspect_band.SetDescription('Aspect') aspect_band.SetMetadataItem('UNITS', 'degrees') aspect_band.SetMetadataItem('RANGE', '0-360') aspect_band.SetMetadataItem('DIRECTION', 'Clockwise from North') aspect_band.ComputeStatistics(False) # 设置数据集元数据 out_dataset.SetMetadata({ 'SOURCE_DEM': os.path.basename(self.dem_path), 'PROCESSING_METHOD': 'SlopeAspectGenerator', 'CELL_SIZE_X': str(self.cell_size_x), 'CELL_SIZE_Y': str(self.cell_size_y) }) out_dataset.FlushCache() out_dataset = None print(f"合并文件已保存: {output_path}") print(f" 波段1: Slope (坡度, 单位:度)") print(f" 波段2: Aspect (坡向, 单位:度)") def generate_slope_tif(self, output_path: Optional[str] = None, z_factor: float = 1.0, algorithm: str = 'Horn') -> np.ndarray: """ 生成坡度TIFF文件 Args: output_path: 输出文件路径(可选) z_factor: 垂直单位转换因子 algorithm: 算法选项 Returns: 坡度数据数组 """ if output_path is None: base_name = os.path.splitext(self.dem_path)[0] output_path = f"{base_name}_slope.tif" print(f"\n生成坡度文件...") slope = self.calculate_slope(z_factor, algorithm) self.save_raster(slope, output_path, 'Slope', no_data_value=self.no_data) return slope def generate_aspect_tif(self, output_path: Optional[str] = None, algorithm: str = 'Horn') -> np.ndarray: """ 生成坡向TIFF文件 Args: output_path: 输出文件路径(可选) algorithm: 算法选项 Returns: 坡向数据数组 """ if output_path is None: base_name = os.path.splitext(self.dem_path)[0] output_path = f"{base_name}_aspect.tif" print(f"\n生成坡向文件...") aspect = self.calculate_aspect(algorithm) self.save_raster(aspect, output_path, 'Aspect', no_data_value=-1.0) return aspect def generate_slope_aspect_separate(self, slope_output: Optional[str] = None, aspect_output: Optional[str] = None, z_factor: float = 1.0, algorithm: str = 'Horn') -> Tuple[np.ndarray, np.ndarray]: """ 分别生成坡度和坡向TIFF文件 Args: slope_output: 坡度输出路径(可选) aspect_output: 坡向输出路径(可选) z_factor: 垂直单位转换因子 algorithm: 算法选项 Returns: (slope_data, aspect_data) """ # 设置输出路径 base_name = os.path.splitext(self.dem_path)[0] if slope_output is None: slope_output = f"{base_name}_slope.tif" if aspect_output is None: aspect_output = f"{base_name}_aspect.tif" print(f"\n分别生成坡度和坡向文件...") # 同时计算坡度和坡向(优化版本) slope, aspect = self.calculate_slope_aspect(z_factor, algorithm) # 保存文件 self.save_raster(slope, slope_output, 'Slope', no_data_value=self.no_data) self.save_raster(aspect, aspect_output, 'Aspect', no_data_value=-1.0) return slope, aspect def generate_combined_tif(self, output_path: Optional[str] = None, z_factor: float = 1.0, algorithm: str = 'Horn') -> Tuple[np.ndarray, np.ndarray]: """ 生成包含坡度和坡向的合并TIFF文件 Args: output_path: 输出文件路径(可选) z_factor: 垂直单位转换因子 algorithm: 算法选项 Returns: (slope_data, aspect_data) """ if output_path is None: base_name = os.path.splitext(self.dem_path)[0] output_path = f"{base_name}_slope_aspect.tif" print(f"\n生成合并文件...") # 同时计算坡度和坡向 slope, aspect = self.calculate_slope_aspect(z_factor, algorithm) # 保存合并文件 self.save_combined_raster(slope, aspect, output_path) return slope, aspect def generate_all(self, output_dir: Optional[str] = None, z_factor: float = 1.0, algorithm: str = 'Horn') -> Dict[str, str]: """ 生成所有类型的文件:单独的坡度、单独的坡向、合并文件、统计信息 Args: output_dir: 输出目录(可选) z_factor: 垂直单位转换因子 algorithm: 算法选项 Returns: 包含所有输出文件路径的字典 """ # 设置输出路径 base_name = os.path.basename(self.dem_path) file_base = os.path.splitext(base_name)[0] if output_dir: if not os.path.exists(output_dir): os.makedirs(output_dir) output_prefix = os.path.join(output_dir, file_base) else: output_prefix = file_base # 计算坡度和坡向(只计算一次) print(f"\n计算坡度和坡向...") slope, aspect = self.calculate_slope_aspect(z_factor, algorithm) # 生成各种文件 results = {} # 1. 单独的坡度文件 slope_file = f"{output_prefix}_slope.tif" self.save_raster(slope, slope_file, 'Slope', no_data_value=self.no_data) results['slope'] = slope_file # 2. 单独的坡向文件 aspect_file = f"{output_prefix}_aspect.tif" self.save_raster(aspect, aspect_file, 'Aspect', no_data_value=-1.0) results['aspect'] = aspect_file # 3. 合并文件 combined_file = f"{output_prefix}_slope_aspect.tif" self.save_combined_raster(slope, aspect, combined_file) results['combined'] = combined_file # 4. 统计信息文件(文本) stats_file = f"{output_prefix}_statistics.txt" self.save_statistics(slope, aspect, stats_file) results['statistics'] = stats_file print(f"\n所有文件生成完成!") for key, path in results.items(): print(f" {key}: {os.path.basename(path)}") return results def save_statistics(self, slope_data: np.ndarray, aspect_data: np.ndarray, output_path: str) -> None: """ 保存统计信息到文本文件 Args: slope_data: 坡度数据 aspect_data: 坡向数据 output_path: 输出文件路径 """ valid_slope = slope_data[slope_data != self.no_data] valid_aspect = aspect_data[aspect_data != -1.0] with open(output_path, 'w', encoding='utf-8') as f: f.write("坡度和坡向统计信息\n") f.write("=" * 50 + "\n") f.write(f"输入DEM: {os.path.basename(self.dem_path)}\n") f.write(f"栅格大小: {self.rows} x {self.cols}\n") f.write(f"处理时间: {np.datetime64('now')}\n\n") f.write("坡度统计:\n") f.write("-" * 30 + "\n") if len(valid_slope) > 0: f.write(f"最小值: {np.min(valid_slope):.4f} 度\n") f.write(f"最大值: {np.max(valid_slope):.4f} 度\n") f.write(f"平均值: {np.mean(valid_slope):.4f} 度\n") f.write(f"标准差: {np.std(valid_slope):.4f} 度\n") f.write(f"中位数: {np.median(valid_slope):.4f} 度\n") f.write(f"有效像元数: {len(valid_slope)} / {self.rows * self.cols}\n") # 坡度分级统计 f.write("\n坡度分级统计:\n") slope_classes = ['平地(0-2)', '缓坡(2-5)', '斜坡(5-15)', '陡坡(15-30)', '急陡坡(30-45)', '峭壁(45-90)'] bins = [0, 2, 5, 15, 30, 45, 90] hist, _ = np.histogram(valid_slope, bins=bins) total = sum(hist) for i in range(len(slope_classes)): if hist[i] > 0: percentage = (hist[i] / total) * 100 f.write(f"{slope_classes[i]}: {hist[i]} ({percentage:.2f}%)\n") else: f.write("无有效数据\n") f.write("\n坡向统计:\n") f.write("-" * 30 + "\n") if len(valid_aspect) > 0: f.write(f"最小值: {np.min(valid_aspect):.4f} 度\n") f.write(f"最大值: {np.max(valid_aspect):.4f} 度\n") f.write(f"平均值: {np.mean(valid_aspect):.4f} 度\n") f.write(f"标准差: {np.std(valid_aspect):.4f} 度\n") f.write(f"中位数: {np.median(valid_aspect):.4f} 度\n") f.write(f"有效像元数: {len(valid_aspect)} / {self.rows * self.cols}\n") # 坡向分类统计 f.write("\n坡向分类统计:\n") directions = ['北(0-22.5)', '东北(22.5-67.5)', '东(67.5-112.5)', '东南(112.5-157.5)', '南(157.5-202.5)', '西南(202.5-247.5)', '西(247.5-292.5)', '西北(292.5-337.5)', '北(337.5-360)'] bins = [0, 22.5, 67.5, 112.5, 157.5, 202.5, 247.5, 292.5, 337.5, 360] hist, _ = np.histogram(valid_aspect, bins=bins) total = sum(hist) for i in range(len(directions)): if hist[i] > 0: percentage = (hist[i] / total) * 100 f.write(f"{directions[i]}: {hist[i]} ({percentage:.2f}%)\n") else: f.write("无有效数据\n") print(f"统计信息已保存: {output_path}") def get_dem_info(self) -> Dict[str, Any]: """ 获取DEM文件的详细信息 Returns: 包含DEM信息的字典 """ info = { 'file_path': self.dem_path, 'file_name': os.path.basename(self.dem_path), 'rows': self.rows, 'cols': self.cols, 'cell_size_x': self.cell_size_x, 'cell_size_y': self.cell_size_y, 'no_data_value': self.no_data, 'projection': self.projection } # 获取高程统计 valid_data = self.data[self.data != self.no_data] if len(valid_data) > 0: info['elevation_min'] = float(np.min(valid_data)) info['elevation_max'] = float(np.max(valid_data)) info['elevation_mean'] = float(np.mean(valid_data)) info['elevation_std'] = float(np.std(valid_data)) info['valid_cells'] = int(len(valid_data)) info['total_cells'] = self.rows * self.cols info['valid_percentage'] = (len(valid_data) / (self.rows * self.cols)) * 100 return info def get_slope_statistics(self, slope_data: np.ndarray) -> Dict[str, Any]: """ 获取坡度统计信息 Args: slope_data: 坡度数据数组 Returns: 坡度统计信息字典 """ valid_slope = slope_data[slope_data != self.no_data] stats = { 'min': float(np.min(valid_slope)) if len(valid_slope) > 0 else None, 'max': float(np.max(valid_slope)) if len(valid_slope) > 0 else None, 'mean': float(np.mean(valid_slope)) if len(valid_slope) > 0 else None, 'std': float(np.std(valid_slope)) if len(valid_slope) > 0 else None, 'median': float(np.median(valid_slope)) if len(valid_slope) > 0 else None, 'valid_cells': int(len(valid_slope)), 'total_cells': self.rows * self.cols } return stats def get_aspect_statistics(self, aspect_data: np.ndarray) -> Dict[str, Any]: """ 获取坡向统计信息 Args: aspect_data: 坡向数据数组 Returns: 坡向统计信息字典 """ valid_aspect = aspect_data[aspect_data != -1.0] stats = { 'min': float(np.min(valid_aspect)) if len(valid_aspect) > 0 else None, 'max': float(np.max(valid_aspect)) if len(valid_aspect) > 0 else None, 'mean': float(np.mean(valid_aspect)) if len(valid_aspect) > 0 else None, 'std': float(np.std(valid_aspect)) if len(valid_aspect) > 0 else None, 'median': float(np.median(valid_aspect)) if len(valid_aspect) > 0 else None, 'valid_cells': int(len(valid_aspect)), 'total_cells': self.rows * self.cols } # 坡向分类统计 if len(valid_aspect) > 0: bins = [0, 22.5, 67.5, 112.5, 157.5, 202.5, 247.5, 292.5, 337.5, 360] directions = ['北', '东北', '东', '东南', '南', '西南', '西', '西北', '北'] hist, _ = np.histogram(valid_aspect, bins=bins) direction_stats = {} for i in range(len(directions)): direction_stats[directions[i]] = { 'count': int(hist[i]), 'percentage': (hist[i] / len(valid_aspect)) * 100 if len(valid_aspect) > 0 else 0 } stats['direction_distribution'] = direction_stats return stats def close(self): """关闭数据集,释放资源""" if hasattr(self, 'dataset') and self.dataset: self.dataset = None if hasattr(self, 'band') and self.band: self.band = None print("资源已释放") def __enter__(self): """支持with语句""" return self def __exit__(self, exc_type, exc_val, exc_tb): """退出with语句时自动关闭""" self.close() # 使用示例和辅助函数 def create_slope_aspect(dem_path: str, output_type: str = 'combined', output_path: Optional[str] = None, z_factor: float = 1.0, algorithm: str = 'Horn') -> Dict[str, str]: """ 创建坡度坡向的便捷函数 Args: dem_path: DEM文件路径 output_type: 输出类型 'slope', 'aspect', 'separate', 'combined', 'all' output_path: 输出路径(对于单个文件)或输出目录 z_factor: 垂直单位转换因子 algorithm: 算法选项 Returns: 包含输出文件路径的字典 """ print(f"处理DEM文件: {dem_path}") with SlopeAspectGenerator(dem_path) as generator: # 获取DEM信息 dem_info = generator.get_dem_info() print(f"DEM信息: {dem_info['rows']}x{dem_info['cols']}, " f"高程范围: {dem_info.get('elevation_min', 'N/A'):.2f}-" f"{dem_info.get('elevation_max', 'N/A'):.2f}") results = {} if output_type == 'slope': # 只生成坡度 slope_file = output_path or f"{os.path.splitext(dem_path)[0]}_slope.tif" slope = generator.generate_slope_tif(slope_file, z_factor, algorithm) results['slope'] = slope_file # 获取统计信息 slope_stats = generator.get_slope_statistics(slope) print(f"坡度统计: {slope_stats['min']:.2f}度 - {slope_stats['max']:.2f}度") elif output_type == 'aspect': # 只生成坡向 aspect_file = output_path or f"{os.path.splitext(dem_path)[0]}_aspect.tif" aspect = generator.generate_aspect_tif(aspect_file, algorithm) results['aspect'] = aspect_file # 获取统计信息 aspect_stats = generator.get_aspect_statistics(aspect) print(f"坡向统计: {aspect_stats['min']:.2f}度 - {aspect_stats['max']:.2f}度") elif output_type == 'separate': # 分别生成坡度和坡向 if output_path and os.path.isdir(output_path): # 如果输出路径是目录 slope_file = os.path.join(output_path, f"{os.path.splitext(os.path.basename(dem_path))[0]}_slope.tif") aspect_file = os.path.join(output_path, f"{os.path.splitext(os.path.basename(dem_path))[0]}_aspect.tif") slope, aspect = generator.generate_slope_aspect_separate( slope_file, aspect_file, z_factor, algorithm) else: # 如果输出路径是文件前缀 base = output_path or os.path.splitext(dem_path)[0] slope_file = f"{base}_slope.tif" aspect_file = f"{base}_aspect.tif" slope, aspect = generator.generate_slope_aspect_separate( slope_file, aspect_file, z_factor, algorithm) results['slope'] = slope_file results['aspect'] = aspect_file # 获取统计信息 slope_stats = generator.get_slope_statistics(slope) aspect_stats = generator.get_aspect_statistics(aspect) print(f"坡度范围: {slope_stats['min']:.2f}度 - {slope_stats['max']:.2f}度") print(f"坡向范围: {aspect_stats['min']:.2f}度 - {aspect_stats['max']:.2f}度") elif output_type == 'combined': # 生成合并文件 combined_file = output_path or f"{os.path.splitext(dem_path)[0]}_slope_aspect.tif" slope, aspect = generator.generate_combined_tif(combined_file, z_factor, algorithm) results['combined'] = combined_file # 获取统计信息 slope_stats = generator.get_slope_statistics(slope) aspect_stats = generator.get_aspect_statistics(aspect) print(f"坡度范围: {slope_stats['min']:.2f}度 - {slope_stats['max']:.2f}度") print(f"坡向范围: {aspect_stats['min']:.2f}度 - {aspect_stats['max']:.2f}度") elif output_type == 'all': # 生成所有文件 output_dir = output_path if (output_path and os.path.isdir(output_path)) else None results = generator.generate_all(output_dir, z_factor, algorithm) else: raise ValueError(f"不支持的输出类型: {output_type}") return results def batch_process_slope_aspect(dem_files: list, output_dir: str, output_type: str = 'all', z_factor: float = 1.0, algorithm: str = 'Horn') -> Dict[str, list]: """ 批量处理多个DEM文件 Args: dem_files: DEM文件路径列表 output_dir: 输出目录 output_type: 输出类型 z_factor: 垂直单位转换因子 algorithm: 算法选项 Returns: 包含处理结果的字典 """ if not os.path.exists(output_dir): os.makedirs(output_dir) results = { 'success': [], 'failed': [], 'outputs': [] } total_files = len(dem_files) print(f"开始批量处理 {total_files} 个DEM文件...") for i, dem_file in enumerate(dem_files, 1): if not os.path.exists(dem_file): print(f"[{i}/{total_files}] 文件不存在: {dem_file}") results['failed'].append(dem_file) continue print(f"\n[{i}/{total_files}] 处理: {os.path.basename(dem_file)}") try: # 处理文件 file_results = create_slope_aspect( dem_file, output_type, output_dir, z_factor, algorithm ) results['success'].append(dem_file) results['outputs'].append(file_results) print(f"[{i}/{total_files}] 完成") except Exception as e: print(f"[{i}/{total_files}] 失败: {e}") results['failed'].append(dem_file) # 打印总结 print(f"\n" + "="*50) print(f"批量处理完成!") print(f"成功: {len(results['success'])} 个") print(f"失败: {len(results['failed'])} 个") if results['failed']: print("\n失败的文件:") for failed_file in results['failed']: print(f" {os.path.basename(failed_file)}") return results # 主示例代码 def main_example(): """使用示例""" # 示例1: 基本使用 print("示例1: 基本使用") print("=" * 50) # 配置参数 SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) dem_file = os.path.join(SCRIPT_DIR, "o_dem_23d110d0.tif") if os.path.exists(dem_file): # 方法1: 使用便捷函数 print("\n方法1: 使用便捷函数") slope_file = os.path.join(SCRIPT_DIR, "o_combined.tif") results = create_slope_aspect(dem_file, 'combined', slope_file) # 方法2: 直接使用类 # print("\n方法2: 直接使用类") # with SlopeAspectGenerator(dem_file) as gen: # # 获取DEM信息 # dem_info = gen.get_dem_info() # print(f"DEM信息: {dem_info}") # # 生成合并文件 # slope, aspect = gen.generate_combined_tif("o_combined.tif") # # 获取统计信息 # slope_stats = gen.get_slope_statistics(slope) # aspect_stats = gen.get_aspect_statistics(aspect) # print(f"坡度统计: {slope_stats}") # print(f"坡向统计: {aspect_stats}") else: print(f"DEM文件不存在: {dem_file}") print("请创建一个示例DEM文件或使用您自己的DEM文件") # 创建一个简单的示例DEM print("\n创建一个示例DEM文件用于测试...") create_sample_dem("sample_dem.tif") # 使用示例DEM print("\n使用示例DEM进行测试...") results = create_slope_aspect("sample_dem.tif", 'all') # 示例2: 批量处理 # print("\n\n示例2: 批量处理") # print("=" * 50) # # 假设有多个DEM文件 # dem_files = ["dem1.tif", "dem2.tif", "dem3.tif"] # 替换为实际文件 # # 过滤出存在的文件 # existing_files = [f for f in dem_files if os.path.exists(f)] # if len(existing_files) > 0: # batch_results = batch_process_slope_aspect( # existing_files, # "batch_output", # 'all' # ) # else: # print("未找到DEM文件进行批量处理") # print("创建示例文件进行批量处理演示...") # # 创建几个示例DEM文件 # for i in range(1, 4): # create_sample_dem(f"sample_dem_{i}.tif", offset=i*100) # sample_files = [f"sample_dem_{i}.tif" for i in range(1, 4)] # batch_results = batch_process_slope_aspect( # sample_files, # "batch_output", # 'all' # ) def create_sample_dem(output_path: str, rows: int = 100, cols: int = 100, cell_size: float = 30.0, offset: float = 0.0): """ 创建一个示例DEM文件用于测试 Args: output_path: 输出文件路径 rows: 行数 cols: 列数 cell_size: 像元大小 offset: 高程偏移量 """ from osgeo import osr # 创建一个简单的地形:一个山峰 x = np.linspace(-5, 5, cols) y = np.linspace(-5, 5, rows) X, Y = np.meshgrid(x, y) # 创建高程数据(高斯山峰) elevation = 1000 * np.exp(-(X**2 + Y**2) / 10) + offset # 添加一些噪声 elevation += np.random.normal(0, 10, (rows, cols)) # 创建栅格文件 driver = gdal.GetDriverByName('GTiff') dataset = driver.Create( output_path, cols, rows, 1, gdal.GDT_Float32 ) # 设置地理变换(左下角坐标为(0,0)) dataset.SetGeoTransform((0, cell_size, 0, 0, 0, -cell_size)) # 设置投影(WGS84) srs = osr.SpatialReference() srs.ImportFromEPSG(4326) dataset.SetProjection(srs.ExportToWkt()) # 写入数据 band = dataset.GetRasterBand(1) band.WriteArray(elevation) band.SetNoDataValue(-9999.0) band.SetDescription('Elevation') dataset.FlushCache() dataset = None print(f"示例DEM已创建: {output_path}") if __name__ == "__main__": # 运行示例 main_example()