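# NOTE(review): the lines below look like repository web-viewer residue captured
# together with the file rather than source code; kept as comments so the module parses.
# 2025-07-25 10:24:47 +08:00
#
# 235 lines
# 8.7 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.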

import os
import logging
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap
from shapely.geometry import Polygon, Point
from tqdm import tqdm
from py3dtiles.tileset import TileSet
import requests
# Logging setup: install the root handler/format, then fetch this module's
# named logger (the two steps are independent of each other).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger("TilesetProcessor")
class TilesetProcessor:
    """Load two 3D Tiles datasets and compare their heights on a regular XY grid.

    Workflow: construct with two tileset locations, define an analysis area
    (polygon or rectangle) with :meth:`set_analysis_area`, call
    :meth:`sample_heights`, then :meth:`export_results`.

    Heights are currently *simulated*: each sample is the centre height of the
    matched tile's bounding box plus a random offset (see
    :meth:`_sample_height_at_point`), so results are not reproducible.

    NOTE(review): tile/tileset attribute names (``root``, ``children``,
    ``content``, ``bounding_volume.box``) are used exactly as in the original
    code; confirm them against the installed py3dtiles version.
    """

    # Seconds to wait for a remote ``tileset.json`` before giving up.
    REQUEST_TIMEOUT = 30

    def __init__(self, tileset_path1, tileset_path2, resolution=1.0, polygon_points=None):
        """Load both tilesets and optionally set a polygonal analysis area.

        Args:
            tileset_path1: Local path or http(s) URL of the first (baseline)
                tileset; differences are computed as ``second - first``.
            tileset_path2: Local path or http(s) URL of the second tileset.
            resolution: Spacing between grid samples, in the same units as the
                analysis-area coordinates.
            polygon_points: Optional sequence of ``(x, y)`` vertices; when
                truthy it is passed to :meth:`set_analysis_area`.

        Raises:
            Exception: Any error raised while loading a tileset is logged and
                re-raised unchanged.
        """
        self.tileset1 = self._load_tileset(tileset_path1)
        self.tileset2 = self._load_tileset(tileset_path2)
        self.resolution = resolution
        # shapely Polygon delimiting where samples are taken (None until set).
        self.analysis_area = None
        # 2-D float32 array of (height2 - height1); NaN marks "no data".
        self.height_difference_grid = None
        # (min_x, min_y, max_x, max_y) of the sampling grid.
        self.grid_bounds = None
        if polygon_points:
            self.set_analysis_area(polygon_points=polygon_points)

    def _load_tileset(self, path_or_url):
        """Load a tileset from an http(s) URL or a local file.

        Args:
            path_or_url: String; values starting with ``http://`` or
                ``https://`` are fetched with ``requests``, anything else is
                handed to ``TileSet.from_file``.

        Returns:
            The loaded ``TileSet``.

        Raises:
            Exception: Any failure (network, HTTP status, parsing) is logged
                and re-raised.
        """
        try:
            logger.info(f"加载3D Tiles数据集: {path_or_url}")
            if path_or_url.startswith(("http://", "https://")):
                # A timeout keeps an unresponsive server from hanging the run forever.
                resp = requests.get(path_or_url, timeout=self.REQUEST_TIMEOUT)
                resp.raise_for_status()
                tileset = TileSet.from_dict(resp.json())
            else:
                tileset = TileSet.from_file(path_or_url)
            logger.info(f"成功加载,包含 {len(tileset.root.children)} 个根瓦片")
            return tileset
        except Exception as e:
            logger.error(f"加载数据集失败(路径: {path_or_url}): {e}")
            raise

    def set_analysis_area(self, bounds=None, polygon_points=None):
        """Define the area to analyse, either as a polygon or as a rectangle.

        ``polygon_points`` takes precedence when both arguments are given.

        Args:
            bounds: ``(min_x, min_y, max_x, max_y)`` rectangle.
            polygon_points: Sequence of ``(x, y)`` vertices.

        Returns:
            ``True`` when an area was set, ``False`` (after logging an error)
            when neither argument was provided.
        """
        if polygon_points:
            self.analysis_area = Polygon(polygon_points)
            xs = [p[0] for p in polygon_points]
            ys = [p[1] for p in polygon_points]
            # The sampling grid spans the polygon's axis-aligned bounding box.
            self.grid_bounds = (min(xs), min(ys), max(xs), max(ys))
            logger.info(f"设置多边形分析区域: {polygon_points}")
        elif bounds:
            min_x, min_y, max_x, max_y = bounds
            self.analysis_area = Polygon([
                (min_x, min_y),
                (max_x, min_y),
                (max_x, max_y),
                (min_x, max_y),
            ])
            self.grid_bounds = bounds
            logger.info(f"设置矩形分析区域: {bounds}")
        else:
            logger.error("请提供 bounds 或 polygon_points")
            return False
        logger.info(f"分析区域面积: {self.analysis_area.area:.2f} 平方米")
        return True

    def sample_heights(self):
        """Sample both tilesets on the grid and store ``height2 - height1``.

        Fills ``self.height_difference_grid`` (rows follow Y, columns follow
        X); cells outside the analysis area, or where either tileset yields no
        height, stay NaN. Summary statistics are logged.

        Returns:
            ``True`` when sampling ran, ``False`` (after logging an error)
            when no analysis area is set or the resolution is not positive.
        """
        if self.analysis_area is None:
            logger.error("请先设置分析区域")
            return False
        # A zero/negative/NaN spacing would divide by zero or build a nonsense grid.
        if not self.resolution > 0:
            logger.error(f"分辨率必须为正数: {self.resolution}")
            return False
        logger.info("开始在分析区域内采样高度值...")
        min_x, min_y, max_x, max_y = self.grid_bounds
        rows = int((max_y - min_y) / self.resolution) + 1
        cols = int((max_x - min_x) / self.resolution) + 1
        self.height_difference_grid = np.full((rows, cols), np.nan, dtype=np.float32)
        total_points = rows * cols
        logger.info(f"创建了 {rows}x{cols}={total_points} 个采样点")
        with tqdm(total=total_points, desc="采样高度点") as pbar:
            for i in range(rows):
                y = min_y + i * self.resolution
                for j in range(cols):
                    x = min_x + j * self.resolution
                    # covers() (unlike contains()) keeps points lying exactly on
                    # the boundary, e.g. the first row/column of a rectangle.
                    if self.analysis_area.covers(Point(x, y)):
                        height1 = self._sample_height_at_point(self.tileset1, x, y)
                        height2 = self._sample_height_at_point(self.tileset2, x, y)
                        if height1 is not None and height2 is not None:
                            self.height_difference_grid[i, j] = height2 - height1
                    pbar.update(1)
        valid_differences = self.height_difference_grid[~np.isnan(self.height_difference_grid)]
        if len(valid_differences) > 0:
            logger.info("高度变化统计:")
            logger.info(f" 平均变化: {np.mean(valid_differences):.2f}m")
            logger.info(f" 最大上升: {np.max(valid_differences):.2f}m")
            logger.info(f" 最大下降: {np.min(valid_differences):.2f}m")
            logger.info(f" 变化标准差: {np.std(valid_differences):.2f}m")
        else:
            logger.warning("未找到有效的高度差异数据")
        return True

    @staticmethod
    def _tile_xy_extent(box):
        """Return ``(min_x, max_x, min_y, max_y)`` enclosing a 3D Tiles ``box``.

        ``box`` holds 12 numbers: centre (0-2) followed by the X, Y and Z
        half-axis vectors (3-5, 6-8, 9-11). The axis-aligned half extent along
        a world axis is the sum of the absolute components of the three
        half-axis vectors along that axis; for an axis-aligned box this
        reduces to ``box[3]`` for X and ``box[7]`` for Y.
        """
        half_x = abs(box[3]) + abs(box[6]) + abs(box[9])
        half_y = abs(box[4]) + abs(box[7]) + abs(box[10])
        return box[0] - half_x, box[0] + half_x, box[1] - half_y, box[1] + half_y

    def _sample_height_at_point(self, tileset, x, y, max_depth=3):
        """Return a (simulated) height for ``(x, y)`` in ``tileset``, or ``None``.

        The tile tree is searched depth-first for a tile whose bounding box
        covers the point. Descent stops at a leaf, or at a tile that has
        content once ``max_depth`` levels have been descended.

        Args:
            tileset: Object exposing ``root``; tiles expose
                ``bounding_volume.box``, ``content`` and ``children``.
            x: X coordinate of the sample.
            y: Y coordinate of the sample.
            max_depth: Depth from which a tile with content is accepted
                without descending further.

        Returns:
            The tile's box-centre Z plus a uniform random offset in
            ``[-0.5, 0.5)``, or ``None`` when no tile with content covers the
            point or the height cannot be read.
        """
        extent_of = TilesetProcessor._tile_xy_extent

        def find_tile(tile, depth=0):
            lo_x, hi_x, lo_y, hi_y = extent_of(tile.bounding_volume.box)
            if not (lo_x <= x <= hi_x and lo_y <= y <= hi_y):
                return None
            if (tile.content is not None and depth >= max_depth) or not tile.children:
                return tile
            for child in tile.children:
                found = find_tile(child, depth + 1)
                if found is not None:
                    return found
            return None

        tile = find_tile(tileset.root)
        if tile is None or tile.content is None:
            return None
        try:
            # Simplified simulation: the tile's centre height plus a random offset.
            return tile.bounding_volume.box[2] + np.random.uniform(-0.5, 0.5)
        except Exception as e:
            logger.warning(f"获取瓦片高度失败: {e}")
            return None

    def export_results(self, output_dir="results"):
        """Write the height differences to CSV and render a heat map.

        Creates ``output_dir`` if needed, writes ``height_differences.csv``
        with columns ``x``, ``y``, ``height_difference`` (one row per non-NaN
        grid cell, row-major order), then calls :meth:`_generate_heatmap`.
        Logs an error and returns without writing anything when
        :meth:`sample_heights` has not produced a grid yet.

        Args:
            output_dir: Directory that receives the CSV and the PNG.
        """
        if self.height_difference_grid is None:
            logger.error("请先采样高度值")
            return
        os.makedirs(output_dir, exist_ok=True)
        csv_path = os.path.join(output_dir, "height_differences.csv")
        logger.info(f"导出CSV文件: {csv_path}")
        min_x, min_y = self.grid_bounds[0], self.grid_bounds[1]
        grid = self.height_difference_grid
        # np.nonzero walks the mask in row-major order, like a nested i/j loop.
        row_idx, col_idx = np.nonzero(~np.isnan(grid))
        df = pd.DataFrame(
            {
                'x': min_x + col_idx * self.resolution,
                'y': min_y + row_idx * self.resolution,
                'height_difference': grid[row_idx, col_idx],
            },
            # Explicit columns keep the CSV header even when there are no rows.
            columns=['x', 'y', 'height_difference'],
        )
        df.to_csv(csv_path, index=False)
        self._generate_heatmap(output_dir)
        logger.info(f"结果已导出到 {output_dir} 目录")

    def _generate_heatmap(self, output_dir):
        """Save ``height_difference_heatmap.png`` into ``output_dir``.

        Uses a blue-white-red colour map with colour limits symmetric around
        zero, so white means "no change". Cells without data are masked
        instead of being drawn as zero change. Logs a warning and returns
        without writing when the grid has no valid cell.

        Args:
            output_dir: Existing directory that receives the PNG.
        """
        grid = self.height_difference_grid
        if not np.any(~np.isnan(grid)):
            logger.warning("没有有效的高度差异数据,无法生成热图")
            return
        colors = [(0.0, 0.0, 1.0), (1.0, 1.0, 1.0), (1.0, 0.0, 0.0)]
        cmap = LinearSegmentedColormap.from_list('height_diff_cmap', colors, N=256)
        masked = np.ma.masked_invalid(grid)
        # Symmetric limits centre the diverging map on 0; fall back to 1.0 when
        # every difference is exactly zero to avoid a degenerate colour range.
        limit = float(np.nanmax(np.abs(grid))) or 1.0
        fig, ax = plt.subplots(figsize=(12, 10))
        try:
            image = ax.imshow(
                masked,
                cmap=cmap,
                origin='lower',
                extent=[self.grid_bounds[0], self.grid_bounds[2],
                        self.grid_bounds[1], self.grid_bounds[3]],
                alpha=0.9,
                vmin=-limit,
                vmax=limit,
            )
            cbar = fig.colorbar(image, ax=ax)
            cbar.set_label('高度变化 (米)', fontsize=12)
            ax.set_title('两个3D Tiles模型的高度变化分布', fontsize=16)
            ax.set_xlabel('X坐标 (米)', fontsize=12)
            ax.set_ylabel('Y坐标 (米)', fontsize=12)
            heatmap_path = os.path.join(output_dir, "height_difference_heatmap.png")
            fig.savefig(heatmap_path, dpi=300, bbox_inches='tight')
        finally:
            # Release the figure even when rendering or saving fails.
            plt.close(fig)
        logger.info(f"热图已保存到: {heatmap_path}")
def main():
    """Run the demo: compare two remote tilesets over a fixed polygon and export results."""
    first_url = "http://8.137.54.85:9000/300bdf2b-a150-406e-be63-d28bd29b409f/dszh/1748398014403562192_OUT/B3DM/tileset.json"
    second_url = "http://8.137.54.85:9000/300bdf2b-a150-406e-be63-d28bd29b409f/dszh/1748325943733189898_OUT/B3DM/tileset.json"
    # NOTE(review): these vertices look like lon/lat degrees while the grid
    # spacing below is 0.5 -- confirm both use the tilesets' coordinate units.
    area_vertices = [
        (102.2232, 29.3841),
        (102.2261, 29.3845),
        (102.2263, 29.3821),
        (102.2231, 29.3818),
    ]
    grid_spacing = 0.5
    target_dir = "output_results"

    processor = TilesetProcessor(first_url, second_url, grid_spacing, area_vertices)
    if not processor.sample_heights():
        print("高度采样失败,无法完成分析。")
        return
    processor.export_results(target_dir)
    print("分析完成!结果已导出到指定目录。")


if __name__ == "__main__":
    main()