ai_project_v1/b3dm/tileset_data_source.py

105 lines
3.4 KiB
Python
Raw Permalink Normal View History

2026-01-14 11:37:35 +08:00
# tileset_data_source_py
import numpy as np
from typing import List, Dict, Optional, Tuple
import asyncio
from concurrent.futures import ThreadPoolExecutor
import logging
import os
2026-01-29 11:51:20 +08:00
from pathlib import Path
from b3dm.data_3dtiles_manager import MinIO3DTilesManager
import b3dm.data_3dtiles_to_dem as data_3dtiles_to_dem
2026-01-14 11:37:35 +08:00
logger = logging.getLogger(__name__)

# MinIO connection settings.
#
# Each value can be overridden through an environment variable so that the
# endpoint and credentials do not have to live in source control. The literal
# fallbacks keep the module working exactly as before when nothing is set.
# NOTE(review): the fallback credentials are committed in plain text; they
# should be rotated and removed once every deployment supplies the variables.
ENDPOINT_URL = os.environ.get("MINIO_ENDPOINT_URL", "222.212.85.86:9000")
ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "WuRenJi")
SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "WRJ@2024")
2026-01-14 11:37:35 +08:00
class TilesetDataSource:
    """Data source for 3D Tiles tilesets stored in MinIO.

    The instance remembers the tileset URL it was created with and, after a
    successful download, the local location of the downloaded ``tileset.json``.

    Attributes:
        url: The tileset URL passed to the constructor.
        tileset_path: Absolute path of the downloaded tileset file, or
            ``None`` until ``dowload_map_data`` succeeds.
        tileset_dir: Absolute directory containing ``tileset_path``, or
            ``None`` until ``dowload_map_data`` succeeds.
    """

    def __init__(self, url: str, cache_size: int = 1000):
        """Create a data source for ``url``.

        Args:
            url: Location of the tileset.
            cache_size: Accepted for interface compatibility; nothing in this
                class uses it.
        """
        self.url = url
        self.tileset_path = None
        self.tileset_dir = None
        # No method of this class assigns a parsed tileset. The attribute is
        # initialised here so get_data_bounds() can test it instead of
        # raising AttributeError.
        self._tileset = None
        self._crs = "EPSG:4979"

    @staticmethod
    def _make_manager():
        """Build a MinIO manager from the module-level connection settings."""
        return MinIO3DTilesManager(
            endpoint_url=ENDPOINT_URL,
            access_key=ACCESS_KEY,
            secret_key=SECRET_KEY,
            secure=False,
        )

    def parse_minio_url(self, url):
        """Return whatever ``MinIO3DTilesManager.parse_minio_url`` yields for ``url``."""
        return self._make_manager().parse_minio_url(url)

    def upload_file(self, bucket_name, object_name, file_path):
        """Upload a local file to MinIO and delete it locally on success.

        Args:
            bucket_name: Target bucket.
            object_name: Object key inside the bucket.
            file_path: Path of the local file to upload.

        Returns:
            The ``(flag, path)`` pair returned by the manager's
            ``upload_file``. ``file_path`` is removed only when ``flag`` is
            truthy.
        """
        flag, path = self._make_manager().upload_file(
            bucket_name, object_name, file_path
        )
        if flag:
            os.remove(file_path)
        return flag, path

    def dowload_map_data(self, url: str) -> None:
        """Download the 3D Tiles map data referenced by ``url``.

        The tileset is saved under the ``data_3dtiles`` directory. On success
        ``tileset_path`` and ``tileset_dir`` are both set to absolute paths;
        on failure they are left untouched and a warning is logged.

        Args:
            url: Tileset URL handed to the manager's ``download_full_tileset``.
        """
        success, tileset_path = self._make_manager().download_full_tileset(
            tileset_url=url,
            save_dir="data_3dtiles",
            region_filter=None,
        )
        if not success:
            logger.warning("Failed to download tileset from %s", url)
            return
        self.tileset_path = os.path.abspath(tileset_path)
        # Derive the directory from the absolute path so both attributes stay
        # valid even if the working directory changes later.
        self.tileset_dir = os.path.dirname(self.tileset_path)

    # Correctly spelled alias; the original misspelled name is kept for
    # existing callers.
    download_map_data = dowload_map_data

    async def get_points_in_polygon(self, polygon_coords, z_range=None):
        """Return the points inside a polygon as a NumPy array.

        Args:
            polygon_coords: Polygon passed straight to
                ``data_3dtiles_to_dem.parse_tileset``.
            z_range: Currently unused.

        Returns:
            ``np.array`` built from the points returned by ``parse_tileset``.
        """
        # NOTE(review): parse_tileset is called synchronously inside this
        # coroutine; if it is slow it will block the event loop - confirm.
        points = data_3dtiles_to_dem.parse_tileset(self.tileset_path, polygon_coords)
        return np.array(points)

    async def get_data_bounds(self) -> Dict[str, List[float]]:
        """Return the data bounds as ``{"min": [...], "max": [...]}``.

        When no tileset is attached, or its root tile exposes no usable
        bounding-volume corners, the bounds stay at their initial values:
        ``+inf`` for every ``min`` entry and ``-inf`` for every ``max`` entry.
        """
        bounds = {
            "min": [float("inf"), float("inf"), float("inf")],
            "max": [-float("inf"), -float("inf"), -float("inf")],
        }
        if not self._tileset:
            return bounds

        root_tile = getattr(self._tileset, "root_tile", None)
        bounding_volume = getattr(root_tile, "bounding_volume", None)
        get_corners = getattr(bounding_volume, "get_corners", None)
        if get_corners is None:
            return bounds

        corners = get_corners()
        if corners is not None:
            bounds["min"] = corners.min(axis=0).tolist()
            bounds["max"] = corners.max(axis=0).tolist()
        return bounds

    def get_crs(self) -> str:
        """Return the coordinate reference system identifier of this source."""
        return self._crs
async def main():
    """Build a TilesetDataSource for the bundled sample tileset and print a separator.

    The tileset path is ``data/3dtiles/tileset.json`` under the parent of the
    directory that contains this script.
    """
    this_file = os.path.abspath(__file__)
    parent_of_script_dir = os.path.dirname(os.path.dirname(this_file))
    sample_tileset = os.path.join(parent_of_script_dir, "data/3dtiles/tileset.json")
    # The instance is only constructed; nothing is done with it afterwards.
    _data_source = TilesetDataSource(sample_tileset)
    print("====================================================")
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs main() to completion and
    # closes the loop. Calling asyncio.get_event_loop() with no running loop
    # is deprecated and raises RuntimeError on recent Python versions.
    asyncio.run(main())