import os import numpy as np import pandas as pd import py3dtiles from py3dtiles import Tileset, BoundingVolumeBox from shapely.geometry import Polygon, Point import matplotlib.pyplot as plt from matplotlib.colors import LinearSegmentedColormap import argparse from tqdm import tqdm import logging # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class TilesetProcessor: """3D Tiles数据集处理器,用于加载、分析和比较两个3D Tiles模型""" def __init__(self, tileset_path1, tileset_path2, resolution=1.0): """ 初始化处理器 参数: tileset_path1: 第一个3D Tiles数据集路径 tileset_path2: 第二个3D Tiles数据集路径 resolution: 分析网格的分辨率(米) """ self.tileset1 = self._load_tileset(tileset_path1) self.tileset2 = self._load_tileset(tileset_path2) self.resolution = resolution self.analysis_area = None self.height_difference_grid = None self.grid_bounds = None def _load_tileset(self, path): """加载3D Tiles数据集""" try: logger.info(f"加载3D Tiles数据集: {path}") tileset = Tileset.from_file(path) logger.info(f"成功加载,包含 {len(tileset.root.children)} 个根瓦片") return tileset except Exception as e: logger.error(f"加载数据集失败: {e}") raise def set_analysis_area(self, bounds): """ 设置分析区域 参数: bounds: 分析区域边界元组 (min_x, min_y, max_x, max_y) """ min_x, min_y, max_x, max_y = bounds self.analysis_area = Polygon([ (min_x, min_y), (max_x, min_y), (max_x, max_y), (min_x, max_y) ]) self.grid_bounds = bounds logger.info(f"设置分析区域: {bounds}") logger.info(f"分析区域面积: {self.analysis_area.area:.2f} 平方米") return True def sample_heights(self): """在分析区域内采样两个模型的高度值并计算差异""" if self.analysis_area is None: logger.error("请先设置分析区域") return False logger.info("开始在分析区域内采样高度值...") # 创建网格 min_x, min_y, max_x, max_y = self.grid_bounds rows = int((max_y - min_y) / self.resolution) + 1 cols = int((max_x - min_x) / self.resolution) + 1 # 初始化高度差异网格 self.height_difference_grid = np.zeros((rows, cols), dtype=np.float32) self.height_difference_grid[:] = np.nan # 初始化为NaN,表示未采样 # 对每个网格点进行采样 total_points = rows * cols logger.info(f"创建了 {rows}x{cols}={total_points} 个采样点") with tqdm(total=total_points, desc="采样高度点") as pbar: for i in range(rows): for j in range(cols): # 计算当前点的坐标 x = min_x + j * self.resolution y = min_y + i * self.resolution point = Point(x, y) # 检查点是否在分析区域内 if not self.analysis_area.contains(point): pbar.update(1) continue # 采样两个模型的高度 height1 = self._sample_height_at_point(self.tileset1, x, y) height2 = self._sample_height_at_point(self.tileset2, x, y) # 计算高度差异 if height1 is not None and height2 is not None: self.height_difference_grid[i, j] = height2 - height1 pbar.update(1) # 统计结果 valid_differences = self.height_difference_grid[~np.isnan(self.height_difference_grid)] if len(valid_differences) > 0: logger.info(f"高度变化统计:") logger.info(f" 平均变化: {np.mean(valid_differences):.2f}m") logger.info(f" 最大上升: {np.max(valid_differences):.2f}m") logger.info(f" 最大下降: {np.min(valid_differences):.2f}m") logger.info(f" 变化标准差: {np.std(valid_differences):.2f}m") else: logger.warning("未找到有效的高度差异数据") return True def _sample_height_at_point(self, tileset, x, y, max_depth=3): """在指定点采样3D Tiles模型的高度值""" # 找到包含该点的瓦片 def find_tile(tile, depth=0): # 检查点是否在瓦片边界框内 bbox = tile.bounding_volume.box min_x_tile = bbox[0] - bbox[3] max_x_tile = bbox[0] + bbox[3] min_y_tile = bbox[1] - bbox[4] max_y_tile = bbox[1] + bbox[4] if not (min_x_tile <= x <= max_x_tile and min_y_tile <= y <= max_y_tile): return None # 如果瓦片有内容且深度足够,或者没有子瓦片,就使用这个瓦片 if (tile.content is not None and depth >= max_depth) or not tile.children: return tile # 否则递归查找子瓦片 for child in tile.children: result = find_tile(child, depth + 1) if result is not None: return result return None # 找到包含该点的最详细瓦片 tile = find_tile(tileset.root) if tile is None or tile.content is None: return None # 从瓦片内容中获取高度 try: # 这里是简化的模拟实现,实际应该解析瓦片内容 # 例如,使用py3dtiles中的TileContent.get_vertices()获取顶点 # 然后找到最近的顶点或三角形来计算高度 # 这里为了示例,我们返回瓦片中心的高度加上一个随机偏移 return tile.bounding_volume.box[2] + np.random.uniform(-0.5, 0.5) except Exception as e: logger.warning(f"获取瓦片高度失败: {e}") return None def export_results(self, output_dir="results"): """导出分析结果""" if self.height_difference_grid is None: logger.error("请先采样高度值") return os.makedirs(output_dir, exist_ok=True) # 导出CSV文件 csv_path = os.path.join(output_dir, "height_differences.csv") logger.info(f"导出CSV文件: {csv_path}") # 创建DataFrame min_x, min_y, max_x, max_y = self.grid_bounds rows, cols = self.height_difference_grid.shape data = [] for i in range(rows): for j in range(cols): if not np.isnan(self.height_difference_grid[i, j]): x = min_x + j * self.resolution y = min_y + i * self.resolution data.append({ 'x': x, 'y': y, 'height_difference': self.height_difference_grid[i, j] }) df = pd.DataFrame(data) df.to_csv(csv_path, index=False) # 生成热图 self._generate_heatmap(output_dir) logger.info(f"结果已导出到 {output_dir} 目录") def _generate_heatmap(self, output_dir): """生成高度变化热图""" # 创建自定义颜色映射 colors = [(0.0, 0.0, 1.0), (1.0, 1.0, 1.0), (1.0, 0.0, 0.0)] # 蓝-白-红 cmap = LinearSegmentedColormap.from_list('height_diff_cmap', colors, N=256) # 准备数据 data = self.height_difference_grid.copy() valid_mask = ~np.isnan(data) if not np.any(valid_mask): logger.warning("没有有效的高度差异数据,无法生成热图") return # 设置NaN值为0以便绘图,但在颜色映射中标记为透明 data[~valid_mask] = 0 # 创建图形 plt.figure(figsize=(12, 10)) plt.imshow(data, cmap=cmap, origin='lower', extent=[self.grid_bounds[0], self.grid_bounds[2], self.grid_bounds[1], self.grid_bounds[3]], alpha=0.9) # 添加颜色条 cbar = plt.colorbar() cbar.set_label('高度变化 (米)', fontsize=12) # 设置标题和坐标轴 plt.title('两个3D Tiles模型的高度变化分布', fontsize=16) plt.xlabel('X坐标 (米)', fontsize=12) plt.ylabel('Y坐标 (米)', fontsize=12) # 保存图形 heatmap_path = os.path.join(output_dir, "height_difference_heatmap.png") plt.savefig(heatmap_path, dpi=300, bbox_inches='tight') plt.close() logger.info(f"热图已保存到: {heatmap_path}") def main(): parser = argparse.ArgumentParser(description='分析两个3D Tiles模型指定区域的高度变化') parser.add_argument('--tileset1', required=True, help='第一个3D Tiles数据集路径') parser.add_argument('--tileset2', required=True, help='第二个3D Tiles数据集路径') parser.add_argument('--bounds', required=True, type=float, nargs=4, help='分析区域边界 [min_x, min_y, max_x, max_y]') parser.add_argument('--resolution', type=float, default=1.0, help='采样分辨率(米)') parser.add_argument('--output', default='results', help='输出目录') args = parser.parse_args() processor = TilesetProcessor(args.tileset1, args.tileset2, args.resolution) # 设置分析区域 if processor.set_analysis_area(args.bounds): if processor.sample_heights(): processor.export_results(args.output) print("分析完成!结果已导出到指定目录。") else: print("高度采样失败,无法完成分析。") else: print("设置分析区域失败,无法进行分析。") if __name__ == "__main__": main()