ai_project_v1/b3dm/slope_aspect_tif.py
2026-01-29 11:51:20 +08:00

1062 lines
39 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import numpy as np
from osgeo import gdal
import math
import os
from typing import Optional, Tuple, Dict, Any
class SlopeAspectGenerator:
"""
坡度和坡向生成器
支持生成单独的坡度、坡向文件,或包含两者的合并文件
"""
def __init__(self, dem_path: str):
"""
初始化坡度和坡向生成器
Args:
dem_path: 输入DEM文件路径
"""
self.dem_path = dem_path
self.dataset = gdal.Open(dem_path, gdal.GA_ReadOnly)
if not self.dataset:
raise FileNotFoundError(f"无法打开DEM文件: {dem_path}")
# 获取地理信息
self.geotransform = self.dataset.GetGeoTransform()
self.projection = self.dataset.GetProjection()
self.band = self.dataset.GetRasterBand(1)
self.data = self.band.ReadAsArray()
# 获取栅格信息
self.cols = self.dataset.RasterXSize
self.rows = self.dataset.RasterYSize
self.no_data = self.band.GetNoDataValue()
# 如果DEM没有无效值设置使用-9999
if self.no_data is None:
self.no_data = -9999.0
print(f"警告: DEM未设置无效值将使用 {self.no_data} 作为无效值")
# 像元大小(考虑非正方形像元)
self.cell_size_x = abs(self.geotransform[1])
self.cell_size_y = abs(self.geotransform[5])
print(f"DEM信息: {self.rows}行 x {self.cols}")
print(f"X方向像元大小: {self.cell_size_x:.4f}")
print(f"Y方向像元大小: {self.cell_size_y:.4f}")
print(f"无效值: {self.no_data}")
def calculate_slope(self, z_factor: float = 1.0, algorithm: str = 'Horn') -> np.ndarray:
"""
计算坡度
Args:
z_factor: 垂直单位转换因子
algorithm: 算法选项 'Horn''ZevenbergenThorne'
Returns:
坡度数组(单位:度)
"""
print(f"正在计算坡度 (算法: {algorithm})...")
# 创建输出数组
slope = np.full((self.rows, self.cols), self.no_data, dtype=np.float32)
# 创建填充后的数据(用于边界处理)
padded_data = np.pad(
self.data,
pad_width=1,
mode='constant',
constant_values=self.no_data
)
# 创建有效掩码
valid_mask = (self.data != self.no_data)
padded_valid = np.pad(valid_mask, pad_width=1, mode='constant', constant_values=False)
for i in range(self.rows):
for j in range(self.cols):
if not valid_mask[i, j]:
continue
i_pad, j_pad = i + 1, j + 1
# 获取3x3窗口
window = padded_data[i_pad-1:i_pad+2, j_pad-1:j_pad+2]
window_valid = padded_valid[i_pad-1:i_pad+2, j_pad-1:j_pad+2]
# 如果窗口内有无效值,跳过
if not np.all(window_valid):
continue
if algorithm.upper() == 'ZEVERBERGENTHORNE':
# Zevenbergen & Thorne 算法
dz_dx = (window[1, 2] - window[1, 0]) / (2 * self.cell_size_x)
dz_dy = (window[0, 1] - window[2, 1]) / (2 * self.cell_size_y)
else:
# Horn 算法(默认)
dz_dx = ((window[0, 2] + 2 * window[1, 2] + window[2, 2]) -
(window[0, 0] + 2 * window[1, 0] + window[2, 0])) / (8 * self.cell_size_x)
dz_dy = ((window[2, 0] + 2 * window[2, 1] + window[2, 2]) -
(window[0, 0] + 2 * window[0, 1] + window[0, 2])) / (8 * self.cell_size_y)
# 计算坡度(弧度转角度)
slope_rad = math.atan(z_factor * math.sqrt(dz_dx**2 + dz_dy**2))
slope_deg = math.degrees(slope_rad)
slope[i, j] = slope_deg
# 边界处理:将边缘设为无效值
slope[0, :] = self.no_data
slope[-1, :] = self.no_data
slope[:, 0] = self.no_data
slope[:, -1] = self.no_data
valid_slope = slope[slope != self.no_data]
if len(valid_slope) > 0:
print(f"坡度范围: {np.min(valid_slope):.2f} - {np.max(valid_slope):.2f}")
return slope
def calculate_aspect(self, algorithm: str = 'Horn') -> np.ndarray:
"""
计算坡向
Args:
algorithm: 算法选项 'Horn''ZevenbergenThorne'
Returns:
坡向数组单位0-360北为0度顺时针增加
"""
print(f"正在计算坡向 (算法: {algorithm})...")
# 创建输出数组
aspect = np.full((self.rows, self.cols), -1.0, dtype=np.float32) # 平地用-1表示
# 创建填充后的数据
padded_data = np.pad(
self.data,
pad_width=1,
mode='constant',
constant_values=self.no_data
)
# 创建有效掩码
valid_mask = (self.data != self.no_data)
padded_valid = np.pad(valid_mask, pad_width=1, mode='constant', constant_values=False)
for i in range(self.rows):
for j in range(self.cols):
if not valid_mask[i, j]:
continue
i_pad, j_pad = i + 1, j + 1
# 获取3x3窗口
window = padded_data[i_pad-1:i_pad+2, j_pad-1:j_pad+2]
window_valid = padded_valid[i_pad-1:i_pad+2, j_pad-1:j_pad+2]
# 如果窗口内有无效值,跳过
if not np.all(window_valid):
continue
if algorithm.upper() == 'ZEVERBERGENTHORNE':
# Zevenbergen & Thorne 算法
dz_dx = (window[1, 2] - window[1, 0]) / (2 * self.cell_size_x)
dz_dy = (window[0, 1] - window[2, 1]) / (2 * self.cell_size_y)
else:
# Horn 算法(默认)
dz_dx = ((window[0, 2] + 2 * window[1, 2] + window[2, 2]) -
(window[0, 0] + 2 * window[1, 0] + window[2, 0])) / (8 * self.cell_size_x)
dz_dy = ((window[2, 0] + 2 * window[2, 1] + window[2, 2]) -
(window[0, 0] + 2 * window[0, 1] + window[0, 2])) / (8 * self.cell_size_y)
# 计算坡向
if abs(dz_dx) < 1e-10 and abs(dz_dy) < 1e-10:
# 平地
aspect[i, j] = -1.0
else:
aspect_rad = math.atan2(dz_dy, -dz_dx) # atan2(y, x)
aspect_deg = math.degrees(aspect_rad)
# 转换为0-360度北为0度顺时针
if aspect_deg < 0:
aspect_deg += 360.0
aspect[i, j] = aspect_deg
# 边界处理:将边缘设为-1
aspect[0, :] = -1.0
aspect[-1, :] = -1.0
aspect[:, 0] = -1.0
aspect[:, -1] = -1.0
valid_aspect = aspect[aspect != -1.0]
if len(valid_aspect) > 0:
print(f"坡向范围: {np.min(valid_aspect):.2f} - {np.max(valid_aspect):.2f}")
return aspect
def calculate_slope_aspect(self, z_factor: float = 1.0,
algorithm: str = 'Horn') -> Tuple[np.ndarray, np.ndarray]:
"""
同时计算坡度和坡向(优化版本,减少重复计算)
Args:
z_factor: 垂直单位转换因子
algorithm: 算法选项
Returns:
(slope_array, aspect_array)
"""
print(f"同时计算坡度和坡向 (算法: {algorithm})...")
# 创建输出数组
slope = np.full((self.rows, self.cols), self.no_data, dtype=np.float32)
aspect = np.full((self.rows, self.cols), -1.0, dtype=np.float32)
# 创建填充后的数据
padded_data = np.pad(
self.data,
pad_width=1,
mode='constant',
constant_values=self.no_data
)
# 创建有效掩码
valid_mask = (self.data != self.no_data)
padded_valid = np.pad(valid_mask, pad_width=1, mode='constant', constant_values=False)
for i in range(self.rows):
for j in range(self.cols):
if not valid_mask[i, j]:
continue
i_pad, j_pad = i + 1, j + 1
# 获取3x3窗口
window = padded_data[i_pad-1:i_pad+2, j_pad-1:j_pad+2]
window_valid = padded_valid[i_pad-1:i_pad+2, j_pad-1:j_pad+2]
# 如果窗口内有无效值,跳过
if not np.all(window_valid):
continue
# 计算导数(一次计算,两个都用)
if algorithm.upper() == 'ZEVERBERGENTHORNE':
dz_dx = (window[1, 2] - window[1, 0]) / (2 * self.cell_size_x)
dz_dy = (window[0, 1] - window[2, 1]) / (2 * self.cell_size_y)
else:
dz_dx = ((window[0, 2] + 2 * window[1, 2] + window[2, 2]) -
(window[0, 0] + 2 * window[1, 0] + window[2, 0])) / (8 * self.cell_size_x)
dz_dy = ((window[2, 0] + 2 * window[2, 1] + window[2, 2]) -
(window[0, 0] + 2 * window[0, 1] + window[0, 2])) / (8 * self.cell_size_y)
# 计算坡度
slope_rad = math.atan(z_factor * math.sqrt(dz_dx**2 + dz_dy**2))
slope_deg = math.degrees(slope_rad)
slope[i, j] = slope_deg
# 计算坡向
if abs(dz_dx) < 1e-10 and abs(dz_dy) < 1e-10:
aspect[i, j] = -1.0
else:
aspect_rad = math.atan2(dz_dy, -dz_dx)
aspect_deg = math.degrees(aspect_rad)
if aspect_deg < 0:
aspect_deg += 360.0
aspect[i, j] = aspect_deg
# 边界处理
slope[0, :] = self.no_data
slope[-1, :] = self.no_data
slope[:, 0] = self.no_data
slope[:, -1] = self.no_data
aspect[0, :] = -1.0
aspect[-1, :] = -1.0
aspect[:, 0] = -1.0
aspect[:, -1] = -1.0
# 显示统计信息
valid_slope = slope[slope != self.no_data]
valid_aspect = aspect[aspect != -1.0]
if len(valid_slope) > 0:
print(f"坡度范围: {np.min(valid_slope):.2f} - {np.max(valid_slope):.2f}")
if len(valid_aspect) > 0:
print(f"坡向范围: {np.min(valid_aspect):.2f} - {np.max(valid_aspect):.2f}")
return slope, aspect
def save_raster(self, data: np.ndarray, output_path: str,
band_name: str = '', data_type: int = gdal.GDT_Float32,
no_data_value: Optional[float] = None) -> None:
"""
保存栅格数据为TIFF文件
Args:
data: 栅格数据数组
output_path: 输出文件路径
band_name: 波段名称
data_type: GDAL数据类型
no_data_value: 无效值
"""
driver = gdal.GetDriverByName('GTiff')
out_dataset = driver.Create(
output_path,
self.cols,
self.rows,
1, # 单波段
data_type,
options=['COMPRESS=LZW', 'PREDICTOR=2', 'TILED=YES']
)
out_dataset.SetGeoTransform(self.geotransform)
out_dataset.SetProjection(self.projection)
out_band = out_dataset.GetRasterBand(1)
out_band.WriteArray(data)
if no_data_value is not None:
out_band.SetNoDataValue(no_data_value)
elif band_name == 'Slope':
out_band.SetNoDataValue(self.no_data)
elif band_name == 'Aspect':
out_band.SetNoDataValue(-1.0)
if band_name:
out_band.SetDescription(band_name)
# 设置统计信息加速QGIS等软件加载
out_band.ComputeStatistics(False)
out_dataset.FlushCache()
out_dataset = None
print(f"文件已保存: {output_path}")
if band_name:
print(f" 波段: {band_name}")
def save_combined_raster(self, slope_data: np.ndarray, aspect_data: np.ndarray,
output_path: str) -> None:
"""
保存包含坡度和坡向两个波段的TIFF文件
Args:
slope_data: 坡度数据
aspect_data: 坡向数据
output_path: 输出文件路径
"""
driver = gdal.GetDriverByName('GTiff')
# 创建包含2个波段的数据集
out_dataset = driver.Create(
output_path,
self.cols,
self.rows,
2, # 2个波段
gdal.GDT_Float32,
options=['COMPRESS=LZW', 'PREDICTOR=2', 'TILED=YES']
)
# 设置地理信息
out_dataset.SetGeoTransform(self.geotransform)
out_dataset.SetProjection(self.projection)
# 写入坡度波段波段1
slope_band = out_dataset.GetRasterBand(1)
slope_band.WriteArray(slope_data)
slope_band.SetNoDataValue(self.no_data)
slope_band.SetDescription('Slope')
slope_band.SetMetadataItem('UNITS', 'degrees')
slope_band.SetMetadataItem('RANGE', '0-90')
slope_band.ComputeStatistics(False)
# 写入坡向波段波段2
aspect_band = out_dataset.GetRasterBand(2)
aspect_band.WriteArray(aspect_data)
aspect_band.SetNoDataValue(-1.0)
aspect_band.SetDescription('Aspect')
aspect_band.SetMetadataItem('UNITS', 'degrees')
aspect_band.SetMetadataItem('RANGE', '0-360')
aspect_band.SetMetadataItem('DIRECTION', 'Clockwise from North')
aspect_band.ComputeStatistics(False)
# 设置数据集元数据
out_dataset.SetMetadata({
'SOURCE_DEM': os.path.basename(self.dem_path),
'PROCESSING_METHOD': 'SlopeAspectGenerator',
'CELL_SIZE_X': str(self.cell_size_x),
'CELL_SIZE_Y': str(self.cell_size_y)
})
out_dataset.FlushCache()
out_dataset = None
print(f"合并文件已保存: {output_path}")
print(f" 波段1: Slope (坡度, 单位:度)")
print(f" 波段2: Aspect (坡向, 单位:度)")
def generate_slope_tif(self, output_path: Optional[str] = None,
z_factor: float = 1.0, algorithm: str = 'Horn') -> np.ndarray:
"""
生成坡度TIFF文件
Args:
output_path: 输出文件路径(可选)
z_factor: 垂直单位转换因子
algorithm: 算法选项
Returns:
坡度数据数组
"""
if output_path is None:
base_name = os.path.splitext(self.dem_path)[0]
output_path = f"{base_name}_slope.tif"
print(f"\n生成坡度文件...")
slope = self.calculate_slope(z_factor, algorithm)
self.save_raster(slope, output_path, 'Slope', no_data_value=self.no_data)
return slope
def generate_aspect_tif(self, output_path: Optional[str] = None,
algorithm: str = 'Horn') -> np.ndarray:
"""
生成坡向TIFF文件
Args:
output_path: 输出文件路径(可选)
algorithm: 算法选项
Returns:
坡向数据数组
"""
if output_path is None:
base_name = os.path.splitext(self.dem_path)[0]
output_path = f"{base_name}_aspect.tif"
print(f"\n生成坡向文件...")
aspect = self.calculate_aspect(algorithm)
self.save_raster(aspect, output_path, 'Aspect', no_data_value=-1.0)
return aspect
def generate_slope_aspect_separate(self,
slope_output: Optional[str] = None,
aspect_output: Optional[str] = None,
z_factor: float = 1.0,
algorithm: str = 'Horn') -> Tuple[np.ndarray, np.ndarray]:
"""
分别生成坡度和坡向TIFF文件
Args:
slope_output: 坡度输出路径(可选)
aspect_output: 坡向输出路径(可选)
z_factor: 垂直单位转换因子
algorithm: 算法选项
Returns:
(slope_data, aspect_data)
"""
# 设置输出路径
base_name = os.path.splitext(self.dem_path)[0]
if slope_output is None:
slope_output = f"{base_name}_slope.tif"
if aspect_output is None:
aspect_output = f"{base_name}_aspect.tif"
print(f"\n分别生成坡度和坡向文件...")
# 同时计算坡度和坡向(优化版本)
slope, aspect = self.calculate_slope_aspect(z_factor, algorithm)
# 保存文件
self.save_raster(slope, slope_output, 'Slope', no_data_value=self.no_data)
self.save_raster(aspect, aspect_output, 'Aspect', no_data_value=-1.0)
return slope, aspect
def generate_combined_tif(self, output_path: Optional[str] = None,
z_factor: float = 1.0,
algorithm: str = 'Horn') -> Tuple[np.ndarray, np.ndarray]:
"""
生成包含坡度和坡向的合并TIFF文件
Args:
output_path: 输出文件路径(可选)
z_factor: 垂直单位转换因子
algorithm: 算法选项
Returns:
(slope_data, aspect_data)
"""
if output_path is None:
base_name = os.path.splitext(self.dem_path)[0]
output_path = f"{base_name}_slope_aspect.tif"
print(f"\n生成合并文件...")
# 同时计算坡度和坡向
slope, aspect = self.calculate_slope_aspect(z_factor, algorithm)
# 保存合并文件
self.save_combined_raster(slope, aspect, output_path)
return slope, aspect
def generate_all(self, output_dir: Optional[str] = None,
z_factor: float = 1.0,
algorithm: str = 'Horn') -> Dict[str, str]:
"""
生成所有类型的文件:单独的坡度、单独的坡向、合并文件、统计信息
Args:
output_dir: 输出目录(可选)
z_factor: 垂直单位转换因子
algorithm: 算法选项
Returns:
包含所有输出文件路径的字典
"""
# 设置输出路径
base_name = os.path.basename(self.dem_path)
file_base = os.path.splitext(base_name)[0]
if output_dir:
if not os.path.exists(output_dir):
os.makedirs(output_dir)
output_prefix = os.path.join(output_dir, file_base)
else:
output_prefix = file_base
# 计算坡度和坡向(只计算一次)
print(f"\n计算坡度和坡向...")
slope, aspect = self.calculate_slope_aspect(z_factor, algorithm)
# 生成各种文件
results = {}
# 1. 单独的坡度文件
slope_file = f"{output_prefix}_slope.tif"
self.save_raster(slope, slope_file, 'Slope', no_data_value=self.no_data)
results['slope'] = slope_file
# 2. 单独的坡向文件
aspect_file = f"{output_prefix}_aspect.tif"
self.save_raster(aspect, aspect_file, 'Aspect', no_data_value=-1.0)
results['aspect'] = aspect_file
# 3. 合并文件
combined_file = f"{output_prefix}_slope_aspect.tif"
self.save_combined_raster(slope, aspect, combined_file)
results['combined'] = combined_file
# 4. 统计信息文件(文本)
stats_file = f"{output_prefix}_statistics.txt"
self.save_statistics(slope, aspect, stats_file)
results['statistics'] = stats_file
print(f"\n所有文件生成完成!")
for key, path in results.items():
print(f" {key}: {os.path.basename(path)}")
return results
def save_statistics(self, slope_data: np.ndarray, aspect_data: np.ndarray,
output_path: str) -> None:
"""
保存统计信息到文本文件
Args:
slope_data: 坡度数据
aspect_data: 坡向数据
output_path: 输出文件路径
"""
valid_slope = slope_data[slope_data != self.no_data]
valid_aspect = aspect_data[aspect_data != -1.0]
with open(output_path, 'w', encoding='utf-8') as f:
f.write("坡度和坡向统计信息\n")
f.write("=" * 50 + "\n")
f.write(f"输入DEM: {os.path.basename(self.dem_path)}\n")
f.write(f"栅格大小: {self.rows} x {self.cols}\n")
f.write(f"处理时间: {np.datetime64('now')}\n\n")
f.write("坡度统计:\n")
f.write("-" * 30 + "\n")
if len(valid_slope) > 0:
f.write(f"最小值: {np.min(valid_slope):.4f}\n")
f.write(f"最大值: {np.max(valid_slope):.4f}\n")
f.write(f"平均值: {np.mean(valid_slope):.4f}\n")
f.write(f"标准差: {np.std(valid_slope):.4f}\n")
f.write(f"中位数: {np.median(valid_slope):.4f}\n")
f.write(f"有效像元数: {len(valid_slope)} / {self.rows * self.cols}\n")
# 坡度分级统计
f.write("\n坡度分级统计:\n")
slope_classes = ['平地(0-2)', '缓坡(2-5)', '斜坡(5-15)', '陡坡(15-30)', '急陡坡(30-45)', '峭壁(45-90)']
bins = [0, 2, 5, 15, 30, 45, 90]
hist, _ = np.histogram(valid_slope, bins=bins)
total = sum(hist)
for i in range(len(slope_classes)):
if hist[i] > 0:
percentage = (hist[i] / total) * 100
f.write(f"{slope_classes[i]}: {hist[i]} ({percentage:.2f}%)\n")
else:
f.write("无有效数据\n")
f.write("\n坡向统计:\n")
f.write("-" * 30 + "\n")
if len(valid_aspect) > 0:
f.write(f"最小值: {np.min(valid_aspect):.4f}\n")
f.write(f"最大值: {np.max(valid_aspect):.4f}\n")
f.write(f"平均值: {np.mean(valid_aspect):.4f}\n")
f.write(f"标准差: {np.std(valid_aspect):.4f}\n")
f.write(f"中位数: {np.median(valid_aspect):.4f}\n")
f.write(f"有效像元数: {len(valid_aspect)} / {self.rows * self.cols}\n")
# 坡向分类统计
f.write("\n坡向分类统计:\n")
directions = ['北(0-22.5)', '东北(22.5-67.5)', '东(67.5-112.5)', '东南(112.5-157.5)',
'南(157.5-202.5)', '西南(202.5-247.5)', '西(247.5-292.5)', '西北(292.5-337.5)', '北(337.5-360)']
bins = [0, 22.5, 67.5, 112.5, 157.5, 202.5, 247.5, 292.5, 337.5, 360]
hist, _ = np.histogram(valid_aspect, bins=bins)
total = sum(hist)
for i in range(len(directions)):
if hist[i] > 0:
percentage = (hist[i] / total) * 100
f.write(f"{directions[i]}: {hist[i]} ({percentage:.2f}%)\n")
else:
f.write("无有效数据\n")
print(f"统计信息已保存: {output_path}")
def get_dem_info(self) -> Dict[str, Any]:
"""
获取DEM文件的详细信息
Returns:
包含DEM信息的字典
"""
info = {
'file_path': self.dem_path,
'file_name': os.path.basename(self.dem_path),
'rows': self.rows,
'cols': self.cols,
'cell_size_x': self.cell_size_x,
'cell_size_y': self.cell_size_y,
'no_data_value': self.no_data,
'projection': self.projection
}
# 获取高程统计
valid_data = self.data[self.data != self.no_data]
if len(valid_data) > 0:
info['elevation_min'] = float(np.min(valid_data))
info['elevation_max'] = float(np.max(valid_data))
info['elevation_mean'] = float(np.mean(valid_data))
info['elevation_std'] = float(np.std(valid_data))
info['valid_cells'] = int(len(valid_data))
info['total_cells'] = self.rows * self.cols
info['valid_percentage'] = (len(valid_data) / (self.rows * self.cols)) * 100
return info
def get_slope_statistics(self, slope_data: np.ndarray) -> Dict[str, Any]:
"""
获取坡度统计信息
Args:
slope_data: 坡度数据数组
Returns:
坡度统计信息字典
"""
valid_slope = slope_data[slope_data != self.no_data]
stats = {
'min': float(np.min(valid_slope)) if len(valid_slope) > 0 else None,
'max': float(np.max(valid_slope)) if len(valid_slope) > 0 else None,
'mean': float(np.mean(valid_slope)) if len(valid_slope) > 0 else None,
'std': float(np.std(valid_slope)) if len(valid_slope) > 0 else None,
'median': float(np.median(valid_slope)) if len(valid_slope) > 0 else None,
'valid_cells': int(len(valid_slope)),
'total_cells': self.rows * self.cols
}
return stats
def get_aspect_statistics(self, aspect_data: np.ndarray) -> Dict[str, Any]:
"""
获取坡向统计信息
Args:
aspect_data: 坡向数据数组
Returns:
坡向统计信息字典
"""
valid_aspect = aspect_data[aspect_data != -1.0]
stats = {
'min': float(np.min(valid_aspect)) if len(valid_aspect) > 0 else None,
'max': float(np.max(valid_aspect)) if len(valid_aspect) > 0 else None,
'mean': float(np.mean(valid_aspect)) if len(valid_aspect) > 0 else None,
'std': float(np.std(valid_aspect)) if len(valid_aspect) > 0 else None,
'median': float(np.median(valid_aspect)) if len(valid_aspect) > 0 else None,
'valid_cells': int(len(valid_aspect)),
'total_cells': self.rows * self.cols
}
# 坡向分类统计
if len(valid_aspect) > 0:
bins = [0, 22.5, 67.5, 112.5, 157.5, 202.5, 247.5, 292.5, 337.5, 360]
directions = ['', '东北', '', '东南', '', '西南', '西', '西北', '']
hist, _ = np.histogram(valid_aspect, bins=bins)
direction_stats = {}
for i in range(len(directions)):
direction_stats[directions[i]] = {
'count': int(hist[i]),
'percentage': (hist[i] / len(valid_aspect)) * 100 if len(valid_aspect) > 0 else 0
}
stats['direction_distribution'] = direction_stats
return stats
def close(self):
"""关闭数据集,释放资源"""
if hasattr(self, 'dataset') and self.dataset:
self.dataset = None
if hasattr(self, 'band') and self.band:
self.band = None
print("资源已释放")
def __enter__(self):
"""支持with语句"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""退出with语句时自动关闭"""
self.close()
# 使用示例和辅助函数
def create_slope_aspect(dem_path: str, output_type: str = 'combined',
output_path: Optional[str] = None,
z_factor: float = 1.0, algorithm: str = 'Horn') -> Dict[str, str]:
"""
创建坡度坡向的便捷函数
Args:
dem_path: DEM文件路径
output_type: 输出类型 'slope', 'aspect', 'separate', 'combined', 'all'
output_path: 输出路径(对于单个文件)或输出目录
z_factor: 垂直单位转换因子
algorithm: 算法选项
Returns:
包含输出文件路径的字典
"""
print(f"处理DEM文件: {dem_path}")
with SlopeAspectGenerator(dem_path) as generator:
# 获取DEM信息
dem_info = generator.get_dem_info()
print(f"DEM信息: {dem_info['rows']}x{dem_info['cols']}, "
f"高程范围: {dem_info.get('elevation_min', 'N/A'):.2f}-"
f"{dem_info.get('elevation_max', 'N/A'):.2f}")
results = {}
if output_type == 'slope':
# 只生成坡度
slope_file = output_path or f"{os.path.splitext(dem_path)[0]}_slope.tif"
slope = generator.generate_slope_tif(slope_file, z_factor, algorithm)
results['slope'] = slope_file
# 获取统计信息
slope_stats = generator.get_slope_statistics(slope)
print(f"坡度统计: {slope_stats['min']:.2f}度 - {slope_stats['max']:.2f}")
elif output_type == 'aspect':
# 只生成坡向
aspect_file = output_path or f"{os.path.splitext(dem_path)[0]}_aspect.tif"
aspect = generator.generate_aspect_tif(aspect_file, algorithm)
results['aspect'] = aspect_file
# 获取统计信息
aspect_stats = generator.get_aspect_statistics(aspect)
print(f"坡向统计: {aspect_stats['min']:.2f}度 - {aspect_stats['max']:.2f}")
elif output_type == 'separate':
# 分别生成坡度和坡向
if output_path and os.path.isdir(output_path):
# 如果输出路径是目录
slope_file = os.path.join(output_path, f"{os.path.splitext(os.path.basename(dem_path))[0]}_slope.tif")
aspect_file = os.path.join(output_path, f"{os.path.splitext(os.path.basename(dem_path))[0]}_aspect.tif")
slope, aspect = generator.generate_slope_aspect_separate(
slope_file, aspect_file, z_factor, algorithm)
else:
# 如果输出路径是文件前缀
base = output_path or os.path.splitext(dem_path)[0]
slope_file = f"{base}_slope.tif"
aspect_file = f"{base}_aspect.tif"
slope, aspect = generator.generate_slope_aspect_separate(
slope_file, aspect_file, z_factor, algorithm)
results['slope'] = slope_file
results['aspect'] = aspect_file
# 获取统计信息
slope_stats = generator.get_slope_statistics(slope)
aspect_stats = generator.get_aspect_statistics(aspect)
print(f"坡度范围: {slope_stats['min']:.2f}度 - {slope_stats['max']:.2f}")
print(f"坡向范围: {aspect_stats['min']:.2f}度 - {aspect_stats['max']:.2f}")
elif output_type == 'combined':
# 生成合并文件
combined_file = output_path or f"{os.path.splitext(dem_path)[0]}_slope_aspect.tif"
slope, aspect = generator.generate_combined_tif(combined_file, z_factor, algorithm)
results['combined'] = combined_file
# 获取统计信息
slope_stats = generator.get_slope_statistics(slope)
aspect_stats = generator.get_aspect_statistics(aspect)
print(f"坡度范围: {slope_stats['min']:.2f}度 - {slope_stats['max']:.2f}")
print(f"坡向范围: {aspect_stats['min']:.2f}度 - {aspect_stats['max']:.2f}")
elif output_type == 'all':
# 生成所有文件
output_dir = output_path if (output_path and os.path.isdir(output_path)) else None
results = generator.generate_all(output_dir, z_factor, algorithm)
else:
raise ValueError(f"不支持的输出类型: {output_type}")
return results
def batch_process_slope_aspect(dem_files: list, output_dir: str,
output_type: str = 'all',
z_factor: float = 1.0,
algorithm: str = 'Horn') -> Dict[str, list]:
"""
批量处理多个DEM文件
Args:
dem_files: DEM文件路径列表
output_dir: 输出目录
output_type: 输出类型
z_factor: 垂直单位转换因子
algorithm: 算法选项
Returns:
包含处理结果的字典
"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
results = {
'success': [],
'failed': [],
'outputs': []
}
total_files = len(dem_files)
print(f"开始批量处理 {total_files} 个DEM文件...")
for i, dem_file in enumerate(dem_files, 1):
if not os.path.exists(dem_file):
print(f"[{i}/{total_files}] 文件不存在: {dem_file}")
results['failed'].append(dem_file)
continue
print(f"\n[{i}/{total_files}] 处理: {os.path.basename(dem_file)}")
try:
# 处理文件
file_results = create_slope_aspect(
dem_file,
output_type,
output_dir,
z_factor,
algorithm
)
results['success'].append(dem_file)
results['outputs'].append(file_results)
print(f"[{i}/{total_files}] 完成")
except Exception as e:
print(f"[{i}/{total_files}] 失败: {e}")
results['failed'].append(dem_file)
# 打印总结
print(f"\n" + "="*50)
print(f"批量处理完成!")
print(f"成功: {len(results['success'])}")
print(f"失败: {len(results['failed'])}")
if results['failed']:
print("\n失败的文件:")
for failed_file in results['failed']:
print(f" {os.path.basename(failed_file)}")
return results
# 主示例代码
def main_example():
"""使用示例"""
# 示例1: 基本使用
print("示例1: 基本使用")
print("=" * 50)
# 配置参数
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
dem_file = os.path.join(SCRIPT_DIR, "o_dem_23d110d0.tif")
if os.path.exists(dem_file):
# 方法1: 使用便捷函数
print("\n方法1: 使用便捷函数")
slope_file = os.path.join(SCRIPT_DIR, "o_combined.tif")
results = create_slope_aspect(dem_file, 'combined', slope_file)
# 方法2: 直接使用类
# print("\n方法2: 直接使用类")
# with SlopeAspectGenerator(dem_file) as gen:
# # 获取DEM信息
# dem_info = gen.get_dem_info()
# print(f"DEM信息: {dem_info}")
# # 生成合并文件
# slope, aspect = gen.generate_combined_tif("o_combined.tif")
# # 获取统计信息
# slope_stats = gen.get_slope_statistics(slope)
# aspect_stats = gen.get_aspect_statistics(aspect)
# print(f"坡度统计: {slope_stats}")
# print(f"坡向统计: {aspect_stats}")
else:
print(f"DEM文件不存在: {dem_file}")
print("请创建一个示例DEM文件或使用您自己的DEM文件")
# 创建一个简单的示例DEM
print("\n创建一个示例DEM文件用于测试...")
create_sample_dem("sample_dem.tif")
# 使用示例DEM
print("\n使用示例DEM进行测试...")
results = create_slope_aspect("sample_dem.tif", 'all')
# 示例2: 批量处理
# print("\n\n示例2: 批量处理")
# print("=" * 50)
# # 假设有多个DEM文件
# dem_files = ["dem1.tif", "dem2.tif", "dem3.tif"] # 替换为实际文件
# # 过滤出存在的文件
# existing_files = [f for f in dem_files if os.path.exists(f)]
# if len(existing_files) > 0:
# batch_results = batch_process_slope_aspect(
# existing_files,
# "batch_output",
# 'all'
# )
# else:
# print("未找到DEM文件进行批量处理")
# print("创建示例文件进行批量处理演示...")
# # 创建几个示例DEM文件
# for i in range(1, 4):
# create_sample_dem(f"sample_dem_{i}.tif", offset=i*100)
# sample_files = [f"sample_dem_{i}.tif" for i in range(1, 4)]
# batch_results = batch_process_slope_aspect(
# sample_files,
# "batch_output",
# 'all'
# )
def create_sample_dem(output_path: str, rows: int = 100, cols: int = 100,
cell_size: float = 30.0, offset: float = 0.0):
"""
创建一个示例DEM文件用于测试
Args:
output_path: 输出文件路径
rows: 行数
cols: 列数
cell_size: 像元大小
offset: 高程偏移量
"""
from osgeo import osr
# 创建一个简单的地形:一个山峰
x = np.linspace(-5, 5, cols)
y = np.linspace(-5, 5, rows)
X, Y = np.meshgrid(x, y)
# 创建高程数据(高斯山峰)
elevation = 1000 * np.exp(-(X**2 + Y**2) / 10) + offset
# 添加一些噪声
elevation += np.random.normal(0, 10, (rows, cols))
# 创建栅格文件
driver = gdal.GetDriverByName('GTiff')
dataset = driver.Create(
output_path,
cols,
rows,
1,
gdal.GDT_Float32
)
# 设置地理变换(左下角坐标为(0,0)
dataset.SetGeoTransform((0, cell_size, 0, 0, 0, -cell_size))
# 设置投影WGS84
srs = osr.SpatialReference()
srs.ImportFromEPSG(4326)
dataset.SetProjection(srs.ExportToWkt())
# 写入数据
band = dataset.GetRasterBand(1)
band.WriteArray(elevation)
band.SetNoDataValue(-9999.0)
band.SetDescription('Elevation')
dataset.FlushCache()
dataset = None
print(f"示例DEM已创建: {output_path}")
if __name__ == "__main__":
# 运行示例
main_example()