1062 lines
39 KiB
Python
1062 lines
39 KiB
Python
import numpy as np
|
||
from osgeo import gdal
|
||
import math
|
||
import os
|
||
from typing import Optional, Tuple, Dict, Any
|
||
|
||
|
||
class SlopeAspectGenerator:
|
||
"""
|
||
坡度和坡向生成器
|
||
支持生成单独的坡度、坡向文件,或包含两者的合并文件
|
||
"""
|
||
|
||
def __init__(self, dem_path: str):
|
||
"""
|
||
初始化坡度和坡向生成器
|
||
|
||
Args:
|
||
dem_path: 输入DEM文件路径
|
||
"""
|
||
self.dem_path = dem_path
|
||
self.dataset = gdal.Open(dem_path, gdal.GA_ReadOnly)
|
||
|
||
if not self.dataset:
|
||
raise FileNotFoundError(f"无法打开DEM文件: {dem_path}")
|
||
|
||
# 获取地理信息
|
||
self.geotransform = self.dataset.GetGeoTransform()
|
||
self.projection = self.dataset.GetProjection()
|
||
self.band = self.dataset.GetRasterBand(1)
|
||
self.data = self.band.ReadAsArray()
|
||
|
||
# 获取栅格信息
|
||
self.cols = self.dataset.RasterXSize
|
||
self.rows = self.dataset.RasterYSize
|
||
self.no_data = self.band.GetNoDataValue()
|
||
|
||
# 如果DEM没有无效值设置,使用-9999
|
||
if self.no_data is None:
|
||
self.no_data = -9999.0
|
||
print(f"警告: DEM未设置无效值,将使用 {self.no_data} 作为无效值")
|
||
|
||
# 像元大小(考虑非正方形像元)
|
||
self.cell_size_x = abs(self.geotransform[1])
|
||
self.cell_size_y = abs(self.geotransform[5])
|
||
|
||
print(f"DEM信息: {self.rows}行 x {self.cols}列")
|
||
print(f"X方向像元大小: {self.cell_size_x:.4f}")
|
||
print(f"Y方向像元大小: {self.cell_size_y:.4f}")
|
||
print(f"无效值: {self.no_data}")
|
||
|
||
def calculate_slope(self, z_factor: float = 1.0, algorithm: str = 'Horn') -> np.ndarray:
|
||
"""
|
||
计算坡度
|
||
|
||
Args:
|
||
z_factor: 垂直单位转换因子
|
||
algorithm: 算法选项 'Horn' 或 'ZevenbergenThorne'
|
||
|
||
Returns:
|
||
坡度数组(单位:度)
|
||
"""
|
||
print(f"正在计算坡度 (算法: {algorithm})...")
|
||
|
||
# 创建输出数组
|
||
slope = np.full((self.rows, self.cols), self.no_data, dtype=np.float32)
|
||
|
||
# 创建填充后的数据(用于边界处理)
|
||
padded_data = np.pad(
|
||
self.data,
|
||
pad_width=1,
|
||
mode='constant',
|
||
constant_values=self.no_data
|
||
)
|
||
|
||
# 创建有效掩码
|
||
valid_mask = (self.data != self.no_data)
|
||
padded_valid = np.pad(valid_mask, pad_width=1, mode='constant', constant_values=False)
|
||
|
||
for i in range(self.rows):
|
||
for j in range(self.cols):
|
||
if not valid_mask[i, j]:
|
||
continue
|
||
|
||
i_pad, j_pad = i + 1, j + 1
|
||
|
||
# 获取3x3窗口
|
||
window = padded_data[i_pad-1:i_pad+2, j_pad-1:j_pad+2]
|
||
window_valid = padded_valid[i_pad-1:i_pad+2, j_pad-1:j_pad+2]
|
||
|
||
# 如果窗口内有无效值,跳过
|
||
if not np.all(window_valid):
|
||
continue
|
||
|
||
if algorithm.upper() == 'ZEVERBERGENTHORNE':
|
||
# Zevenbergen & Thorne 算法
|
||
dz_dx = (window[1, 2] - window[1, 0]) / (2 * self.cell_size_x)
|
||
dz_dy = (window[0, 1] - window[2, 1]) / (2 * self.cell_size_y)
|
||
else:
|
||
# Horn 算法(默认)
|
||
dz_dx = ((window[0, 2] + 2 * window[1, 2] + window[2, 2]) -
|
||
(window[0, 0] + 2 * window[1, 0] + window[2, 0])) / (8 * self.cell_size_x)
|
||
|
||
dz_dy = ((window[2, 0] + 2 * window[2, 1] + window[2, 2]) -
|
||
(window[0, 0] + 2 * window[0, 1] + window[0, 2])) / (8 * self.cell_size_y)
|
||
|
||
# 计算坡度(弧度转角度)
|
||
slope_rad = math.atan(z_factor * math.sqrt(dz_dx**2 + dz_dy**2))
|
||
slope_deg = math.degrees(slope_rad)
|
||
slope[i, j] = slope_deg
|
||
|
||
# 边界处理:将边缘设为无效值
|
||
slope[0, :] = self.no_data
|
||
slope[-1, :] = self.no_data
|
||
slope[:, 0] = self.no_data
|
||
slope[:, -1] = self.no_data
|
||
|
||
valid_slope = slope[slope != self.no_data]
|
||
if len(valid_slope) > 0:
|
||
print(f"坡度范围: {np.min(valid_slope):.2f} - {np.max(valid_slope):.2f} 度")
|
||
|
||
return slope
|
||
|
||
def calculate_aspect(self, algorithm: str = 'Horn') -> np.ndarray:
|
||
"""
|
||
计算坡向
|
||
|
||
Args:
|
||
algorithm: 算法选项 'Horn' 或 'ZevenbergenThorne'
|
||
|
||
Returns:
|
||
坡向数组(单位:度,0-360,北为0度,顺时针增加)
|
||
"""
|
||
print(f"正在计算坡向 (算法: {algorithm})...")
|
||
|
||
# 创建输出数组
|
||
aspect = np.full((self.rows, self.cols), -1.0, dtype=np.float32) # 平地用-1表示
|
||
|
||
# 创建填充后的数据
|
||
padded_data = np.pad(
|
||
self.data,
|
||
pad_width=1,
|
||
mode='constant',
|
||
constant_values=self.no_data
|
||
)
|
||
|
||
# 创建有效掩码
|
||
valid_mask = (self.data != self.no_data)
|
||
padded_valid = np.pad(valid_mask, pad_width=1, mode='constant', constant_values=False)
|
||
|
||
for i in range(self.rows):
|
||
for j in range(self.cols):
|
||
if not valid_mask[i, j]:
|
||
continue
|
||
|
||
i_pad, j_pad = i + 1, j + 1
|
||
|
||
# 获取3x3窗口
|
||
window = padded_data[i_pad-1:i_pad+2, j_pad-1:j_pad+2]
|
||
window_valid = padded_valid[i_pad-1:i_pad+2, j_pad-1:j_pad+2]
|
||
|
||
# 如果窗口内有无效值,跳过
|
||
if not np.all(window_valid):
|
||
continue
|
||
|
||
if algorithm.upper() == 'ZEVERBERGENTHORNE':
|
||
# Zevenbergen & Thorne 算法
|
||
dz_dx = (window[1, 2] - window[1, 0]) / (2 * self.cell_size_x)
|
||
dz_dy = (window[0, 1] - window[2, 1]) / (2 * self.cell_size_y)
|
||
else:
|
||
# Horn 算法(默认)
|
||
dz_dx = ((window[0, 2] + 2 * window[1, 2] + window[2, 2]) -
|
||
(window[0, 0] + 2 * window[1, 0] + window[2, 0])) / (8 * self.cell_size_x)
|
||
|
||
dz_dy = ((window[2, 0] + 2 * window[2, 1] + window[2, 2]) -
|
||
(window[0, 0] + 2 * window[0, 1] + window[0, 2])) / (8 * self.cell_size_y)
|
||
|
||
# 计算坡向
|
||
if abs(dz_dx) < 1e-10 and abs(dz_dy) < 1e-10:
|
||
# 平地
|
||
aspect[i, j] = -1.0
|
||
else:
|
||
aspect_rad = math.atan2(dz_dy, -dz_dx) # atan2(y, x)
|
||
aspect_deg = math.degrees(aspect_rad)
|
||
|
||
# 转换为0-360度(北为0度,顺时针)
|
||
if aspect_deg < 0:
|
||
aspect_deg += 360.0
|
||
|
||
aspect[i, j] = aspect_deg
|
||
|
||
# 边界处理:将边缘设为-1
|
||
aspect[0, :] = -1.0
|
||
aspect[-1, :] = -1.0
|
||
aspect[:, 0] = -1.0
|
||
aspect[:, -1] = -1.0
|
||
|
||
valid_aspect = aspect[aspect != -1.0]
|
||
if len(valid_aspect) > 0:
|
||
print(f"坡向范围: {np.min(valid_aspect):.2f} - {np.max(valid_aspect):.2f} 度")
|
||
|
||
return aspect
|
||
|
||
def calculate_slope_aspect(self, z_factor: float = 1.0,
|
||
algorithm: str = 'Horn') -> Tuple[np.ndarray, np.ndarray]:
|
||
"""
|
||
同时计算坡度和坡向(优化版本,减少重复计算)
|
||
|
||
Args:
|
||
z_factor: 垂直单位转换因子
|
||
algorithm: 算法选项
|
||
|
||
Returns:
|
||
(slope_array, aspect_array)
|
||
"""
|
||
print(f"同时计算坡度和坡向 (算法: {algorithm})...")
|
||
|
||
# 创建输出数组
|
||
slope = np.full((self.rows, self.cols), self.no_data, dtype=np.float32)
|
||
aspect = np.full((self.rows, self.cols), -1.0, dtype=np.float32)
|
||
|
||
# 创建填充后的数据
|
||
padded_data = np.pad(
|
||
self.data,
|
||
pad_width=1,
|
||
mode='constant',
|
||
constant_values=self.no_data
|
||
)
|
||
|
||
# 创建有效掩码
|
||
valid_mask = (self.data != self.no_data)
|
||
padded_valid = np.pad(valid_mask, pad_width=1, mode='constant', constant_values=False)
|
||
|
||
for i in range(self.rows):
|
||
for j in range(self.cols):
|
||
if not valid_mask[i, j]:
|
||
continue
|
||
|
||
i_pad, j_pad = i + 1, j + 1
|
||
|
||
# 获取3x3窗口
|
||
window = padded_data[i_pad-1:i_pad+2, j_pad-1:j_pad+2]
|
||
window_valid = padded_valid[i_pad-1:i_pad+2, j_pad-1:j_pad+2]
|
||
|
||
# 如果窗口内有无效值,跳过
|
||
if not np.all(window_valid):
|
||
continue
|
||
|
||
# 计算导数(一次计算,两个都用)
|
||
if algorithm.upper() == 'ZEVERBERGENTHORNE':
|
||
dz_dx = (window[1, 2] - window[1, 0]) / (2 * self.cell_size_x)
|
||
dz_dy = (window[0, 1] - window[2, 1]) / (2 * self.cell_size_y)
|
||
else:
|
||
dz_dx = ((window[0, 2] + 2 * window[1, 2] + window[2, 2]) -
|
||
(window[0, 0] + 2 * window[1, 0] + window[2, 0])) / (8 * self.cell_size_x)
|
||
|
||
dz_dy = ((window[2, 0] + 2 * window[2, 1] + window[2, 2]) -
|
||
(window[0, 0] + 2 * window[0, 1] + window[0, 2])) / (8 * self.cell_size_y)
|
||
|
||
# 计算坡度
|
||
slope_rad = math.atan(z_factor * math.sqrt(dz_dx**2 + dz_dy**2))
|
||
slope_deg = math.degrees(slope_rad)
|
||
slope[i, j] = slope_deg
|
||
|
||
# 计算坡向
|
||
if abs(dz_dx) < 1e-10 and abs(dz_dy) < 1e-10:
|
||
aspect[i, j] = -1.0
|
||
else:
|
||
aspect_rad = math.atan2(dz_dy, -dz_dx)
|
||
aspect_deg = math.degrees(aspect_rad)
|
||
|
||
if aspect_deg < 0:
|
||
aspect_deg += 360.0
|
||
|
||
aspect[i, j] = aspect_deg
|
||
|
||
# 边界处理
|
||
slope[0, :] = self.no_data
|
||
slope[-1, :] = self.no_data
|
||
slope[:, 0] = self.no_data
|
||
slope[:, -1] = self.no_data
|
||
|
||
aspect[0, :] = -1.0
|
||
aspect[-1, :] = -1.0
|
||
aspect[:, 0] = -1.0
|
||
aspect[:, -1] = -1.0
|
||
|
||
# 显示统计信息
|
||
valid_slope = slope[slope != self.no_data]
|
||
valid_aspect = aspect[aspect != -1.0]
|
||
|
||
if len(valid_slope) > 0:
|
||
print(f"坡度范围: {np.min(valid_slope):.2f} - {np.max(valid_slope):.2f} 度")
|
||
if len(valid_aspect) > 0:
|
||
print(f"坡向范围: {np.min(valid_aspect):.2f} - {np.max(valid_aspect):.2f} 度")
|
||
|
||
return slope, aspect
|
||
|
||
def save_raster(self, data: np.ndarray, output_path: str,
|
||
band_name: str = '', data_type: int = gdal.GDT_Float32,
|
||
no_data_value: Optional[float] = None) -> None:
|
||
"""
|
||
保存栅格数据为TIFF文件
|
||
|
||
Args:
|
||
data: 栅格数据数组
|
||
output_path: 输出文件路径
|
||
band_name: 波段名称
|
||
data_type: GDAL数据类型
|
||
no_data_value: 无效值
|
||
"""
|
||
driver = gdal.GetDriverByName('GTiff')
|
||
|
||
out_dataset = driver.Create(
|
||
output_path,
|
||
self.cols,
|
||
self.rows,
|
||
1, # 单波段
|
||
data_type,
|
||
options=['COMPRESS=LZW', 'PREDICTOR=2', 'TILED=YES']
|
||
)
|
||
|
||
out_dataset.SetGeoTransform(self.geotransform)
|
||
out_dataset.SetProjection(self.projection)
|
||
|
||
out_band = out_dataset.GetRasterBand(1)
|
||
out_band.WriteArray(data)
|
||
|
||
if no_data_value is not None:
|
||
out_band.SetNoDataValue(no_data_value)
|
||
elif band_name == 'Slope':
|
||
out_band.SetNoDataValue(self.no_data)
|
||
elif band_name == 'Aspect':
|
||
out_band.SetNoDataValue(-1.0)
|
||
|
||
if band_name:
|
||
out_band.SetDescription(band_name)
|
||
|
||
# 设置统计信息(加速QGIS等软件加载)
|
||
out_band.ComputeStatistics(False)
|
||
|
||
out_dataset.FlushCache()
|
||
out_dataset = None
|
||
|
||
print(f"文件已保存: {output_path}")
|
||
if band_name:
|
||
print(f" 波段: {band_name}")
|
||
|
||
def save_combined_raster(self, slope_data: np.ndarray, aspect_data: np.ndarray,
|
||
output_path: str) -> None:
|
||
"""
|
||
保存包含坡度和坡向两个波段的TIFF文件
|
||
|
||
Args:
|
||
slope_data: 坡度数据
|
||
aspect_data: 坡向数据
|
||
output_path: 输出文件路径
|
||
"""
|
||
driver = gdal.GetDriverByName('GTiff')
|
||
|
||
# 创建包含2个波段的数据集
|
||
out_dataset = driver.Create(
|
||
output_path,
|
||
self.cols,
|
||
self.rows,
|
||
2, # 2个波段
|
||
gdal.GDT_Float32,
|
||
options=['COMPRESS=LZW', 'PREDICTOR=2', 'TILED=YES']
|
||
)
|
||
|
||
# 设置地理信息
|
||
out_dataset.SetGeoTransform(self.geotransform)
|
||
out_dataset.SetProjection(self.projection)
|
||
|
||
# 写入坡度波段(波段1)
|
||
slope_band = out_dataset.GetRasterBand(1)
|
||
slope_band.WriteArray(slope_data)
|
||
slope_band.SetNoDataValue(self.no_data)
|
||
slope_band.SetDescription('Slope')
|
||
slope_band.SetMetadataItem('UNITS', 'degrees')
|
||
slope_band.SetMetadataItem('RANGE', '0-90')
|
||
slope_band.ComputeStatistics(False)
|
||
|
||
# 写入坡向波段(波段2)
|
||
aspect_band = out_dataset.GetRasterBand(2)
|
||
aspect_band.WriteArray(aspect_data)
|
||
aspect_band.SetNoDataValue(-1.0)
|
||
aspect_band.SetDescription('Aspect')
|
||
aspect_band.SetMetadataItem('UNITS', 'degrees')
|
||
aspect_band.SetMetadataItem('RANGE', '0-360')
|
||
aspect_band.SetMetadataItem('DIRECTION', 'Clockwise from North')
|
||
aspect_band.ComputeStatistics(False)
|
||
|
||
# 设置数据集元数据
|
||
out_dataset.SetMetadata({
|
||
'SOURCE_DEM': os.path.basename(self.dem_path),
|
||
'PROCESSING_METHOD': 'SlopeAspectGenerator',
|
||
'CELL_SIZE_X': str(self.cell_size_x),
|
||
'CELL_SIZE_Y': str(self.cell_size_y)
|
||
})
|
||
|
||
out_dataset.FlushCache()
|
||
out_dataset = None
|
||
|
||
print(f"合并文件已保存: {output_path}")
|
||
print(f" 波段1: Slope (坡度, 单位:度)")
|
||
print(f" 波段2: Aspect (坡向, 单位:度)")
|
||
|
||
def generate_slope_tif(self, output_path: Optional[str] = None,
|
||
z_factor: float = 1.0, algorithm: str = 'Horn') -> np.ndarray:
|
||
"""
|
||
生成坡度TIFF文件
|
||
|
||
Args:
|
||
output_path: 输出文件路径(可选)
|
||
z_factor: 垂直单位转换因子
|
||
algorithm: 算法选项
|
||
|
||
Returns:
|
||
坡度数据数组
|
||
"""
|
||
if output_path is None:
|
||
base_name = os.path.splitext(self.dem_path)[0]
|
||
output_path = f"{base_name}_slope.tif"
|
||
|
||
print(f"\n生成坡度文件...")
|
||
slope = self.calculate_slope(z_factor, algorithm)
|
||
self.save_raster(slope, output_path, 'Slope', no_data_value=self.no_data)
|
||
|
||
return slope
|
||
|
||
def generate_aspect_tif(self, output_path: Optional[str] = None,
|
||
algorithm: str = 'Horn') -> np.ndarray:
|
||
"""
|
||
生成坡向TIFF文件
|
||
|
||
Args:
|
||
output_path: 输出文件路径(可选)
|
||
algorithm: 算法选项
|
||
|
||
Returns:
|
||
坡向数据数组
|
||
"""
|
||
if output_path is None:
|
||
base_name = os.path.splitext(self.dem_path)[0]
|
||
output_path = f"{base_name}_aspect.tif"
|
||
|
||
print(f"\n生成坡向文件...")
|
||
aspect = self.calculate_aspect(algorithm)
|
||
self.save_raster(aspect, output_path, 'Aspect', no_data_value=-1.0)
|
||
|
||
return aspect
|
||
|
||
def generate_slope_aspect_separate(self,
|
||
slope_output: Optional[str] = None,
|
||
aspect_output: Optional[str] = None,
|
||
z_factor: float = 1.0,
|
||
algorithm: str = 'Horn') -> Tuple[np.ndarray, np.ndarray]:
|
||
"""
|
||
分别生成坡度和坡向TIFF文件
|
||
|
||
Args:
|
||
slope_output: 坡度输出路径(可选)
|
||
aspect_output: 坡向输出路径(可选)
|
||
z_factor: 垂直单位转换因子
|
||
algorithm: 算法选项
|
||
|
||
Returns:
|
||
(slope_data, aspect_data)
|
||
"""
|
||
# 设置输出路径
|
||
base_name = os.path.splitext(self.dem_path)[0]
|
||
|
||
if slope_output is None:
|
||
slope_output = f"{base_name}_slope.tif"
|
||
if aspect_output is None:
|
||
aspect_output = f"{base_name}_aspect.tif"
|
||
|
||
print(f"\n分别生成坡度和坡向文件...")
|
||
|
||
# 同时计算坡度和坡向(优化版本)
|
||
slope, aspect = self.calculate_slope_aspect(z_factor, algorithm)
|
||
|
||
# 保存文件
|
||
self.save_raster(slope, slope_output, 'Slope', no_data_value=self.no_data)
|
||
self.save_raster(aspect, aspect_output, 'Aspect', no_data_value=-1.0)
|
||
|
||
return slope, aspect
|
||
|
||
def generate_combined_tif(self, output_path: Optional[str] = None,
|
||
z_factor: float = 1.0,
|
||
algorithm: str = 'Horn') -> Tuple[np.ndarray, np.ndarray]:
|
||
"""
|
||
生成包含坡度和坡向的合并TIFF文件
|
||
|
||
Args:
|
||
output_path: 输出文件路径(可选)
|
||
z_factor: 垂直单位转换因子
|
||
algorithm: 算法选项
|
||
|
||
Returns:
|
||
(slope_data, aspect_data)
|
||
"""
|
||
if output_path is None:
|
||
base_name = os.path.splitext(self.dem_path)[0]
|
||
output_path = f"{base_name}_slope_aspect.tif"
|
||
|
||
print(f"\n生成合并文件...")
|
||
|
||
# 同时计算坡度和坡向
|
||
slope, aspect = self.calculate_slope_aspect(z_factor, algorithm)
|
||
|
||
# 保存合并文件
|
||
self.save_combined_raster(slope, aspect, output_path)
|
||
|
||
return slope, aspect
|
||
|
||
def generate_all(self, output_dir: Optional[str] = None,
|
||
z_factor: float = 1.0,
|
||
algorithm: str = 'Horn') -> Dict[str, str]:
|
||
"""
|
||
生成所有类型的文件:单独的坡度、单独的坡向、合并文件、统计信息
|
||
|
||
Args:
|
||
output_dir: 输出目录(可选)
|
||
z_factor: 垂直单位转换因子
|
||
algorithm: 算法选项
|
||
|
||
Returns:
|
||
包含所有输出文件路径的字典
|
||
"""
|
||
# 设置输出路径
|
||
base_name = os.path.basename(self.dem_path)
|
||
file_base = os.path.splitext(base_name)[0]
|
||
|
||
if output_dir:
|
||
if not os.path.exists(output_dir):
|
||
os.makedirs(output_dir)
|
||
output_prefix = os.path.join(output_dir, file_base)
|
||
else:
|
||
output_prefix = file_base
|
||
|
||
# 计算坡度和坡向(只计算一次)
|
||
print(f"\n计算坡度和坡向...")
|
||
slope, aspect = self.calculate_slope_aspect(z_factor, algorithm)
|
||
|
||
# 生成各种文件
|
||
results = {}
|
||
|
||
# 1. 单独的坡度文件
|
||
slope_file = f"{output_prefix}_slope.tif"
|
||
self.save_raster(slope, slope_file, 'Slope', no_data_value=self.no_data)
|
||
results['slope'] = slope_file
|
||
|
||
# 2. 单独的坡向文件
|
||
aspect_file = f"{output_prefix}_aspect.tif"
|
||
self.save_raster(aspect, aspect_file, 'Aspect', no_data_value=-1.0)
|
||
results['aspect'] = aspect_file
|
||
|
||
# 3. 合并文件
|
||
combined_file = f"{output_prefix}_slope_aspect.tif"
|
||
self.save_combined_raster(slope, aspect, combined_file)
|
||
results['combined'] = combined_file
|
||
|
||
# 4. 统计信息文件(文本)
|
||
stats_file = f"{output_prefix}_statistics.txt"
|
||
self.save_statistics(slope, aspect, stats_file)
|
||
results['statistics'] = stats_file
|
||
|
||
print(f"\n所有文件生成完成!")
|
||
for key, path in results.items():
|
||
print(f" {key}: {os.path.basename(path)}")
|
||
|
||
return results
|
||
|
||
def save_statistics(self, slope_data: np.ndarray, aspect_data: np.ndarray,
|
||
output_path: str) -> None:
|
||
"""
|
||
保存统计信息到文本文件
|
||
|
||
Args:
|
||
slope_data: 坡度数据
|
||
aspect_data: 坡向数据
|
||
output_path: 输出文件路径
|
||
"""
|
||
valid_slope = slope_data[slope_data != self.no_data]
|
||
valid_aspect = aspect_data[aspect_data != -1.0]
|
||
|
||
with open(output_path, 'w', encoding='utf-8') as f:
|
||
f.write("坡度和坡向统计信息\n")
|
||
f.write("=" * 50 + "\n")
|
||
f.write(f"输入DEM: {os.path.basename(self.dem_path)}\n")
|
||
f.write(f"栅格大小: {self.rows} x {self.cols}\n")
|
||
f.write(f"处理时间: {np.datetime64('now')}\n\n")
|
||
|
||
f.write("坡度统计:\n")
|
||
f.write("-" * 30 + "\n")
|
||
if len(valid_slope) > 0:
|
||
f.write(f"最小值: {np.min(valid_slope):.4f} 度\n")
|
||
f.write(f"最大值: {np.max(valid_slope):.4f} 度\n")
|
||
f.write(f"平均值: {np.mean(valid_slope):.4f} 度\n")
|
||
f.write(f"标准差: {np.std(valid_slope):.4f} 度\n")
|
||
f.write(f"中位数: {np.median(valid_slope):.4f} 度\n")
|
||
f.write(f"有效像元数: {len(valid_slope)} / {self.rows * self.cols}\n")
|
||
|
||
# 坡度分级统计
|
||
f.write("\n坡度分级统计:\n")
|
||
slope_classes = ['平地(0-2)', '缓坡(2-5)', '斜坡(5-15)', '陡坡(15-30)', '急陡坡(30-45)', '峭壁(45-90)']
|
||
bins = [0, 2, 5, 15, 30, 45, 90]
|
||
|
||
hist, _ = np.histogram(valid_slope, bins=bins)
|
||
total = sum(hist)
|
||
|
||
for i in range(len(slope_classes)):
|
||
if hist[i] > 0:
|
||
percentage = (hist[i] / total) * 100
|
||
f.write(f"{slope_classes[i]}: {hist[i]} ({percentage:.2f}%)\n")
|
||
else:
|
||
f.write("无有效数据\n")
|
||
|
||
f.write("\n坡向统计:\n")
|
||
f.write("-" * 30 + "\n")
|
||
if len(valid_aspect) > 0:
|
||
f.write(f"最小值: {np.min(valid_aspect):.4f} 度\n")
|
||
f.write(f"最大值: {np.max(valid_aspect):.4f} 度\n")
|
||
f.write(f"平均值: {np.mean(valid_aspect):.4f} 度\n")
|
||
f.write(f"标准差: {np.std(valid_aspect):.4f} 度\n")
|
||
f.write(f"中位数: {np.median(valid_aspect):.4f} 度\n")
|
||
f.write(f"有效像元数: {len(valid_aspect)} / {self.rows * self.cols}\n")
|
||
|
||
# 坡向分类统计
|
||
f.write("\n坡向分类统计:\n")
|
||
directions = ['北(0-22.5)', '东北(22.5-67.5)', '东(67.5-112.5)', '东南(112.5-157.5)',
|
||
'南(157.5-202.5)', '西南(202.5-247.5)', '西(247.5-292.5)', '西北(292.5-337.5)', '北(337.5-360)']
|
||
bins = [0, 22.5, 67.5, 112.5, 157.5, 202.5, 247.5, 292.5, 337.5, 360]
|
||
|
||
hist, _ = np.histogram(valid_aspect, bins=bins)
|
||
total = sum(hist)
|
||
|
||
for i in range(len(directions)):
|
||
if hist[i] > 0:
|
||
percentage = (hist[i] / total) * 100
|
||
f.write(f"{directions[i]}: {hist[i]} ({percentage:.2f}%)\n")
|
||
else:
|
||
f.write("无有效数据\n")
|
||
|
||
print(f"统计信息已保存: {output_path}")
|
||
|
||
def get_dem_info(self) -> Dict[str, Any]:
|
||
"""
|
||
获取DEM文件的详细信息
|
||
|
||
Returns:
|
||
包含DEM信息的字典
|
||
"""
|
||
info = {
|
||
'file_path': self.dem_path,
|
||
'file_name': os.path.basename(self.dem_path),
|
||
'rows': self.rows,
|
||
'cols': self.cols,
|
||
'cell_size_x': self.cell_size_x,
|
||
'cell_size_y': self.cell_size_y,
|
||
'no_data_value': self.no_data,
|
||
'projection': self.projection
|
||
}
|
||
|
||
# 获取高程统计
|
||
valid_data = self.data[self.data != self.no_data]
|
||
if len(valid_data) > 0:
|
||
info['elevation_min'] = float(np.min(valid_data))
|
||
info['elevation_max'] = float(np.max(valid_data))
|
||
info['elevation_mean'] = float(np.mean(valid_data))
|
||
info['elevation_std'] = float(np.std(valid_data))
|
||
info['valid_cells'] = int(len(valid_data))
|
||
info['total_cells'] = self.rows * self.cols
|
||
info['valid_percentage'] = (len(valid_data) / (self.rows * self.cols)) * 100
|
||
|
||
return info
|
||
|
||
def get_slope_statistics(self, slope_data: np.ndarray) -> Dict[str, Any]:
|
||
"""
|
||
获取坡度统计信息
|
||
|
||
Args:
|
||
slope_data: 坡度数据数组
|
||
|
||
Returns:
|
||
坡度统计信息字典
|
||
"""
|
||
valid_slope = slope_data[slope_data != self.no_data]
|
||
|
||
stats = {
|
||
'min': float(np.min(valid_slope)) if len(valid_slope) > 0 else None,
|
||
'max': float(np.max(valid_slope)) if len(valid_slope) > 0 else None,
|
||
'mean': float(np.mean(valid_slope)) if len(valid_slope) > 0 else None,
|
||
'std': float(np.std(valid_slope)) if len(valid_slope) > 0 else None,
|
||
'median': float(np.median(valid_slope)) if len(valid_slope) > 0 else None,
|
||
'valid_cells': int(len(valid_slope)),
|
||
'total_cells': self.rows * self.cols
|
||
}
|
||
|
||
return stats
|
||
|
||
def get_aspect_statistics(self, aspect_data: np.ndarray) -> Dict[str, Any]:
|
||
"""
|
||
获取坡向统计信息
|
||
|
||
Args:
|
||
aspect_data: 坡向数据数组
|
||
|
||
Returns:
|
||
坡向统计信息字典
|
||
"""
|
||
valid_aspect = aspect_data[aspect_data != -1.0]
|
||
|
||
stats = {
|
||
'min': float(np.min(valid_aspect)) if len(valid_aspect) > 0 else None,
|
||
'max': float(np.max(valid_aspect)) if len(valid_aspect) > 0 else None,
|
||
'mean': float(np.mean(valid_aspect)) if len(valid_aspect) > 0 else None,
|
||
'std': float(np.std(valid_aspect)) if len(valid_aspect) > 0 else None,
|
||
'median': float(np.median(valid_aspect)) if len(valid_aspect) > 0 else None,
|
||
'valid_cells': int(len(valid_aspect)),
|
||
'total_cells': self.rows * self.cols
|
||
}
|
||
|
||
# 坡向分类统计
|
||
if len(valid_aspect) > 0:
|
||
bins = [0, 22.5, 67.5, 112.5, 157.5, 202.5, 247.5, 292.5, 337.5, 360]
|
||
directions = ['北', '东北', '东', '东南', '南', '西南', '西', '西北', '北']
|
||
|
||
hist, _ = np.histogram(valid_aspect, bins=bins)
|
||
direction_stats = {}
|
||
|
||
for i in range(len(directions)):
|
||
direction_stats[directions[i]] = {
|
||
'count': int(hist[i]),
|
||
'percentage': (hist[i] / len(valid_aspect)) * 100 if len(valid_aspect) > 0 else 0
|
||
}
|
||
|
||
stats['direction_distribution'] = direction_stats
|
||
|
||
return stats
|
||
|
||
def close(self):
|
||
"""关闭数据集,释放资源"""
|
||
if hasattr(self, 'dataset') and self.dataset:
|
||
self.dataset = None
|
||
if hasattr(self, 'band') and self.band:
|
||
self.band = None
|
||
print("资源已释放")
|
||
|
||
def __enter__(self):
|
||
"""支持with语句"""
|
||
return self
|
||
|
||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||
"""退出with语句时自动关闭"""
|
||
self.close()
|
||
|
||
|
||
# 使用示例和辅助函数
|
||
def create_slope_aspect(dem_path: str, output_type: str = 'combined',
|
||
output_path: Optional[str] = None,
|
||
z_factor: float = 1.0, algorithm: str = 'Horn') -> Dict[str, str]:
|
||
"""
|
||
创建坡度坡向的便捷函数
|
||
|
||
Args:
|
||
dem_path: DEM文件路径
|
||
output_type: 输出类型 'slope', 'aspect', 'separate', 'combined', 'all'
|
||
output_path: 输出路径(对于单个文件)或输出目录
|
||
z_factor: 垂直单位转换因子
|
||
algorithm: 算法选项
|
||
|
||
Returns:
|
||
包含输出文件路径的字典
|
||
"""
|
||
print(f"处理DEM文件: {dem_path}")
|
||
|
||
with SlopeAspectGenerator(dem_path) as generator:
|
||
# 获取DEM信息
|
||
dem_info = generator.get_dem_info()
|
||
print(f"DEM信息: {dem_info['rows']}x{dem_info['cols']}, "
|
||
f"高程范围: {dem_info.get('elevation_min', 'N/A'):.2f}-"
|
||
f"{dem_info.get('elevation_max', 'N/A'):.2f}")
|
||
|
||
results = {}
|
||
|
||
if output_type == 'slope':
|
||
# 只生成坡度
|
||
slope_file = output_path or f"{os.path.splitext(dem_path)[0]}_slope.tif"
|
||
slope = generator.generate_slope_tif(slope_file, z_factor, algorithm)
|
||
results['slope'] = slope_file
|
||
|
||
# 获取统计信息
|
||
slope_stats = generator.get_slope_statistics(slope)
|
||
print(f"坡度统计: {slope_stats['min']:.2f}度 - {slope_stats['max']:.2f}度")
|
||
|
||
elif output_type == 'aspect':
|
||
# 只生成坡向
|
||
aspect_file = output_path or f"{os.path.splitext(dem_path)[0]}_aspect.tif"
|
||
aspect = generator.generate_aspect_tif(aspect_file, algorithm)
|
||
results['aspect'] = aspect_file
|
||
|
||
# 获取统计信息
|
||
aspect_stats = generator.get_aspect_statistics(aspect)
|
||
print(f"坡向统计: {aspect_stats['min']:.2f}度 - {aspect_stats['max']:.2f}度")
|
||
|
||
elif output_type == 'separate':
|
||
# 分别生成坡度和坡向
|
||
if output_path and os.path.isdir(output_path):
|
||
# 如果输出路径是目录
|
||
slope_file = os.path.join(output_path, f"{os.path.splitext(os.path.basename(dem_path))[0]}_slope.tif")
|
||
aspect_file = os.path.join(output_path, f"{os.path.splitext(os.path.basename(dem_path))[0]}_aspect.tif")
|
||
slope, aspect = generator.generate_slope_aspect_separate(
|
||
slope_file, aspect_file, z_factor, algorithm)
|
||
else:
|
||
# 如果输出路径是文件前缀
|
||
base = output_path or os.path.splitext(dem_path)[0]
|
||
slope_file = f"{base}_slope.tif"
|
||
aspect_file = f"{base}_aspect.tif"
|
||
slope, aspect = generator.generate_slope_aspect_separate(
|
||
slope_file, aspect_file, z_factor, algorithm)
|
||
|
||
results['slope'] = slope_file
|
||
results['aspect'] = aspect_file
|
||
|
||
# 获取统计信息
|
||
slope_stats = generator.get_slope_statistics(slope)
|
||
aspect_stats = generator.get_aspect_statistics(aspect)
|
||
print(f"坡度范围: {slope_stats['min']:.2f}度 - {slope_stats['max']:.2f}度")
|
||
print(f"坡向范围: {aspect_stats['min']:.2f}度 - {aspect_stats['max']:.2f}度")
|
||
|
||
elif output_type == 'combined':
|
||
# 生成合并文件
|
||
combined_file = output_path or f"{os.path.splitext(dem_path)[0]}_slope_aspect.tif"
|
||
slope, aspect = generator.generate_combined_tif(combined_file, z_factor, algorithm)
|
||
results['combined'] = combined_file
|
||
|
||
# 获取统计信息
|
||
slope_stats = generator.get_slope_statistics(slope)
|
||
aspect_stats = generator.get_aspect_statistics(aspect)
|
||
print(f"坡度范围: {slope_stats['min']:.2f}度 - {slope_stats['max']:.2f}度")
|
||
print(f"坡向范围: {aspect_stats['min']:.2f}度 - {aspect_stats['max']:.2f}度")
|
||
|
||
elif output_type == 'all':
|
||
# 生成所有文件
|
||
output_dir = output_path if (output_path and os.path.isdir(output_path)) else None
|
||
results = generator.generate_all(output_dir, z_factor, algorithm)
|
||
|
||
else:
|
||
raise ValueError(f"不支持的输出类型: {output_type}")
|
||
|
||
return results
|
||
|
||
|
||
def batch_process_slope_aspect(dem_files: list, output_dir: str,
|
||
output_type: str = 'all',
|
||
z_factor: float = 1.0,
|
||
algorithm: str = 'Horn') -> Dict[str, list]:
|
||
"""
|
||
批量处理多个DEM文件
|
||
|
||
Args:
|
||
dem_files: DEM文件路径列表
|
||
output_dir: 输出目录
|
||
output_type: 输出类型
|
||
z_factor: 垂直单位转换因子
|
||
algorithm: 算法选项
|
||
|
||
Returns:
|
||
包含处理结果的字典
|
||
"""
|
||
if not os.path.exists(output_dir):
|
||
os.makedirs(output_dir)
|
||
|
||
results = {
|
||
'success': [],
|
||
'failed': [],
|
||
'outputs': []
|
||
}
|
||
|
||
total_files = len(dem_files)
|
||
print(f"开始批量处理 {total_files} 个DEM文件...")
|
||
|
||
for i, dem_file in enumerate(dem_files, 1):
|
||
if not os.path.exists(dem_file):
|
||
print(f"[{i}/{total_files}] 文件不存在: {dem_file}")
|
||
results['failed'].append(dem_file)
|
||
continue
|
||
|
||
print(f"\n[{i}/{total_files}] 处理: {os.path.basename(dem_file)}")
|
||
|
||
try:
|
||
# 处理文件
|
||
file_results = create_slope_aspect(
|
||
dem_file,
|
||
output_type,
|
||
output_dir,
|
||
z_factor,
|
||
algorithm
|
||
)
|
||
|
||
results['success'].append(dem_file)
|
||
results['outputs'].append(file_results)
|
||
|
||
print(f"[{i}/{total_files}] 完成")
|
||
|
||
except Exception as e:
|
||
print(f"[{i}/{total_files}] 失败: {e}")
|
||
results['failed'].append(dem_file)
|
||
|
||
# 打印总结
|
||
print(f"\n" + "="*50)
|
||
print(f"批量处理完成!")
|
||
print(f"成功: {len(results['success'])} 个")
|
||
print(f"失败: {len(results['failed'])} 个")
|
||
|
||
if results['failed']:
|
||
print("\n失败的文件:")
|
||
for failed_file in results['failed']:
|
||
print(f" {os.path.basename(failed_file)}")
|
||
|
||
return results
|
||
|
||
|
||
# 主示例代码
|
||
def main_example():
|
||
"""使用示例"""
|
||
|
||
# 示例1: 基本使用
|
||
print("示例1: 基本使用")
|
||
print("=" * 50)
|
||
|
||
# 配置参数
|
||
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
|
||
dem_file = os.path.join(SCRIPT_DIR, "o_dem_23d110d0.tif")
|
||
|
||
if os.path.exists(dem_file):
|
||
# 方法1: 使用便捷函数
|
||
print("\n方法1: 使用便捷函数")
|
||
slope_file = os.path.join(SCRIPT_DIR, "o_combined.tif")
|
||
results = create_slope_aspect(dem_file, 'combined', slope_file)
|
||
|
||
# 方法2: 直接使用类
|
||
# print("\n方法2: 直接使用类")
|
||
# with SlopeAspectGenerator(dem_file) as gen:
|
||
# # 获取DEM信息
|
||
# dem_info = gen.get_dem_info()
|
||
# print(f"DEM信息: {dem_info}")
|
||
|
||
# # 生成合并文件
|
||
# slope, aspect = gen.generate_combined_tif("o_combined.tif")
|
||
|
||
# # 获取统计信息
|
||
# slope_stats = gen.get_slope_statistics(slope)
|
||
# aspect_stats = gen.get_aspect_statistics(aspect)
|
||
# print(f"坡度统计: {slope_stats}")
|
||
# print(f"坡向统计: {aspect_stats}")
|
||
|
||
else:
|
||
print(f"DEM文件不存在: {dem_file}")
|
||
print("请创建一个示例DEM文件或使用您自己的DEM文件")
|
||
|
||
# 创建一个简单的示例DEM
|
||
print("\n创建一个示例DEM文件用于测试...")
|
||
create_sample_dem("sample_dem.tif")
|
||
|
||
# 使用示例DEM
|
||
print("\n使用示例DEM进行测试...")
|
||
results = create_slope_aspect("sample_dem.tif", 'all')
|
||
|
||
# 示例2: 批量处理
|
||
# print("\n\n示例2: 批量处理")
|
||
# print("=" * 50)
|
||
|
||
# # 假设有多个DEM文件
|
||
# dem_files = ["dem1.tif", "dem2.tif", "dem3.tif"] # 替换为实际文件
|
||
|
||
# # 过滤出存在的文件
|
||
# existing_files = [f for f in dem_files if os.path.exists(f)]
|
||
|
||
# if len(existing_files) > 0:
|
||
# batch_results = batch_process_slope_aspect(
|
||
# existing_files,
|
||
# "batch_output",
|
||
# 'all'
|
||
# )
|
||
# else:
|
||
# print("未找到DEM文件进行批量处理")
|
||
# print("创建示例文件进行批量处理演示...")
|
||
|
||
# # 创建几个示例DEM文件
|
||
# for i in range(1, 4):
|
||
# create_sample_dem(f"sample_dem_{i}.tif", offset=i*100)
|
||
|
||
# sample_files = [f"sample_dem_{i}.tif" for i in range(1, 4)]
|
||
# batch_results = batch_process_slope_aspect(
|
||
# sample_files,
|
||
# "batch_output",
|
||
# 'all'
|
||
# )
|
||
|
||
|
||
def create_sample_dem(output_path: str, rows: int = 100, cols: int = 100,
|
||
cell_size: float = 30.0, offset: float = 0.0):
|
||
"""
|
||
创建一个示例DEM文件用于测试
|
||
|
||
Args:
|
||
output_path: 输出文件路径
|
||
rows: 行数
|
||
cols: 列数
|
||
cell_size: 像元大小
|
||
offset: 高程偏移量
|
||
"""
|
||
from osgeo import osr
|
||
|
||
# 创建一个简单的地形:一个山峰
|
||
x = np.linspace(-5, 5, cols)
|
||
y = np.linspace(-5, 5, rows)
|
||
X, Y = np.meshgrid(x, y)
|
||
|
||
# 创建高程数据(高斯山峰)
|
||
elevation = 1000 * np.exp(-(X**2 + Y**2) / 10) + offset
|
||
|
||
# 添加一些噪声
|
||
elevation += np.random.normal(0, 10, (rows, cols))
|
||
|
||
# 创建栅格文件
|
||
driver = gdal.GetDriverByName('GTiff')
|
||
dataset = driver.Create(
|
||
output_path,
|
||
cols,
|
||
rows,
|
||
1,
|
||
gdal.GDT_Float32
|
||
)
|
||
|
||
# 设置地理变换(左下角坐标为(0,0))
|
||
dataset.SetGeoTransform((0, cell_size, 0, 0, 0, -cell_size))
|
||
|
||
# 设置投影(WGS84)
|
||
srs = osr.SpatialReference()
|
||
srs.ImportFromEPSG(4326)
|
||
dataset.SetProjection(srs.ExportToWkt())
|
||
|
||
# 写入数据
|
||
band = dataset.GetRasterBand(1)
|
||
band.WriteArray(elevation)
|
||
band.SetNoDataValue(-9999.0)
|
||
band.SetDescription('Elevation')
|
||
|
||
dataset.FlushCache()
|
||
dataset = None
|
||
|
||
print(f"示例DEM已创建: {output_path}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 运行示例
|
||
main_example() |