ai_project_v1/b3dm/earthwork_calculator_point_cloud.py

691 lines
24 KiB
Python
Raw Normal View History

2026-01-14 11:37:35 +08:00
# earthwork_calculator.py
import pdal
import pyvista as pv
import numpy as np
import json
from typing import List, Tuple, Dict, Any, Optional
from dataclasses import dataclass, field
from enum import Enum
import traceback
class EarthworkAlgorithm(Enum):
"""土方量计算算法枚举"""
GRID = "grid"
TIN = "tin"
PRISM = "prism"
@dataclass
class EarthworkResultPointCloud:
"""土方量计算结果"""
cut_volume: float # 挖方量 (m³)
fill_volume: float # 填方量 (m³)
net_volume: float # 净方量 (m³)
area: float # 计算区域面积 (m²)
avg_elevation: float # 平均高程
min_elevation: float # 最低高程
max_elevation: float # 最高高程
points_count: int # 使用的点数
triangle_count: int = 0 # 三角形数量
grid_count: int = 0 # 网格数量仅GRID算法使用
prism_count: int = 0 # 棱柱体数量仅PRISM算法使用
bounding_box: Dict[str, List[float]] = field(default_factory=dict) # 边界框
volume_accuracy: float = 0.95 # 计算精度
algorithm: str = "TIN三角网法" # 使用的算法
resolution: float = 1.0 # 计算分辨率
algorithm_params: Dict[str, Any] = field(default_factory=dict) # 算法参数
def to_dict(self) -> Dict:
"""转换为字典"""
result = {
"volume": {
"cut": round(self.cut_volume, 3),
"fill": round(self.fill_volume, 3),
"net": round(self.net_volume, 3),
"unit": ""
},
"area": {
"value": round(self.area, 3),
"unit": ""
},
"elevation": {
"average": round(self.avg_elevation, 3),
"min": round(self.min_elevation, 3),
"max": round(self.max_elevation, 3),
"unit": "m"
},
"statistics": {
"points_count": self.points_count,
"accuracy": round(self.volume_accuracy, 3),
"algorithm": self.algorithm
},
"bounding_box": self.bounding_box,
"calculation_params": {
"resolution": self.resolution,
"accuracy": self.volume_accuracy,
**self.algorithm_params
}
}
# 根据算法类型添加特定的统计信息
if self.algorithm.startswith("GRID"):
result["statistics"]["grid_count"] = self.grid_count
elif self.algorithm.startswith("TIN"):
result["statistics"]["triangle_count"] = self.triangle_count
elif self.algorithm.startswith("PRISM"):
result["statistics"]["prism_count"] = self.prism_count
return result
class EarthworkCalculatorPointCloud:
"""土方量计算核心类(支持多种算法)"""
def __init__(self, point_cloud_path: str = "./data/your_point_cloud.laz"):
"""
初始化土方量计算器
Args:
point_cloud_path: 点云数据文件路径
"""
self.point_cloud_path = point_cloud_path
def validate_inputs(self, polygon_coords: List[List[float]], design_elev: float) -> Tuple[bool, str]:
"""验证输入参数"""
if not polygon_coords or len(polygon_coords) < 3:
return False, "多边形坐标至少需要3个点"
try:
design_elev = float(design_elev)
except (TypeError, ValueError):
return False, "设计高程必须是有效数字"
return True, ""
def create_polygon_string(self, polygon_coords: List[List[float]]) -> str:
"""创建PDAL多边形字符串"""
coords_list = []
for coord in polygon_coords:
if len(coord) >= 2:
coords_list.append(f"{coord[0]} {coord[1]}")
# 确保多边形闭合
if coords_list and coords_list[0] != coords_list[-1]:
coords_list.append(coords_list[0])
return "POLYGON((" + ", ".join(coords_list) + "))"
def calculate_bounding_box(self, points: np.ndarray) -> Dict[str, List[float]]:
"""
计算边界框
Args:
points: 点云坐标数组
Returns:
Dict: 边界框信息
"""
if len(points) == 0:
return {"min": [0, 0, 0], "max": [0, 0, 0]}
min_vals = np.min(points, axis=0)
max_vals = np.max(points, axis=0)
return {
"min": [float(min_vals[0]), float(min_vals[1]), float(min_vals[2])],
"max": [float(max_vals[0]), float(max_vals[1]), float(max_vals[2])]
}
def clip_point_cloud(self, polygon_coords: List[List[float]], crs: str = "EPSG:4326") -> pv.PolyData:
"""
裁剪点云数据
Args:
polygon_coords: 多边形坐标列表
crs: 坐标系
Returns:
pyvista.PolyData: 裁剪后的点云数据
"""
polygon_str = self.create_polygon_string(polygon_coords)
# PDAL管道配置
pipeline_config = {
"pipeline": [
{
"type": "readers.las",
"filename": self.point_cloud_path,
"spatialreference": crs
},
{
"type": "filters.crop",
"polygon": polygon_str
}
]
}
# 执行PDAL管道
pipeline = pdal.Pipeline(json.dumps(pipeline_config))
try:
pipeline.execute()
if len(pipeline.arrays) == 0:
raise ValueError("多边形区域内没有找到点云数据")
# 获取裁剪后的点云数据
points = pipeline.arrays[0]
x = points["X"]
y = points["Y"]
z = points["Z"]
return pv.PolyData(np.column_stack((x, y, z)))
except RuntimeError as e:
print(f"PDAL执行失败: {str(e)}")
# 如果没有PDAL数据生成模拟数据用于测试
return self.generate_mock_point_cloud(polygon_coords)
def generate_mock_point_cloud(self, polygon_coords: List[List[float]]) -> pv.PolyData:
"""
生成模拟点云数据仅用于测试
Args:
polygon_coords: 多边形坐标列表
Returns:
pyvista.PolyData: 模拟点云数据
"""
print("使用模拟数据进行测试...")
n_points = 1000
# 获取多边形边界
x_coords = [c[0] for c in polygon_coords]
y_coords = [c[1] for c in polygon_coords]
x_min, x_max = min(x_coords), max(x_coords)
y_min, y_max = min(y_coords), max(y_coords)
# 生成随机点
x = np.random.uniform(x_min, x_max, n_points)
y = np.random.uniform(y_min, y_max, n_points)
z = np.random.uniform(100, 120, n_points) # 模拟高程在100-120米之间
return pv.PolyData(np.column_stack((x, y, z)))
def create_tin_mesh(self, point_cloud: pv.PolyData) -> pv.PolyData:
"""
创建三角网TIN算法使用
Args:
point_cloud: 点云数据
Returns:
pyvista.PolyData: 三角网格
"""
if len(point_cloud.points) < 3:
raise ValueError("点云数据不足,无法构网")
try:
return point_cloud.delaunay_2d()
except Exception as e:
raise ValueError(f"三角网构网失败: {str(e)}")
def calculate_volumes_by_tin(self, mesh: pv.PolyData, design_elev: float) -> Dict[str, Any]:
"""
TIN算法计算土方量
Args:
mesh: 三角网格
design_elev: 设计高程
Returns:
Dict: 包含体积计算结果和额外信息
"""
points = mesh.points
elev_diff = points[:, 2] - design_elev
cut_volume = 0.0
fill_volume = 0.0
triangle_count = 0
# 遍历所有三角形面片计算体积
cells = mesh.cells.reshape(-1, 4)
if len(cells) == 0:
raise ValueError("无法生成有效的三角网")
for cell in cells:
if cell[0] == 3: # 三角形VTK格式3个顶点
triangle_count += 1
vertex_indices = cell[1:]
pts = points[vertex_indices]
# 计算三角形面积
v1 = pts[1] - pts[0]
v2 = pts[2] - pts[0]
area = 0.5 * np.linalg.norm(np.cross(v1, v2))
# 计算平均高程差
avg_diff = np.mean(elev_diff[vertex_indices])
vol = area * avg_diff
if vol > 0:
cut_volume += vol
else:
fill_volume += abs(vol)
return {
"cut_volume": cut_volume,
"fill_volume": fill_volume,
"net_volume": cut_volume - fill_volume,
"triangle_count": triangle_count
}
def calculate_volumes_by_grid(self, point_cloud: pv.PolyData, design_elev: float,
grid_size: float = 1.0) -> Dict[str, Any]:
"""
GRID算法计算土方量
Args:
point_cloud: 点云数据
design_elev: 设计高程
grid_size: 网格尺寸
Returns:
Dict: 包含体积计算结果和额外信息
"""
points = point_cloud.points
if len(points) == 0:
raise ValueError("点云数据为空")
# 计算边界
x_min, y_min = np.min(points[:, :2], axis=0)
x_max, y_max = np.max(points[:, :2], axis=0)
# 创建网格
x_edges = np.arange(x_min, x_max + grid_size, grid_size)
y_edges = np.arange(y_min, y_max + grid_size, grid_size)
grid_count = (len(x_edges) - 1) * (len(y_edges) - 1)
cut_volume = 0.0
fill_volume = 0.0
# 对每个网格计算土方量
for i in range(len(x_edges) - 1):
for j in range(len(y_edges) - 1):
# 获取当前网格内的点
mask = (points[:, 0] >= x_edges[i]) & (points[:, 0] < x_edges[i+1]) & \
(points[:, 1] >= y_edges[j]) & (points[:, 1] < y_edges[j+1])
grid_points = points[mask]
if len(grid_points) > 0:
# 计算网格内点的平均高程
avg_elevation = np.mean(grid_points[:, 2])
# 计算高程差
elev_diff = avg_elevation - design_elev
# 计算体积
cell_area = grid_size * grid_size
vol = cell_area * elev_diff
if vol > 0:
cut_volume += vol
else:
fill_volume += abs(vol)
return {
"cut_volume": cut_volume,
"fill_volume": fill_volume,
"net_volume": cut_volume - fill_volume,
"grid_count": grid_count
}
def calculate_volumes_by_prism(self, point_cloud: pv.PolyData, design_elev: float,
influence_radius: float = 0.5) -> Dict[str, Any]:
"""
PRISM算法计算土方量
Args:
point_cloud: 点云数据
design_elev: 设计高程
influence_radius: 影响半径
Returns:
Dict: 包含体积计算结果和额外信息
"""
points = point_cloud.points
if len(points) == 0:
raise ValueError("点云数据为空")
cut_volume = 0.0
fill_volume = 0.0
# 每个点的影响面积
influence_area = np.pi * influence_radius ** 2
prism_count = len(points)
for point in points:
# 计算高程差
elev_diff = point[2] - design_elev
# 计算体积
vol = influence_area * elev_diff
if vol > 0:
cut_volume += vol
else:
fill_volume += abs(vol)
return {
"cut_volume": cut_volume,
"fill_volume": fill_volume,
"net_volume": cut_volume - fill_volume,
"prism_count": prism_count
}
def calculate_statistics(self, point_cloud: pv.PolyData, mesh: pv.PolyData = None) -> Dict[str, float]:
"""
计算统计数据
Args:
point_cloud: 点云数据
mesh: 三角网格仅TIN算法需要
Returns:
Dict: 统计结果
"""
elevations = point_cloud.points[:, 2]
stats = {
"area": 0.0,
"max_elevation": np.max(elevations) if len(elevations) > 0 else 0.0,
"min_elevation": np.min(elevations) if len(elevations) > 0 else 0.0,
"avg_elevation": np.mean(elevations) if len(elevations) > 0 else 0.0,
"points_count": len(point_cloud.points)
}
# 计算面积
if mesh is not None:
stats["area"] = mesh.area
else:
# 对于非TIN算法使用多边形面积近似
if len(point_cloud.points) > 0:
# 使用点云的凸包面积
try:
hull = point_cloud.delaunay_2d()
stats["area"] = hull.area
except:
# 如果无法计算凸包,使用边界框面积
x_min, x_max = np.min(point_cloud.points[:, 0]), np.max(point_cloud.points[:, 0])
y_min, y_max = np.min(point_cloud.points[:, 1]), np.max(point_cloud.points[:, 1])
stats["area"] = (x_max - x_min) * (y_max - y_min)
return stats
def calculate_earthwork(self,
polygon_coords: List[List[float]],
design_elev: float,
algorithm: str = EarthworkAlgorithm.TIN.value,
algorithm_params: Optional[Dict[str, Any]] = None,
crs: str = "EPSG:4326",
volume_accuracy: Optional[float] = None,
resolution: Optional[float] = None) -> EarthworkResultPointCloud:
"""
主计算方法执行完整的土方量计算流程
Args:
polygon_coords: 多边形坐标列表
design_elev: 设计高程
algorithm: 计算算法可选值'grid', 'tin', 'prism'
algorithm_params: 算法特定参数
crs: 坐标系
volume_accuracy: 计算精度0-1之间
resolution: 计算分辨率
Returns:
EarthworkResultPointCloud: 计算结果
"""
try:
# 1. 验证输入
is_valid, message = self.validate_inputs(polygon_coords, design_elev)
if not is_valid:
raise ValueError(message)
design_elev = float(design_elev)
# 2. 验证算法参数
if algorithm not in [a.value for a in EarthworkAlgorithm]:
raise ValueError(f"不支持的算法: {algorithm}。支持的算法: {[a.value for a in EarthworkAlgorithm]}")
# 3. 设置默认参数
if algorithm_params is None:
algorithm_params = {}
# 4. 裁剪点云
point_cloud = self.clip_point_cloud(polygon_coords, crs)
# 5. 根据算法选择计算方法
algorithm_name = ""
mesh = None
if algorithm == EarthworkAlgorithm.TIN.value:
algorithm_name = "TIN三角网法"
mesh = self.create_tin_mesh(point_cloud)
volumes = self.calculate_volumes_by_tin(mesh, design_elev)
algorithm_params = {
"grid_size": algorithm_params.get("grid_size", 1.0)
}
elif algorithm == EarthworkAlgorithm.GRID.value:
algorithm_name = "GRID格网法"
grid_size = algorithm_params.get("grid_size", 1.0)
algorithm_name = f"GRID格网法(网格尺寸={grid_size}m)"
volumes = self.calculate_volumes_by_grid(point_cloud, design_elev, grid_size)
algorithm_params = {
"grid_size": grid_size
}
elif algorithm == EarthworkAlgorithm.PRISM.value:
algorithm_name = "PRISM棱柱体法"
influence_radius = algorithm_params.get("influence_radius", 0.5)
algorithm_name = f"PRISM棱柱体法(影响半径={influence_radius}m)"
volumes = self.calculate_volumes_by_prism(point_cloud, design_elev, influence_radius)
algorithm_params = {
"influence_radius": influence_radius
}
# 6. 计算统计数据
stats = self.calculate_statistics(point_cloud, mesh)
# 7. 计算边界框
bounding_box = self.calculate_bounding_box(point_cloud.points)
# 8. 计算或使用传入的精度和分辨率
if volume_accuracy is None:
# 根据算法和点云密度自动估算精度
volume_accuracy = self.estimate_accuracy(algorithm, point_cloud)
if resolution is None:
# 根据点云密度自动估算分辨率
resolution = self.estimate_resolution(point_cloud)
# 9. 创建EarthworkResultPointCloud对象
result = EarthworkResultPointCloud(
cut_volume=volumes["cut_volume"],
fill_volume=volumes["fill_volume"],
net_volume=volumes["net_volume"],
area=stats["area"],
avg_elevation=stats["avg_elevation"],
min_elevation=stats["min_elevation"],
max_elevation=stats["max_elevation"],
points_count=stats["points_count"],
triangle_count=volumes.get("triangle_count", 0),
grid_count=volumes.get("grid_count", 0),
prism_count=volumes.get("prism_count", 0),
bounding_box=bounding_box,
volume_accuracy=volume_accuracy,
algorithm=algorithm_name,
resolution=resolution,
algorithm_params=algorithm_params
)
return result
except Exception as e:
print(f"计算错误: {str(e)}")
print(traceback.format_exc())
raise
def estimate_accuracy(self, algorithm: str, point_cloud: pv.PolyData) -> float:
"""根据算法和点云密度估计计算精度"""
point_density = len(point_cloud.points) / max(point_cloud.area, 0.1)
# 基础精度
base_accuracy = {
EarthworkAlgorithm.TIN.value: 0.95,
EarthworkAlgorithm.GRID.value: 0.90,
EarthworkAlgorithm.PRISM.value: 0.85
}.get(algorithm, 0.90)
# 根据点云密度调整精度
if point_density > 10: # 高密度点云
accuracy_boost = min(0.05, point_density * 0.002)
elif point_density < 1: # 低密度点云
accuracy_boost = -0.05
else:
accuracy_boost = 0
estimated_accuracy = base_accuracy + accuracy_boost
# 确保精度在合理范围内
return max(0.7, min(0.99, estimated_accuracy))
def estimate_resolution(self, point_cloud: pv.PolyData) -> float:
"""根据点云密度估计分辨率"""
if len(point_cloud.points) < 2:
return 1.0
area = point_cloud.area
if area > 0:
point_count = len(point_cloud.points)
avg_spacing = np.sqrt(area / point_count)
return float(round(avg_spacing, 2))
return 1.0
def get_algorithm_info(self) -> Dict[str, Any]:
"""
获取支持的算法信息
Returns:
Dict: 算法信息
"""
return {
"supported_algorithms": [
{
"id": EarthworkAlgorithm.TIN.value,
"name": "TIN三角网法",
"description": "通过构建不规则三角网计算土方量,精度高,适合复杂地形",
"parameters": [
{
"name": "grid_size",
"type": "float",
"default": 1.0,
"description": "网格尺寸(m),用于点云预处理",
"required": False
}
]
},
{
"id": EarthworkAlgorithm.GRID.value,
"name": "GRID格网法",
"description": "将区域划分为规则网格计算土方量,计算速度快,适合大规模区域",
"parameters": [
{
"name": "grid_size",
"type": "float",
"default": 1.0,
"description": "网格尺寸(m)",
"required": True,
"min": 0.1,
"max": 10.0
}
]
},
{
"id": EarthworkAlgorithm.PRISM.value,
"name": "PRISM棱柱体法",
"description": "将每个点视为一个棱柱体计算土方量,计算简单快速",
"parameters": [
{
"name": "influence_radius",
"type": "float",
"default": 0.5,
"description": "点的影响半径(m)",
"required": True,
"min": 0.1,
"max": 5.0
}
]
}
],
"default_algorithm": EarthworkAlgorithm.TIN.value
}
# 使用示例
if __name__ == "__main__":
# 创建计算器实例
calculator = EarthworkCalculatorPointCloud("./data/sample_point_cloud.laz")
# 获取支持的算法信息
algorithm_info = calculator.get_algorithm_info()
print("支持的算法:")
for algo in algorithm_info["supported_algorithms"]:
print(f" {algo['id']}: {algo['name']} - {algo['description']}")
# 定义多边形区域
polygon = [
[116.3974, 39.9093],
[116.4084, 39.9093],
[116.4084, 39.9193],
[116.3974, 39.9193]
]
# 设计高程
design_elevation = 100.0
# 测试不同算法
algorithms = [
(EarthworkAlgorithm.TIN.value, {}, "TIN算法"),
(EarthworkAlgorithm.GRID.value, {"grid_size": 2.0}, "GRID算法(2米网格)"),
(EarthworkAlgorithm.PRISM.value, {"influence_radius": 1.0}, "PRISM算法(1米影响半径)")
]
for algo_id, params, description in algorithms:
print(f"\n使用{description}计算:")
try:
result = calculator.calculate_earthwork(
polygon_coords=polygon,
design_elev=design_elevation,
algorithm=algo_id,
algorithm_params=params
)
print(f" 挖方量: {result.cut_volume:.3f}")
print(f" 填方量: {result.fill_volume:.3f}")
print(f" 净方量: {result.net_volume:.3f}")
print(f" 计算面积: {result.area:.3f}")
print(f" 计算精度: {result.volume_accuracy:.3%}")
print(f" 分辨率: {result.resolution:.2f} m")
except Exception as e:
print(f" 计算失败: {str(e)}")