ai_project_v1/b3dm/slope_aspect_tif.py

1062 lines
39 KiB
Python
Raw Permalink Normal View History

2026-01-29 11:51:20 +08:00
import numpy as np
from osgeo import gdal
import math
import os
from typing import Optional, Tuple, Dict, Any
class SlopeAspectGenerator:
"""
坡度和坡向生成器
支持生成单独的坡度坡向文件或包含两者的合并文件
"""
def __init__(self, dem_path: str):
"""
初始化坡度和坡向生成器
Args:
dem_path: 输入DEM文件路径
"""
self.dem_path = dem_path
self.dataset = gdal.Open(dem_path, gdal.GA_ReadOnly)
if not self.dataset:
raise FileNotFoundError(f"无法打开DEM文件: {dem_path}")
# 获取地理信息
self.geotransform = self.dataset.GetGeoTransform()
self.projection = self.dataset.GetProjection()
self.band = self.dataset.GetRasterBand(1)
self.data = self.band.ReadAsArray()
# 获取栅格信息
self.cols = self.dataset.RasterXSize
self.rows = self.dataset.RasterYSize
self.no_data = self.band.GetNoDataValue()
# 如果DEM没有无效值设置使用-9999
if self.no_data is None:
self.no_data = -9999.0
print(f"警告: DEM未设置无效值将使用 {self.no_data} 作为无效值")
# 像元大小(考虑非正方形像元)
self.cell_size_x = abs(self.geotransform[1])
self.cell_size_y = abs(self.geotransform[5])
print(f"DEM信息: {self.rows}行 x {self.cols}")
print(f"X方向像元大小: {self.cell_size_x:.4f}")
print(f"Y方向像元大小: {self.cell_size_y:.4f}")
print(f"无效值: {self.no_data}")
def calculate_slope(self, z_factor: float = 1.0, algorithm: str = 'Horn') -> np.ndarray:
"""
计算坡度
Args:
z_factor: 垂直单位转换因子
algorithm: 算法选项 'Horn' 'ZevenbergenThorne'
Returns:
坡度数组单位
"""
print(f"正在计算坡度 (算法: {algorithm})...")
# 创建输出数组
slope = np.full((self.rows, self.cols), self.no_data, dtype=np.float32)
# 创建填充后的数据(用于边界处理)
padded_data = np.pad(
self.data,
pad_width=1,
mode='constant',
constant_values=self.no_data
)
# 创建有效掩码
valid_mask = (self.data != self.no_data)
padded_valid = np.pad(valid_mask, pad_width=1, mode='constant', constant_values=False)
for i in range(self.rows):
for j in range(self.cols):
if not valid_mask[i, j]:
continue
i_pad, j_pad = i + 1, j + 1
# 获取3x3窗口
window = padded_data[i_pad-1:i_pad+2, j_pad-1:j_pad+2]
window_valid = padded_valid[i_pad-1:i_pad+2, j_pad-1:j_pad+2]
# 如果窗口内有无效值,跳过
if not np.all(window_valid):
continue
if algorithm.upper() == 'ZEVERBERGENTHORNE':
# Zevenbergen & Thorne 算法
dz_dx = (window[1, 2] - window[1, 0]) / (2 * self.cell_size_x)
dz_dy = (window[0, 1] - window[2, 1]) / (2 * self.cell_size_y)
else:
# Horn 算法(默认)
dz_dx = ((window[0, 2] + 2 * window[1, 2] + window[2, 2]) -
(window[0, 0] + 2 * window[1, 0] + window[2, 0])) / (8 * self.cell_size_x)
dz_dy = ((window[2, 0] + 2 * window[2, 1] + window[2, 2]) -
(window[0, 0] + 2 * window[0, 1] + window[0, 2])) / (8 * self.cell_size_y)
# 计算坡度(弧度转角度)
slope_rad = math.atan(z_factor * math.sqrt(dz_dx**2 + dz_dy**2))
slope_deg = math.degrees(slope_rad)
slope[i, j] = slope_deg
# 边界处理:将边缘设为无效值
slope[0, :] = self.no_data
slope[-1, :] = self.no_data
slope[:, 0] = self.no_data
slope[:, -1] = self.no_data
valid_slope = slope[slope != self.no_data]
if len(valid_slope) > 0:
print(f"坡度范围: {np.min(valid_slope):.2f} - {np.max(valid_slope):.2f}")
return slope
def calculate_aspect(self, algorithm: str = 'Horn') -> np.ndarray:
"""
计算坡向
Args:
algorithm: 算法选项 'Horn' 'ZevenbergenThorne'
Returns:
坡向数组单位0-360北为0度顺时针增加
"""
print(f"正在计算坡向 (算法: {algorithm})...")
# 创建输出数组
aspect = np.full((self.rows, self.cols), -1.0, dtype=np.float32) # 平地用-1表示
# 创建填充后的数据
padded_data = np.pad(
self.data,
pad_width=1,
mode='constant',
constant_values=self.no_data
)
# 创建有效掩码
valid_mask = (self.data != self.no_data)
padded_valid = np.pad(valid_mask, pad_width=1, mode='constant', constant_values=False)
for i in range(self.rows):
for j in range(self.cols):
if not valid_mask[i, j]:
continue
i_pad, j_pad = i + 1, j + 1
# 获取3x3窗口
window = padded_data[i_pad-1:i_pad+2, j_pad-1:j_pad+2]
window_valid = padded_valid[i_pad-1:i_pad+2, j_pad-1:j_pad+2]
# 如果窗口内有无效值,跳过
if not np.all(window_valid):
continue
if algorithm.upper() == 'ZEVERBERGENTHORNE':
# Zevenbergen & Thorne 算法
dz_dx = (window[1, 2] - window[1, 0]) / (2 * self.cell_size_x)
dz_dy = (window[0, 1] - window[2, 1]) / (2 * self.cell_size_y)
else:
# Horn 算法(默认)
dz_dx = ((window[0, 2] + 2 * window[1, 2] + window[2, 2]) -
(window[0, 0] + 2 * window[1, 0] + window[2, 0])) / (8 * self.cell_size_x)
dz_dy = ((window[2, 0] + 2 * window[2, 1] + window[2, 2]) -
(window[0, 0] + 2 * window[0, 1] + window[0, 2])) / (8 * self.cell_size_y)
# 计算坡向
if abs(dz_dx) < 1e-10 and abs(dz_dy) < 1e-10:
# 平地
aspect[i, j] = -1.0
else:
aspect_rad = math.atan2(dz_dy, -dz_dx) # atan2(y, x)
aspect_deg = math.degrees(aspect_rad)
# 转换为0-360度北为0度顺时针
if aspect_deg < 0:
aspect_deg += 360.0
aspect[i, j] = aspect_deg
# 边界处理:将边缘设为-1
aspect[0, :] = -1.0
aspect[-1, :] = -1.0
aspect[:, 0] = -1.0
aspect[:, -1] = -1.0
valid_aspect = aspect[aspect != -1.0]
if len(valid_aspect) > 0:
print(f"坡向范围: {np.min(valid_aspect):.2f} - {np.max(valid_aspect):.2f}")
return aspect
def calculate_slope_aspect(self, z_factor: float = 1.0,
algorithm: str = 'Horn') -> Tuple[np.ndarray, np.ndarray]:
"""
同时计算坡度和坡向优化版本减少重复计算
Args:
z_factor: 垂直单位转换因子
algorithm: 算法选项
Returns:
(slope_array, aspect_array)
"""
print(f"同时计算坡度和坡向 (算法: {algorithm})...")
# 创建输出数组
slope = np.full((self.rows, self.cols), self.no_data, dtype=np.float32)
aspect = np.full((self.rows, self.cols), -1.0, dtype=np.float32)
# 创建填充后的数据
padded_data = np.pad(
self.data,
pad_width=1,
mode='constant',
constant_values=self.no_data
)
# 创建有效掩码
valid_mask = (self.data != self.no_data)
padded_valid = np.pad(valid_mask, pad_width=1, mode='constant', constant_values=False)
for i in range(self.rows):
for j in range(self.cols):
if not valid_mask[i, j]:
continue
i_pad, j_pad = i + 1, j + 1
# 获取3x3窗口
window = padded_data[i_pad-1:i_pad+2, j_pad-1:j_pad+2]
window_valid = padded_valid[i_pad-1:i_pad+2, j_pad-1:j_pad+2]
# 如果窗口内有无效值,跳过
if not np.all(window_valid):
continue
# 计算导数(一次计算,两个都用)
if algorithm.upper() == 'ZEVERBERGENTHORNE':
dz_dx = (window[1, 2] - window[1, 0]) / (2 * self.cell_size_x)
dz_dy = (window[0, 1] - window[2, 1]) / (2 * self.cell_size_y)
else:
dz_dx = ((window[0, 2] + 2 * window[1, 2] + window[2, 2]) -
(window[0, 0] + 2 * window[1, 0] + window[2, 0])) / (8 * self.cell_size_x)
dz_dy = ((window[2, 0] + 2 * window[2, 1] + window[2, 2]) -
(window[0, 0] + 2 * window[0, 1] + window[0, 2])) / (8 * self.cell_size_y)
# 计算坡度
slope_rad = math.atan(z_factor * math.sqrt(dz_dx**2 + dz_dy**2))
slope_deg = math.degrees(slope_rad)
slope[i, j] = slope_deg
# 计算坡向
if abs(dz_dx) < 1e-10 and abs(dz_dy) < 1e-10:
aspect[i, j] = -1.0
else:
aspect_rad = math.atan2(dz_dy, -dz_dx)
aspect_deg = math.degrees(aspect_rad)
if aspect_deg < 0:
aspect_deg += 360.0
aspect[i, j] = aspect_deg
# 边界处理
slope[0, :] = self.no_data
slope[-1, :] = self.no_data
slope[:, 0] = self.no_data
slope[:, -1] = self.no_data
aspect[0, :] = -1.0
aspect[-1, :] = -1.0
aspect[:, 0] = -1.0
aspect[:, -1] = -1.0
# 显示统计信息
valid_slope = slope[slope != self.no_data]
valid_aspect = aspect[aspect != -1.0]
if len(valid_slope) > 0:
print(f"坡度范围: {np.min(valid_slope):.2f} - {np.max(valid_slope):.2f}")
if len(valid_aspect) > 0:
print(f"坡向范围: {np.min(valid_aspect):.2f} - {np.max(valid_aspect):.2f}")
return slope, aspect
def save_raster(self, data: np.ndarray, output_path: str,
band_name: str = '', data_type: int = gdal.GDT_Float32,
no_data_value: Optional[float] = None) -> None:
"""
保存栅格数据为TIFF文件
Args:
data: 栅格数据数组
output_path: 输出文件路径
band_name: 波段名称
data_type: GDAL数据类型
no_data_value: 无效值
"""
driver = gdal.GetDriverByName('GTiff')
out_dataset = driver.Create(
output_path,
self.cols,
self.rows,
1, # 单波段
data_type,
options=['COMPRESS=LZW', 'PREDICTOR=2', 'TILED=YES']
)
out_dataset.SetGeoTransform(self.geotransform)
out_dataset.SetProjection(self.projection)
out_band = out_dataset.GetRasterBand(1)
out_band.WriteArray(data)
if no_data_value is not None:
out_band.SetNoDataValue(no_data_value)
elif band_name == 'Slope':
out_band.SetNoDataValue(self.no_data)
elif band_name == 'Aspect':
out_band.SetNoDataValue(-1.0)
if band_name:
out_band.SetDescription(band_name)
# 设置统计信息加速QGIS等软件加载
out_band.ComputeStatistics(False)
out_dataset.FlushCache()
out_dataset = None
print(f"文件已保存: {output_path}")
if band_name:
print(f" 波段: {band_name}")
def save_combined_raster(self, slope_data: np.ndarray, aspect_data: np.ndarray,
output_path: str) -> None:
"""
保存包含坡度和坡向两个波段的TIFF文件
Args:
slope_data: 坡度数据
aspect_data: 坡向数据
output_path: 输出文件路径
"""
driver = gdal.GetDriverByName('GTiff')
# 创建包含2个波段的数据集
out_dataset = driver.Create(
output_path,
self.cols,
self.rows,
2, # 2个波段
gdal.GDT_Float32,
options=['COMPRESS=LZW', 'PREDICTOR=2', 'TILED=YES']
)
# 设置地理信息
out_dataset.SetGeoTransform(self.geotransform)
out_dataset.SetProjection(self.projection)
# 写入坡度波段波段1
slope_band = out_dataset.GetRasterBand(1)
slope_band.WriteArray(slope_data)
slope_band.SetNoDataValue(self.no_data)
slope_band.SetDescription('Slope')
slope_band.SetMetadataItem('UNITS', 'degrees')
slope_band.SetMetadataItem('RANGE', '0-90')
slope_band.ComputeStatistics(False)
# 写入坡向波段波段2
aspect_band = out_dataset.GetRasterBand(2)
aspect_band.WriteArray(aspect_data)
aspect_band.SetNoDataValue(-1.0)
aspect_band.SetDescription('Aspect')
aspect_band.SetMetadataItem('UNITS', 'degrees')
aspect_band.SetMetadataItem('RANGE', '0-360')
aspect_band.SetMetadataItem('DIRECTION', 'Clockwise from North')
aspect_band.ComputeStatistics(False)
# 设置数据集元数据
out_dataset.SetMetadata({
'SOURCE_DEM': os.path.basename(self.dem_path),
'PROCESSING_METHOD': 'SlopeAspectGenerator',
'CELL_SIZE_X': str(self.cell_size_x),
'CELL_SIZE_Y': str(self.cell_size_y)
})
out_dataset.FlushCache()
out_dataset = None
print(f"合并文件已保存: {output_path}")
print(f" 波段1: Slope (坡度, 单位:度)")
print(f" 波段2: Aspect (坡向, 单位:度)")
def generate_slope_tif(self, output_path: Optional[str] = None,
z_factor: float = 1.0, algorithm: str = 'Horn') -> np.ndarray:
"""
生成坡度TIFF文件
Args:
output_path: 输出文件路径可选
z_factor: 垂直单位转换因子
algorithm: 算法选项
Returns:
坡度数据数组
"""
if output_path is None:
base_name = os.path.splitext(self.dem_path)[0]
output_path = f"{base_name}_slope.tif"
print(f"\n生成坡度文件...")
slope = self.calculate_slope(z_factor, algorithm)
self.save_raster(slope, output_path, 'Slope', no_data_value=self.no_data)
return slope
def generate_aspect_tif(self, output_path: Optional[str] = None,
algorithm: str = 'Horn') -> np.ndarray:
"""
生成坡向TIFF文件
Args:
output_path: 输出文件路径可选
algorithm: 算法选项
Returns:
坡向数据数组
"""
if output_path is None:
base_name = os.path.splitext(self.dem_path)[0]
output_path = f"{base_name}_aspect.tif"
print(f"\n生成坡向文件...")
aspect = self.calculate_aspect(algorithm)
self.save_raster(aspect, output_path, 'Aspect', no_data_value=-1.0)
return aspect
def generate_slope_aspect_separate(self,
slope_output: Optional[str] = None,
aspect_output: Optional[str] = None,
z_factor: float = 1.0,
algorithm: str = 'Horn') -> Tuple[np.ndarray, np.ndarray]:
"""
分别生成坡度和坡向TIFF文件
Args:
slope_output: 坡度输出路径可选
aspect_output: 坡向输出路径可选
z_factor: 垂直单位转换因子
algorithm: 算法选项
Returns:
(slope_data, aspect_data)
"""
# 设置输出路径
base_name = os.path.splitext(self.dem_path)[0]
if slope_output is None:
slope_output = f"{base_name}_slope.tif"
if aspect_output is None:
aspect_output = f"{base_name}_aspect.tif"
print(f"\n分别生成坡度和坡向文件...")
# 同时计算坡度和坡向(优化版本)
slope, aspect = self.calculate_slope_aspect(z_factor, algorithm)
# 保存文件
self.save_raster(slope, slope_output, 'Slope', no_data_value=self.no_data)
self.save_raster(aspect, aspect_output, 'Aspect', no_data_value=-1.0)
return slope, aspect
def generate_combined_tif(self, output_path: Optional[str] = None,
z_factor: float = 1.0,
algorithm: str = 'Horn') -> Tuple[np.ndarray, np.ndarray]:
"""
生成包含坡度和坡向的合并TIFF文件
Args:
output_path: 输出文件路径可选
z_factor: 垂直单位转换因子
algorithm: 算法选项
Returns:
(slope_data, aspect_data)
"""
if output_path is None:
base_name = os.path.splitext(self.dem_path)[0]
output_path = f"{base_name}_slope_aspect.tif"
print(f"\n生成合并文件...")
# 同时计算坡度和坡向
slope, aspect = self.calculate_slope_aspect(z_factor, algorithm)
# 保存合并文件
self.save_combined_raster(slope, aspect, output_path)
return slope, aspect
def generate_all(self, output_dir: Optional[str] = None,
z_factor: float = 1.0,
algorithm: str = 'Horn') -> Dict[str, str]:
"""
生成所有类型的文件单独的坡度单独的坡向合并文件统计信息
Args:
output_dir: 输出目录可选
z_factor: 垂直单位转换因子
algorithm: 算法选项
Returns:
包含所有输出文件路径的字典
"""
# 设置输出路径
base_name = os.path.basename(self.dem_path)
file_base = os.path.splitext(base_name)[0]
if output_dir:
if not os.path.exists(output_dir):
os.makedirs(output_dir)
output_prefix = os.path.join(output_dir, file_base)
else:
output_prefix = file_base
# 计算坡度和坡向(只计算一次)
print(f"\n计算坡度和坡向...")
slope, aspect = self.calculate_slope_aspect(z_factor, algorithm)
# 生成各种文件
results = {}
# 1. 单独的坡度文件
slope_file = f"{output_prefix}_slope.tif"
self.save_raster(slope, slope_file, 'Slope', no_data_value=self.no_data)
results['slope'] = slope_file
# 2. 单独的坡向文件
aspect_file = f"{output_prefix}_aspect.tif"
self.save_raster(aspect, aspect_file, 'Aspect', no_data_value=-1.0)
results['aspect'] = aspect_file
# 3. 合并文件
combined_file = f"{output_prefix}_slope_aspect.tif"
self.save_combined_raster(slope, aspect, combined_file)
results['combined'] = combined_file
# 4. 统计信息文件(文本)
stats_file = f"{output_prefix}_statistics.txt"
self.save_statistics(slope, aspect, stats_file)
results['statistics'] = stats_file
print(f"\n所有文件生成完成!")
for key, path in results.items():
print(f" {key}: {os.path.basename(path)}")
return results
def save_statistics(self, slope_data: np.ndarray, aspect_data: np.ndarray,
output_path: str) -> None:
"""
保存统计信息到文本文件
Args:
slope_data: 坡度数据
aspect_data: 坡向数据
output_path: 输出文件路径
"""
valid_slope = slope_data[slope_data != self.no_data]
valid_aspect = aspect_data[aspect_data != -1.0]
with open(output_path, 'w', encoding='utf-8') as f:
f.write("坡度和坡向统计信息\n")
f.write("=" * 50 + "\n")
f.write(f"输入DEM: {os.path.basename(self.dem_path)}\n")
f.write(f"栅格大小: {self.rows} x {self.cols}\n")
f.write(f"处理时间: {np.datetime64('now')}\n\n")
f.write("坡度统计:\n")
f.write("-" * 30 + "\n")
if len(valid_slope) > 0:
f.write(f"最小值: {np.min(valid_slope):.4f}\n")
f.write(f"最大值: {np.max(valid_slope):.4f}\n")
f.write(f"平均值: {np.mean(valid_slope):.4f}\n")
f.write(f"标准差: {np.std(valid_slope):.4f}\n")
f.write(f"中位数: {np.median(valid_slope):.4f}\n")
f.write(f"有效像元数: {len(valid_slope)} / {self.rows * self.cols}\n")
# 坡度分级统计
f.write("\n坡度分级统计:\n")
slope_classes = ['平地(0-2)', '缓坡(2-5)', '斜坡(5-15)', '陡坡(15-30)', '急陡坡(30-45)', '峭壁(45-90)']
bins = [0, 2, 5, 15, 30, 45, 90]
hist, _ = np.histogram(valid_slope, bins=bins)
total = sum(hist)
for i in range(len(slope_classes)):
if hist[i] > 0:
percentage = (hist[i] / total) * 100
f.write(f"{slope_classes[i]}: {hist[i]} ({percentage:.2f}%)\n")
else:
f.write("无有效数据\n")
f.write("\n坡向统计:\n")
f.write("-" * 30 + "\n")
if len(valid_aspect) > 0:
f.write(f"最小值: {np.min(valid_aspect):.4f}\n")
f.write(f"最大值: {np.max(valid_aspect):.4f}\n")
f.write(f"平均值: {np.mean(valid_aspect):.4f}\n")
f.write(f"标准差: {np.std(valid_aspect):.4f}\n")
f.write(f"中位数: {np.median(valid_aspect):.4f}\n")
f.write(f"有效像元数: {len(valid_aspect)} / {self.rows * self.cols}\n")
# 坡向分类统计
f.write("\n坡向分类统计:\n")
directions = ['北(0-22.5)', '东北(22.5-67.5)', '东(67.5-112.5)', '东南(112.5-157.5)',
'南(157.5-202.5)', '西南(202.5-247.5)', '西(247.5-292.5)', '西北(292.5-337.5)', '北(337.5-360)']
bins = [0, 22.5, 67.5, 112.5, 157.5, 202.5, 247.5, 292.5, 337.5, 360]
hist, _ = np.histogram(valid_aspect, bins=bins)
total = sum(hist)
for i in range(len(directions)):
if hist[i] > 0:
percentage = (hist[i] / total) * 100
f.write(f"{directions[i]}: {hist[i]} ({percentage:.2f}%)\n")
else:
f.write("无有效数据\n")
print(f"统计信息已保存: {output_path}")
def get_dem_info(self) -> Dict[str, Any]:
"""
获取DEM文件的详细信息
Returns:
包含DEM信息的字典
"""
info = {
'file_path': self.dem_path,
'file_name': os.path.basename(self.dem_path),
'rows': self.rows,
'cols': self.cols,
'cell_size_x': self.cell_size_x,
'cell_size_y': self.cell_size_y,
'no_data_value': self.no_data,
'projection': self.projection
}
# 获取高程统计
valid_data = self.data[self.data != self.no_data]
if len(valid_data) > 0:
info['elevation_min'] = float(np.min(valid_data))
info['elevation_max'] = float(np.max(valid_data))
info['elevation_mean'] = float(np.mean(valid_data))
info['elevation_std'] = float(np.std(valid_data))
info['valid_cells'] = int(len(valid_data))
info['total_cells'] = self.rows * self.cols
info['valid_percentage'] = (len(valid_data) / (self.rows * self.cols)) * 100
return info
def get_slope_statistics(self, slope_data: np.ndarray) -> Dict[str, Any]:
"""
获取坡度统计信息
Args:
slope_data: 坡度数据数组
Returns:
坡度统计信息字典
"""
valid_slope = slope_data[slope_data != self.no_data]
stats = {
'min': float(np.min(valid_slope)) if len(valid_slope) > 0 else None,
'max': float(np.max(valid_slope)) if len(valid_slope) > 0 else None,
'mean': float(np.mean(valid_slope)) if len(valid_slope) > 0 else None,
'std': float(np.std(valid_slope)) if len(valid_slope) > 0 else None,
'median': float(np.median(valid_slope)) if len(valid_slope) > 0 else None,
'valid_cells': int(len(valid_slope)),
'total_cells': self.rows * self.cols
}
return stats
def get_aspect_statistics(self, aspect_data: np.ndarray) -> Dict[str, Any]:
"""
获取坡向统计信息
Args:
aspect_data: 坡向数据数组
Returns:
坡向统计信息字典
"""
valid_aspect = aspect_data[aspect_data != -1.0]
stats = {
'min': float(np.min(valid_aspect)) if len(valid_aspect) > 0 else None,
'max': float(np.max(valid_aspect)) if len(valid_aspect) > 0 else None,
'mean': float(np.mean(valid_aspect)) if len(valid_aspect) > 0 else None,
'std': float(np.std(valid_aspect)) if len(valid_aspect) > 0 else None,
'median': float(np.median(valid_aspect)) if len(valid_aspect) > 0 else None,
'valid_cells': int(len(valid_aspect)),
'total_cells': self.rows * self.cols
}
# 坡向分类统计
if len(valid_aspect) > 0:
bins = [0, 22.5, 67.5, 112.5, 157.5, 202.5, 247.5, 292.5, 337.5, 360]
directions = ['', '东北', '', '东南', '', '西南', '西', '西北', '']
hist, _ = np.histogram(valid_aspect, bins=bins)
direction_stats = {}
for i in range(len(directions)):
direction_stats[directions[i]] = {
'count': int(hist[i]),
'percentage': (hist[i] / len(valid_aspect)) * 100 if len(valid_aspect) > 0 else 0
}
stats['direction_distribution'] = direction_stats
return stats
def close(self):
"""关闭数据集,释放资源"""
if hasattr(self, 'dataset') and self.dataset:
self.dataset = None
if hasattr(self, 'band') and self.band:
self.band = None
print("资源已释放")
def __enter__(self):
"""支持with语句"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""退出with语句时自动关闭"""
self.close()
# 使用示例和辅助函数
def create_slope_aspect(dem_path: str, output_type: str = 'combined',
output_path: Optional[str] = None,
z_factor: float = 1.0, algorithm: str = 'Horn') -> Dict[str, str]:
"""
创建坡度坡向的便捷函数
Args:
dem_path: DEM文件路径
output_type: 输出类型 'slope', 'aspect', 'separate', 'combined', 'all'
output_path: 输出路径对于单个文件或输出目录
z_factor: 垂直单位转换因子
algorithm: 算法选项
Returns:
包含输出文件路径的字典
"""
print(f"处理DEM文件: {dem_path}")
with SlopeAspectGenerator(dem_path) as generator:
# 获取DEM信息
dem_info = generator.get_dem_info()
print(f"DEM信息: {dem_info['rows']}x{dem_info['cols']}, "
f"高程范围: {dem_info.get('elevation_min', 'N/A'):.2f}-"
f"{dem_info.get('elevation_max', 'N/A'):.2f}")
results = {}
if output_type == 'slope':
# 只生成坡度
slope_file = output_path or f"{os.path.splitext(dem_path)[0]}_slope.tif"
slope = generator.generate_slope_tif(slope_file, z_factor, algorithm)
results['slope'] = slope_file
# 获取统计信息
slope_stats = generator.get_slope_statistics(slope)
print(f"坡度统计: {slope_stats['min']:.2f}度 - {slope_stats['max']:.2f}")
elif output_type == 'aspect':
# 只生成坡向
aspect_file = output_path or f"{os.path.splitext(dem_path)[0]}_aspect.tif"
aspect = generator.generate_aspect_tif(aspect_file, algorithm)
results['aspect'] = aspect_file
# 获取统计信息
aspect_stats = generator.get_aspect_statistics(aspect)
print(f"坡向统计: {aspect_stats['min']:.2f}度 - {aspect_stats['max']:.2f}")
elif output_type == 'separate':
# 分别生成坡度和坡向
if output_path and os.path.isdir(output_path):
# 如果输出路径是目录
slope_file = os.path.join(output_path, f"{os.path.splitext(os.path.basename(dem_path))[0]}_slope.tif")
aspect_file = os.path.join(output_path, f"{os.path.splitext(os.path.basename(dem_path))[0]}_aspect.tif")
slope, aspect = generator.generate_slope_aspect_separate(
slope_file, aspect_file, z_factor, algorithm)
else:
# 如果输出路径是文件前缀
base = output_path or os.path.splitext(dem_path)[0]
slope_file = f"{base}_slope.tif"
aspect_file = f"{base}_aspect.tif"
slope, aspect = generator.generate_slope_aspect_separate(
slope_file, aspect_file, z_factor, algorithm)
results['slope'] = slope_file
results['aspect'] = aspect_file
# 获取统计信息
slope_stats = generator.get_slope_statistics(slope)
aspect_stats = generator.get_aspect_statistics(aspect)
print(f"坡度范围: {slope_stats['min']:.2f}度 - {slope_stats['max']:.2f}")
print(f"坡向范围: {aspect_stats['min']:.2f}度 - {aspect_stats['max']:.2f}")
elif output_type == 'combined':
# 生成合并文件
combined_file = output_path or f"{os.path.splitext(dem_path)[0]}_slope_aspect.tif"
slope, aspect = generator.generate_combined_tif(combined_file, z_factor, algorithm)
results['combined'] = combined_file
# 获取统计信息
slope_stats = generator.get_slope_statistics(slope)
aspect_stats = generator.get_aspect_statistics(aspect)
print(f"坡度范围: {slope_stats['min']:.2f}度 - {slope_stats['max']:.2f}")
print(f"坡向范围: {aspect_stats['min']:.2f}度 - {aspect_stats['max']:.2f}")
elif output_type == 'all':
# 生成所有文件
output_dir = output_path if (output_path and os.path.isdir(output_path)) else None
results = generator.generate_all(output_dir, z_factor, algorithm)
else:
raise ValueError(f"不支持的输出类型: {output_type}")
return results
def batch_process_slope_aspect(dem_files: list, output_dir: str,
output_type: str = 'all',
z_factor: float = 1.0,
algorithm: str = 'Horn') -> Dict[str, list]:
"""
批量处理多个DEM文件
Args:
dem_files: DEM文件路径列表
output_dir: 输出目录
output_type: 输出类型
z_factor: 垂直单位转换因子
algorithm: 算法选项
Returns:
包含处理结果的字典
"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
results = {
'success': [],
'failed': [],
'outputs': []
}
total_files = len(dem_files)
print(f"开始批量处理 {total_files} 个DEM文件...")
for i, dem_file in enumerate(dem_files, 1):
if not os.path.exists(dem_file):
print(f"[{i}/{total_files}] 文件不存在: {dem_file}")
results['failed'].append(dem_file)
continue
print(f"\n[{i}/{total_files}] 处理: {os.path.basename(dem_file)}")
try:
# 处理文件
file_results = create_slope_aspect(
dem_file,
output_type,
output_dir,
z_factor,
algorithm
)
results['success'].append(dem_file)
results['outputs'].append(file_results)
print(f"[{i}/{total_files}] 完成")
except Exception as e:
print(f"[{i}/{total_files}] 失败: {e}")
results['failed'].append(dem_file)
# 打印总结
print(f"\n" + "="*50)
print(f"批量处理完成!")
print(f"成功: {len(results['success'])}")
print(f"失败: {len(results['failed'])}")
if results['failed']:
print("\n失败的文件:")
for failed_file in results['failed']:
print(f" {os.path.basename(failed_file)}")
return results
# 主示例代码
def main_example():
"""使用示例"""
# 示例1: 基本使用
print("示例1: 基本使用")
print("=" * 50)
# 配置参数
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
dem_file = os.path.join(SCRIPT_DIR, "o_dem_23d110d0.tif")
if os.path.exists(dem_file):
# 方法1: 使用便捷函数
print("\n方法1: 使用便捷函数")
slope_file = os.path.join(SCRIPT_DIR, "o_combined.tif")
results = create_slope_aspect(dem_file, 'combined', slope_file)
# 方法2: 直接使用类
# print("\n方法2: 直接使用类")
# with SlopeAspectGenerator(dem_file) as gen:
# # 获取DEM信息
# dem_info = gen.get_dem_info()
# print(f"DEM信息: {dem_info}")
# # 生成合并文件
# slope, aspect = gen.generate_combined_tif("o_combined.tif")
# # 获取统计信息
# slope_stats = gen.get_slope_statistics(slope)
# aspect_stats = gen.get_aspect_statistics(aspect)
# print(f"坡度统计: {slope_stats}")
# print(f"坡向统计: {aspect_stats}")
else:
print(f"DEM文件不存在: {dem_file}")
print("请创建一个示例DEM文件或使用您自己的DEM文件")
# 创建一个简单的示例DEM
print("\n创建一个示例DEM文件用于测试...")
create_sample_dem("sample_dem.tif")
# 使用示例DEM
print("\n使用示例DEM进行测试...")
results = create_slope_aspect("sample_dem.tif", 'all')
# 示例2: 批量处理
# print("\n\n示例2: 批量处理")
# print("=" * 50)
# # 假设有多个DEM文件
# dem_files = ["dem1.tif", "dem2.tif", "dem3.tif"] # 替换为实际文件
# # 过滤出存在的文件
# existing_files = [f for f in dem_files if os.path.exists(f)]
# if len(existing_files) > 0:
# batch_results = batch_process_slope_aspect(
# existing_files,
# "batch_output",
# 'all'
# )
# else:
# print("未找到DEM文件进行批量处理")
# print("创建示例文件进行批量处理演示...")
# # 创建几个示例DEM文件
# for i in range(1, 4):
# create_sample_dem(f"sample_dem_{i}.tif", offset=i*100)
# sample_files = [f"sample_dem_{i}.tif" for i in range(1, 4)]
# batch_results = batch_process_slope_aspect(
# sample_files,
# "batch_output",
# 'all'
# )
def create_sample_dem(output_path: str, rows: int = 100, cols: int = 100,
cell_size: float = 30.0, offset: float = 0.0):
"""
创建一个示例DEM文件用于测试
Args:
output_path: 输出文件路径
rows: 行数
cols: 列数
cell_size: 像元大小
offset: 高程偏移量
"""
from osgeo import osr
# 创建一个简单的地形:一个山峰
x = np.linspace(-5, 5, cols)
y = np.linspace(-5, 5, rows)
X, Y = np.meshgrid(x, y)
# 创建高程数据(高斯山峰)
elevation = 1000 * np.exp(-(X**2 + Y**2) / 10) + offset
# 添加一些噪声
elevation += np.random.normal(0, 10, (rows, cols))
# 创建栅格文件
driver = gdal.GetDriverByName('GTiff')
dataset = driver.Create(
output_path,
cols,
rows,
1,
gdal.GDT_Float32
)
# 设置地理变换(左下角坐标为(0,0)
dataset.SetGeoTransform((0, cell_size, 0, 0, 0, -cell_size))
# 设置投影WGS84
srs = osr.SpatialReference()
srs.ImportFromEPSG(4326)
dataset.SetProjection(srs.ExportToWkt())
# 写入数据
band = dataset.GetRasterBand(1)
band.WriteArray(elevation)
band.SetNoDataValue(-9999.0)
band.SetDescription('Elevation')
dataset.FlushCache()
dataset = None
print(f"示例DEM已创建: {output_path}")
if __name__ == "__main__":
# 运行示例
main_example()