ai_project_v1/b3dm/tileset_data_source.py
2026-01-14 11:37:35 +08:00

167 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# tileset_data_source_py
import numpy as np
from typing import List, Dict, Optional, Tuple
import asyncio
from concurrent.futures import ThreadPoolExecutor
import logging
import os
logger = logging.getLogger(__name__)
class TilesetDataSource:
"""使用py3dtiles库的数据源"""
def __init__(self, tileset_path: str, cache_size: int = 1000):
self.tileset_path = os.path.abspath(tileset_path)
self.tileset_dir = os.path.dirname(self.tileset_path)
self.cache_size = cache_size
self._tileset = None
self._point_cache = {}
self._executor = ThreadPoolExecutor(max_workers=4)
self._crs = "EPSG:4979"
async def initialize(self):
"""初始化"""
try:
# 尝试导入py3dtiles
try:
import py3dtiles
from py3dtiles.tileset import TileSet
except ImportError:
logger.warning("py3dtiles未安装将使用简化数据源")
raise ImportError("请安装py3dtiles: pip install py3dtiles")
loop = asyncio.get_event_loop()
self._tileset = await loop.run_in_executor(
self._executor,
TileSet.from_file,
self.tileset_path
)
logger.info(f"py3dtiles数据源初始化完成: {self.tileset_path}")
except Exception as e:
logger.error(f"py3dtiles初始化失败: {str(e)}")
# 回退到简化数据源
self._tileset = None
async def get_points_in_polygon(self,
polygon_coords: List[List[float]],
z_range: Optional[Tuple[float, float]] = None) -> np.ndarray:
"""获取点数据"""
if self._tileset is None:
# 使用简化数据源
return await self._get_simulated_points(polygon_coords, z_range)
try:
# 使用py3dtiles API获取数据
points = []
polygon_np = np.array(polygon_coords)
# 遍历tileset中的所有tile
for tile in self._tileset.root_tile.traverse():
tile_points = self._extract_tile_points(tile)
if tile_points.size > 0:
# 筛选多边形内的点
points_in_polygon = self._filter_points_by_polygon(tile_points, polygon_np)
if points_in_polygon.size > 0:
points.append(points_in_polygon)
if points:
all_points = np.vstack(points)
if z_range:
mask = (all_points[:, 2] >= z_range[0]) & (all_points[:, 2] <= z_range[1])
all_points = all_points[mask]
return all_points
return np.array([])
except Exception as e:
logger.error(f"py3dtiles获取数据失败: {str(e)}")
return await self._get_simulated_points(polygon_coords, z_range)
def _extract_tile_points(self, tile) -> np.ndarray:
"""从tile提取点数据"""
try:
if hasattr(tile, 'content') and tile.content:
# 尝试获取点数据
if hasattr(tile.content, 'points'):
return tile.content.points.positions
elif hasattr(tile.content, 'body'):
# 处理其他格式
return np.array([])
return np.array([])
except:
return np.array([])
def _filter_points_by_polygon(self, points: np.ndarray, polygon: np.ndarray) -> np.ndarray:
"""筛选多边形内的点"""
from matplotlib.path import Path
if points.size == 0:
return points
path = Path(polygon[:, :2])
mask = path.contains_points(points[:, :2])
return points[mask]
async def _get_simulated_points(self,
polygon_coords: List[List[float]],
z_range: Optional[Tuple[float, float]] = None) -> np.ndarray:
"""获取模拟点数据"""
# 同上一个类的生成模拟数据方法
polygon_np = np.array(polygon_coords)
if polygon_np.shape[0] < 3:
return np.array([])
x_min, y_min = polygon_np.min(axis=0)
x_max, y_max = polygon_np.max(axis=0)
grid_size = 100
x = np.linspace(x_min, x_max, grid_size)
y = np.linspace(y_min, y_max, grid_size)
xx, yy = np.meshgrid(x, y)
points = np.column_stack([xx.ravel(), yy.ravel()])
np.random.seed(42)
base_elevation = 50.0
terrain_variation = 10.0
z = base_elevation + terrain_variation * np.sin(0.1 * xx).ravel() * np.cos(0.1 * yy).ravel()
points = np.column_stack([points[:, 0], points[:, 1], z])
from matplotlib.path import Path
path = Path(polygon_np[:, :2])
mask = path.contains_points(points[:, :2])
points = points[mask]
if z_range:
mask = (points[:, 2] >= z_range[0]) & (points[:, 2] <= z_range[1])
points = points[mask]
logger.info(f"生成 {points.shape[0]} 个模拟点")
return points
async def get_data_bounds(self) -> Dict[str, List[float]]:
"""获取数据边界"""
if self._tileset is None:
await self.initialize()
bounds = {
"min": [float('inf'), float('inf'), float('inf')],
"max": [-float('inf'), -float('inf'), -float('inf')]
}
if self._tileset and hasattr(self._tileset.root_tile, 'bounding_volume'):
bv = self._tileset.root_tile.bounding_volume
if hasattr(bv, 'get_corners'):
corners = bv.get_corners()
if corners is not None:
bounds["min"] = corners.min(axis=0).tolist()
bounds["max"] = corners.max(axis=0).tolist()
return bounds
def get_crs(self) -> str:
return self._crs