From eb6ce0de462bd54b7cf9ac62eb5da3c8fc89fbbd Mon Sep 17 00:00:00 2001 From: liyubo Date: Mon, 19 Jan 2026 09:36:22 +0800 Subject: [PATCH] =?UTF-8?q?3dtiles=E5=9C=B0=E5=9B=BE=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E9=A2=84=E5=8A=A0=E8=BD=BD=E6=8E=A5=E5=8F=A3=EF=BC=8Cterrain3d?= =?UTF-8?q?=5Fanalyzer=5Fcolor=E7=A7=BB=E5=8A=A8=E5=88=B0slope=5Faspect=5F?= =?UTF-8?q?img?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- b3dm/data_3dtiles_to_dem.py | 1130 +++++++++++------ b3dm/earthwork_api.py | 6 +- ..._analyzer_color.py => slope_aspect_img.py} | 2 +- b3dm/terrain_api.py | 45 +- b3dm/terrain_calculator.py | 36 +- 5 files changed, 788 insertions(+), 431 deletions(-) rename b3dm/{terrain3d_analyzer_color.py => slope_aspect_img.py} (99%) diff --git a/b3dm/data_3dtiles_to_dem.py b/b3dm/data_3dtiles_to_dem.py index f4b387d..838c3a7 100644 --- a/b3dm/data_3dtiles_to_dem.py +++ b/b3dm/data_3dtiles_to_dem.py @@ -1,30 +1,46 @@ -# data_3dtiles_to_dem52 +# 文件过滤+顶点过滤+点云补足 +#!/usr/bin/env python3 +""" +3D Tiles 到 DEM 转换器 +功能: +1. 多层过滤:快速过滤明显不在区域内的B3DM文件 +2. 精确过滤:顶点级别的区域过滤 +3. 点云补足:稀疏点云的智能增强 +4. DEM生成:高质量DEM输出 +""" + import os import json import numpy as np -import pandas as pd -import pyproj import struct -from osgeo import gdal, osr import uuid -from glb_with_draco import DracoGLBParser +import re +import time +from math import radians, sin, cos, sqrt, atan2 +from osgeo import gdal, osr +import pyproj +from b3dm.glb_with_draco import DracoGLBParser -# 解决GDAL中文路径/警告问题(生产必加) +# 解决GDAL中文路径/警告问题 gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES") gdal.SetConfigOption("CPL_ZIP_ENCODING", "UTF-8") gdal.PushErrorHandler('CPLQuietErrorHandler') +# ========== 区域过滤器类 ========== class RegionFilter: - """完整的区域过滤器,支持所有格式""" + """智能区域过滤器,支持多层过滤策略""" def __init__(self, region_coords=None, enable_tile_filter=True, debug=False): """ 初始化区域过滤器 - :param debug: 是否输出调试信息 + :param region_coords: 区域坐标 [(min_lon, min_lat), (max_lon, max_lat)] + :param enable_tile_filter: 是否启用瓦片级别过滤 + :param debug: 调试模式 """ self.region_coords = region_coords self.enable_tile_filter = enable_tile_filter self.debug = debug + self._transformer = None if region_coords: # 提取区域边界 @@ -46,10 +62,10 @@ class RegionFilter: self.filter_max_lat = self.max_lat + lat_expand if self.debug: - print(f"[DEBUG] 区域过滤器初始化:") - print(f" 目标区域: Lon[{self.min_lon:.6f}, {self.max_lon:.6f}], " + print(f"[区域过滤] 初始化:") + print(f" 目标区域: Lon[{self.min_lon:.6f}, {self.max_lon:.6f}], " f"Lat[{self.min_lat:.6f}, {self.max_lat:.6f}]") - print(f" 过滤区域: Lon[{self.filter_min_lon:.6f}, {self.filter_max_lon:.6f}], " + print(f" 过滤区域: Lon[{self.filter_min_lon:.6f}, {self.filter_max_lon:.6f}], " f"Lat[{self.filter_min_lat:.6f}, {self.filter_max_lat:.6f}]") else: self.min_lon = self.min_lat = -180 @@ -57,47 +73,55 @@ class RegionFilter: self.filter_min_lon = self.filter_min_lat = -180 self.filter_max_lon = self.filter_max_lat = 180 if self.debug: - print("[DEBUG] 区域过滤器: 未指定区域,将处理所有数据") + print("[区域过滤] 未指定区域,处理所有数据") - def check_tile_bounding_volume(self, bounding_volume): - """检查瓦片的包围体是否与指定区域相交""" + def apply_transform_to_box(self, center_point, transform_matrix): + """将box中心点应用transform矩阵""" + # 原始box中心点 + x, y, z = center_point + + # 转换为齐次坐标 + point_hom = np.array([x, y, z, 1.0], dtype=np.float64) + + # transform矩阵(4x4) + transform_mat = np.array(transform_matrix).reshape(4, 4).astype(np.float64).T + + # 矩阵乘法 + point_transformed_hom = np.dot(point_hom, transform_mat.T) + + # 转回3D坐标 + point_transformed = point_transformed_hom[:3] / point_transformed_hom[3] + + return point_transformed + + def _get_transformer(self): + """获取坐标转换器""" + if self._transformer is None: + ecef = pyproj.Proj(proj='geocent', ellps='WGS84', datum='WGS84') + lla = pyproj.Proj(proj='latlong', ellps='WGS84', datum='WGS84') + self._transformer = pyproj.Transformer.from_proj(ecef, lla, always_xy=True) + return self._transformer + + def check_tile_bounding_volume(self, current_transform, bounding_volume): + """检查瓦片包围体是否在区域内""" if not self.region_coords or not self.enable_tile_filter: return True try: if 'region' in bounding_volume: return self._check_region(bounding_volume['region']) - elif 'box' in bounding_volume: - box = bounding_volume['box'] - if self.debug: - print(f"[DEBUG] 检查box包围体,长度={len(box)}") - - if len(box) == 12: - result = self._check_box_12(box) - elif len(box) == 15: - result = self._check_box_15(box) - else: - if self.debug: - print(f"[DEBUG] 异常box长度 {len(box)},默认通过") - return True - - if self.debug and not result: - print(f"[DEBUG] Box被过滤") - return result - + return self._check_box(current_transform, bounding_volume['box']) elif 'sphere' in bounding_volume: return self._check_sphere(bounding_volume['sphere']) - return True - except Exception as e: if self.debug: - print(f"[DEBUG] 包围体检查出错: {e}") + print(f"[包围体检查] 错误: {e}") return True def _check_region(self, region): - """检查region格式 [west, south, east, north, minHeight, maxHeight]""" + """检查region格式包围体""" if len(region) != 6: return True @@ -107,64 +131,63 @@ class RegionFilter: if (east < self.filter_min_lon or west > self.filter_max_lon or north < self.filter_min_lat or south > self.filter_max_lat): if self.debug: - print(f"[DEBUG] Region过滤: [{west:.3f},{south:.3f},{east:.3f},{north:.3f}]") + print(f"[region过滤] 区域外: [{west:.3f},{south:.3f},{east:.3f},{north:.3f}]") return False return True - def _check_box_12(self, box): - """检查12值box格式""" - # 提取参数 + def _check_box(self, current_transform, box): + """检查box包围体,支持12值和15值格式""" + if len(box) == 12: + return self._check_box_12(current_transform, box) + elif len(box) == 15: + return self._check_box_15(current_transform, box) + else: + if self.debug: + print(f"[box检查] 异常长度 {len(box)},默认通过") + return True + + def _check_box_12(self, current_transform, box): + """检查12值box格式 [cx, cy, cz, halfX, 0, 0, 0, halfY, 0, 0, 0, halfZ]""" cx, cy, cz = box[0], box[1], box[2] - halfX = box[3] - halfY = box[7] - halfZ = box[11] - + halfX, halfY, halfZ = box[3], box[7], box[11] + if self.debug: - print(f"[DEBUG] 12值box: center=({cx:.1f},{cy:.1f},{cz:.1f}), " - f"halfs=({halfX},{halfY},{halfZ})") + print(f"[box12检查] center=({cx:.1f},{cy:.1f},{cz:.1f}), halfs=({halfX},{halfY},{halfZ})") + + # 应用转换矩阵 + transform_center = self.apply_transform_to_box([cx, cy, cz], current_transform) + cx, cy, cz = transform_center[0], transform_center[1], transform_center[2] try: - # 获取转换器 transformer = self._get_transformer() + center_lon, center_lat, _ = transformer.transform(cx, cy, cz, radians=False) - # 转换中心点 - center_lon, center_lat, _ = transformer.transform( - cx, cy, cz, radians=False - ) - - # 计算最大偏移(简化方法,避免计算所有角点) + # 计算最大偏移 max_half = max(halfX, halfY, halfZ) - - # 转换为经纬度偏移 earth_radius = 6378137.0 lon_offset = np.degrees(max_half / (earth_radius * np.cos(np.radians(center_lat)))) lat_offset = np.degrees(max_half / earth_radius) - # 计算box范围 + # 检查相交 box_min_lon = center_lon - lon_offset box_max_lon = center_lon + lon_offset box_min_lat = center_lat - lat_offset box_max_lat = center_lat + lat_offset - if self.debug: - print(f"[DEBUG] 中心: ({center_lon:.6f}, {center_lat:.6f})") - print(f"[DEBUG] 范围: Lon[{box_min_lon:.6f}, {box_max_lon:.6f}], " - f"Lat[{box_min_lat:.6f}, {box_max_lat:.6f}]") - - # 检查相交 if (box_max_lon < self.filter_min_lon or box_min_lon > self.filter_max_lon or box_max_lat < self.filter_min_lat or box_min_lat > self.filter_max_lat): + if self.debug: + print(f"[box12过滤] 区域外: 中心({center_lon:.3f},{center_lat:.3f})") return False return True - except Exception as e: if self.debug: - print(f"[DEBUG] Box12检查失败: {e}") - return True # 失败时默认通过 + print(f"[box12检查] 错误: {e}") + return True - def _check_box_15(self, box): + def _check_box_15(self, current_transform, box): """检查标准15值box格式""" if len(box) < 15: return True @@ -172,78 +195,132 @@ class RegionFilter: cx, cy, cz = box[0], box[1], box[2] halfX, halfY, halfZ = box[12], box[13], box[14] - # 简化处理:只检查中心点 - transformer = self._get_transformer() - center_lon, center_lat, _ = transformer.transform(cx, cy, cz, radians=False) - - # 计算最大偏移 - max_half = max(halfX, halfY, halfZ) - earth_radius = 6378137.0 - lon_offset = np.degrees(max_half / (earth_radius * np.cos(np.radians(center_lat)))) - lat_offset = np.degrees(max_half / earth_radius) - - box_min_lon = center_lon - lon_offset - box_max_lon = center_lon + lon_offset - box_min_lat = center_lat - lat_offset - box_max_lat = center_lat + lat_offset - - if (box_max_lon < self.filter_min_lon or box_min_lon > self.filter_max_lon or - box_max_lat < self.filter_min_lat or box_min_lat > self.filter_max_lat): - return False - - return True + try: + transformer = self._get_transformer() + center_lon, center_lat, _ = transformer.transform(cx, cy, cz, radians=False) + + max_half = max(halfX, halfY, halfZ) + earth_radius = 6378137.0 + lon_offset = np.degrees(max_half / (earth_radius * np.cos(np.radians(center_lat)))) + lat_offset = np.degrees(max_half / earth_radius) + + box_min_lon = center_lon - lon_offset + box_max_lon = center_lon + lon_offset + box_min_lat = center_lat - lat_offset + box_max_lat = center_lat + lat_offset + + if (box_max_lon < self.filter_min_lon or box_min_lon > self.filter_max_lon or + box_max_lat < self.filter_min_lat or box_min_lat > self.filter_max_lat): + if self.debug: + print(f"[box15过滤] 区域外: 中心({center_lon:.3f},{center_lat:.3f})") + return False + + return True + except Exception as e: + if self.debug: + print(f"[box15检查] 错误: {e}") + return True def _check_sphere(self, sphere): - """检查sphere格式 [centerX, centerY, centerZ, radius]""" + """检查sphere包围体""" if len(sphere) < 4: return True cx, cy, cz, radius = sphere[0], sphere[1], sphere[2], sphere[3] - transformer = self._get_transformer() - center_lon, center_lat, _ = transformer.transform(cx, cy, cz, radians=False) + try: + transformer = self._get_transformer() + center_lon, center_lat, _ = transformer.transform(cx, cy, cz, radians=False) + + earth_radius = 6378137.0 + lon_offset = np.degrees(radius / (earth_radius * np.cos(np.radians(center_lat)))) + lat_offset = np.degrees(radius / earth_radius) + + sphere_min_lon = center_lon - lon_offset + sphere_max_lon = center_lon + lon_offset + sphere_min_lat = center_lat - lat_offset + sphere_max_lat = center_lat + lat_offset + + if (sphere_max_lon < self.filter_min_lon or sphere_min_lon > self.filter_max_lon or + sphere_max_lat < self.filter_min_lat or sphere_min_lat > self.filter_max_lat): + if self.debug: + print(f"[sphere过滤] 区域外: 中心({center_lon:.3f},{center_lat:.3f})") + return False + + return True + except Exception as e: + if self.debug: + print(f"[sphere检查] 错误: {e}") + return True + + def check_b3dm_file_quick(self, b3dm_path, current_transform, bounding_volume=None): + """快速检查B3DM文件是否可能在区域内""" + if not self.region_coords or not self.enable_tile_filter: + return True - # 计算半径对应的经纬度偏移 - earth_radius = 6378137.0 - lon_offset = np.degrees(radius / (earth_radius * np.cos(np.radians(center_lat)))) - lat_offset = np.degrees(radius / earth_radius) + filename = os.path.basename(b3dm_path) - sphere_min_lon = center_lon - lon_offset - sphere_max_lon = center_lon + lon_offset - sphere_min_lat = center_lat - lat_offset - sphere_max_lat = center_lat + lat_offset + if self.debug: + print(f"[快速检查] B3DM文件: {filename}") - if (sphere_max_lon < self.filter_min_lon or sphere_min_lon > self.filter_max_lon or - sphere_max_lat < self.filter_min_lat or sphere_min_lat > self.filter_max_lat): - return False + # 方法1: 使用包围体信息 + if bounding_volume: + return self.check_tile_bounding_volume(current_transform, bounding_volume) + # # 方法2: 检查文件名中的坐标信息 + # coord_patterns = [ + # r'tile[_\-](\d+\.?\d*)[_\-](\d+\.?\d*)', + # r'(\d+\.?\d*)[_\-](\d+\.?\d*)\.b3dm$', + # r'(\d{3})[_\-](\d{2})\.b3dm$', + # ] + + # for pattern in coord_patterns: + # match = re.search(pattern, filename, re.IGNORECASE) + # if match: + # try: + # lon = float(match.group(1)) + # lat = float(match.group(2)) + + # # 调整坐标范围 + # if lon > 180: + # lon = lon - 360 + # if lat > 90: + # lat = 90 - (lat - 90) + + # if self.debug: + # print(f"[快速检查] 提取坐标: ({lon:.6f}, {lat:.6f})") + + # if (lon < self.filter_min_lon or lon > self.filter_max_lon or + # lat < self.filter_min_lat or lat > self.filter_max_lat): + # if self.debug: + # print(f"[快速检查] 坐标({lon:.2f},{lat:.2f})在区域外") + # return False + + # return True + # except: + # continue + + # 方法3: 检查文件大小 + try: + file_size = os.path.getsize(b3dm_path) + if file_size < 1024: # 小于1KB跳过 + if self.debug: + print(f"[快速检查] 文件过小({file_size} bytes),跳过") + return False + except: + pass + + # 默认处理 return True - def _get_transformer(self): - """获取或创建坐标转换器""" - if not hasattr(self, '_transformer'): - ecef = pyproj.Proj(proj='geocent', ellps='WGS84', datum='WGS84') - lla = pyproj.Proj(proj='latlong', ellps='WGS84', datum='WGS84') - self._transformer = pyproj.Transformer.from_proj(ecef, lla) - return self._transformer - def filter_points(self, points): - """ - 过滤点集,只保留区域内的点 - :param points: 点列表或numpy数组,每行 [lon, lat, height] - :return: 过滤后的点列表 - """ + """精确过滤点集,只保留区域内的点""" if not self.region_coords or len(points) == 0: return points - # 转换为numpy数组处理 - if isinstance(points, list): - points_array = np.array(points) - else: - points_array = points - - if len(points_array) == 0 or points_array.shape[1] < 2: - return points_array.tolist() if isinstance(points, list) else points_array + points_array = np.array(points) + if len(points_array) == 0: + return [] # 检查每个点是否在区域内 in_region_mask = ( @@ -255,327 +332,461 @@ class RegionFilter: filtered_points = points_array[in_region_mask] - print(f"区域过滤: {len(points_array)} 个点 -> {len(filtered_points)} 个点 " - f"(过滤掉 {len(points_array) - len(filtered_points)} 个点)") + if len(points_array) > 0 and self.debug: + filtered_percent = (len(filtered_points) / len(points_array)) * 100 + print(f"[点过滤] {len(points_array)} → {len(filtered_points)} 点 ({filtered_percent:.1f}%保留)") - return filtered_points.tolist() if isinstance(points, list) else filtered_points + return filtered_points.tolist() -# ========== 核心工具函数:矩阵变换 ========== +# ========== 点云增强器类 ========== +class PointCloudEnhancer: + """点云智能增强器,用于稀疏点云的补足""" + + def __init__(self, strategy='balanced'): + """ + 初始化点云增强器 + :param strategy: 增强策略 'minimal'|'balanced'|'aggressive' + """ + self.strategy = strategy + + def enhance_points(self, base_points, original_vertices=None, + target_density=0.5, pixel_size=0.0001): + """ + 增强点云密度 + :param base_points: 基础点云 + :param original_vertices: 原始顶点(ECEF坐标) + :param target_density: 目标点密度(点/像素) + :param pixel_size: DEM像素大小 + :return: 增强后的点云 + """ + if len(base_points) < 100: # 点数太少,需要增强 + print(f"[点云增强] 点数过少({len(base_points)}),启动增强") + + if original_vertices is not None and len(original_vertices) > 0: + enhanced = self._generate_enhanced_points( + original_vertices, + num_points=1000 - len(base_points) # 补足到1000点 + ) + + # 转换到经纬度 + if enhanced.size > 0: + ecef = pyproj.Proj(proj='geocent', ellps='WGS84', datum='WGS84') + lla = pyproj.Proj(proj='latlong', ellps='WGS84', datum='WGS84') + transformer = pyproj.Transformer.from_proj(ecef, lla, always_xy=True) + + lons, lats, heights = transformer.transform( + enhanced[:, 0], enhanced[:, 1], enhanced[:, 2], radians=False + ) + + enhanced_points = np.column_stack([lons, lats, heights]) + base_array = np.array(base_points) + combined = np.vstack([base_array, enhanced_points]) + + print(f"[点云增强] 增强后点数: {len(combined)}") + return combined.tolist() + + return base_points + + def _generate_enhanced_points(self, vertices, num_points): + """生成增强点""" + if len(vertices) == 0: + return np.array([]) + + min_coords = np.min(vertices, axis=0) + max_coords = np.max(vertices, axis=0) + ranges = max_coords - min_coords + + if self.strategy == 'minimal': + # 最小增强:随机扰动 + noise = np.random.randn(num_points, 3) * 0.1 + indices = np.random.randint(0, len(vertices), num_points) + enhanced = vertices[indices] + noise + + elif self.strategy == 'balanced': + # 平衡增强:均匀采样 + 扰动 + uniform_num = int(num_points * 0.7) + noise_num = num_points - uniform_num + + # 均匀采样 + uniform_points = min_coords + np.random.rand(uniform_num, 3) * ranges + + # 顶点扰动 + indices = np.random.randint(0, len(vertices), noise_num) + noise = np.random.randn(noise_num, 3) * 0.05 + noise_points = vertices[indices] + noise + + enhanced = np.vstack([uniform_points, noise_points]) + + else: # aggressive + # 积极增强:多种方法组合 + points_per_dim = int(np.ceil(num_points ** (1/3))) + x = np.linspace(min_coords[0], max_coords[0], points_per_dim) + y = np.linspace(min_coords[1], max_coords[1], points_per_dim) + z = np.linspace(min_coords[2], max_coords[2], points_per_dim) + + xx, yy, zz = np.meshgrid(x, y, z) + grid_points = np.column_stack([xx.ravel(), yy.ravel(), zz.ravel()]) + + if len(grid_points) > num_points: + indices = np.random.choice(len(grid_points), num_points, replace=False) + enhanced = grid_points[indices] + else: + enhanced = grid_points + + return enhanced[:num_points] + +# ========== 核心工具函数 ========== def apply_transform_matrix(vertices, transform_matrix): - """ - 将模型的局部相对顶点坐标,通过transform矩阵转换为绝对ECEF坐标 - :param vertices: 原始局部顶点 (n,3) numpy数组 - :param transform_matrix: 瓦片的transform矩阵 一维列表/数组,长度16(4x4矩阵) - :return: 绝对ECEF坐标 (n,3) numpy数组 - """ + """应用变换矩阵到顶点""" if transform_matrix is None or len(transform_matrix) != 16: return vertices - # reshape为4x4,然后转置(因为glTF是列主序) + # 转换矩阵 mat = np.array(transform_matrix).reshape(4, 4).astype(np.float64).T - # 顶点齐次坐标化 (n,3) -> (n,4) 最后一列补1 + # 顶点齐次坐标化 ones = np.ones((vertices.shape[0], 1), dtype=np.float64) vertices_hom = np.hstack([vertices, ones]) - # 矩阵乘法:顶点坐标 * 变换矩阵 = 绝对坐标 + # 矩阵乘法 vertices_ecef_hom = np.dot(vertices_hom, mat.T) - - # 还原为三维坐标 (n,4) -> (n,3) - vertices_ecef = vertices_ecef_hom[:, :3] + vertices_ecef = vertices_ecef_hom[:, :3] / vertices_ecef_hom[:, 3:4] return vertices_ecef -def parse_b3dm_to_points(b3dm_path, region_filter=None, transform_matrix=None): +def parse_b3dm_to_points(b3dm_path, region_filter=None, transform_matrix=None, + enhancer=None, min_points_threshold=500): """ - 解析B3DM文件,提取顶点的经纬度+高程 - 【关键修改】区域过滤移到读取顶点后进行 + 解析B3DM文件,提取顶点 + :param enhancer: 点云增强器 + :param min_points_threshold: 最小点阈值 """ - # 获取脚本所在目录 + # 创建临时目录 script_dir = os.path.dirname(os.path.abspath(__file__)) temp_dir = os.path.join(script_dir, "temp_glb") - - # 创建临时目录(如果不存在) os.makedirs(temp_dir, exist_ok=True) - # 1. 读取B3DM二进制文件 - with open(b3dm_path, "rb") as f: - b3dm_data = f.read() - - # 跳过头部 - header = struct.unpack('<4sIIIIII', b3dm_data[:28]) - ft_json_len, ft_bin_len, bt_json_len, bt_bin_len = header[3:7] - - offset = 28 - offset += ft_json_len # 跳过Feature Table JSON - offset += ft_bin_len # 跳过Feature Table Binary - offset += bt_json_len # 跳过Batch Table JSON - offset += bt_bin_len # 跳过Batch Table Binary - - # 提取glb数据 - glb_data = b3dm_data[offset:] - if len(glb_data) < 12: - return [] - - # 2. 使用脚本目录下的临时文件 - temp_file_path = None + # 读取B3DM文件 try: - # 生成唯一临时文件名 - temp_filename = f"temp_{uuid.uuid4().hex[:8]}.glb" - temp_file_path = os.path.join(temp_dir, temp_filename) + with open(b3dm_path, "rb") as f: + b3dm_data = f.read() + except Exception as e: + print(f"[B3DM解析] 读取失败 {b3dm_path}: {e}") + return [] + + # 解析头部 + try: + header = struct.unpack('<4sIIIIII', b3dm_data[:28]) + ft_json_len, ft_bin_len, bt_json_len, bt_bin_len = header[3:7] - # 将GLB数据写入临时文件 + offset = 28 + ft_json_len + ft_bin_len + bt_json_len + bt_bin_len + glb_data = b3dm_data[offset:] + + if len(glb_data) < 12: + return [] + except Exception as e: + print(f"[B3DM解析] 头部解析失败 {b3dm_path}: {e}") + return [] + + # 写入临时文件 + temp_file_path = os.path.join(temp_dir, f"temp_{uuid.uuid4().hex[:8]}.glb") + try: with open(temp_file_path, "wb") as tmp_glb: tmp_glb.write(glb_data) - # 使用DracoGLBParser解析 + # 解析GLB parser = DracoGLBParser(temp_file_path) - # 解析 GLB 结构 parser.parse_glb_structure() - # 分析结构 parser.analyze_structure() - # 解码 Draco 网格 mesh = parser.decode_draco_meshes() - except Exception as e: - print(f"读取GLB数据失败 {b3dm_path}: {e}") - return [] - finally: - # 清理临时文件 - if temp_file_path and os.path.exists(temp_file_path): - try: - os.unlink(temp_file_path) - except Exception as e: - pass - - if mesh is None: - print(f"无法加载模型: {b3dm_path}") - return [] - - # 获取顶点数据 - vertices = parser.get_all_vertices() - - if vertices.size == 0 or len(vertices.shape) < 2 or vertices.shape[1] != 3: - print(f"顶点数据格式无效: {b3dm_path}") - return [] - - # 3. 应用transform矩阵,局部坐标 → 绝对ECEF坐标 - vertices = apply_transform_matrix(vertices, transform_matrix) - - # 4. ECEF坐标转WGS84经纬度+高程 - try: + if mesh is None: + print(f"[B3DM解析] 无法加载模型: {b3dm_path}") + return [] + + # 获取顶点 + vertices = parser.get_all_vertices() + if vertices.size == 0 or len(vertices.shape) < 2 or vertices.shape[1] != 3: + print(f"[B3DM解析] 顶点数据无效: {b3dm_path}") + return [] + + # 保存原始顶点(用于增强) + original_vertices = vertices.copy() + + # 应用变换矩阵 + vertices = apply_transform_matrix(vertices, transform_matrix) + + # ECEF转WGS84 ecef = pyproj.Proj(proj='geocent', ellps='WGS84', datum='WGS84') lla = pyproj.Proj(proj='latlong', ellps='WGS84', datum='WGS84') transformer = pyproj.Transformer.from_proj(ecef, lla, always_xy=True) - + lons, lats, heights = transformer.transform( vertices[:, 0], vertices[:, 1], vertices[:, 2], radians=False ) - # 组合成点集 + # 组合点云 points = np.column_stack([lons, lats, heights]) - # 5. 基本数据清洗(过滤异常值) - # 过滤nan/inf + # 基本数据清洗 valid_mask = np.isfinite(points).all(axis=1) points = points[valid_mask] if len(points) == 0: - print(f"B3DM文件 {os.path.basename(b3dm_path)} 转换后无有效点") return [] - # 过滤经纬度超限值 - geo_mask = ( - (points[:, 0] >= -180) & (points[:, 0] <= 180) & - (points[:, 1] >= -90) & (points[:, 1] <= 90) - ) - points = points[geo_mask] - - if len(points) == 0: - print(f"B3DM文件 {os.path.basename(b3dm_path)} 经纬度超限") - return [] - - print(f"从 {os.path.basename(b3dm_path)} 提取到 {len(points)} 个原始顶点") - - # 6. 【关键修改】应用区域过滤器(如果提供) - # 在读取顶点后进行区域过滤,而不是在瓦片级别过滤 + # 区域过滤 if region_filter: points = region_filter.filter_points(points) if len(points) == 0: - print(f"B3DM文件 {os.path.basename(b3dm_path)} 的所有点都在区域外,跳过") + print(f"[B3DM解析] 所有点都在区域外: {os.path.basename(b3dm_path)}") return [] - print(f"最终保留 {len(points)} 个在区域内的顶点") - return points.tolist() + print(f"[B3DM解析] {os.path.basename(b3dm_path)}: {len(points)} 点") + + # 点云增强(如果需要) + if enhancer and len(points) < min_points_threshold: + points = enhancer.enhance_points( + points.tolist(), + original_vertices, + target_density=0.3, + pixel_size=0.0001 + ) + + return points.tolist() if isinstance(points, np.ndarray) else points except Exception as e: - print(f"坐标转换失败 {b3dm_path}: {e}") + print(f"[B3DM解析] 解析失败 {b3dm_path}: {e}") return [] + finally: + # 清理临时文件 + if os.path.exists(temp_file_path): + try: + os.unlink(temp_file_path) + except: + pass -def traverse_nested_tiles(tile_obj, base_dir, b3dm_paths, tile_transforms, region_filter=None, parent_transform=None): +# ========== 遍历函数 ========== +def traverse_nested_tiles(tile_obj, base_dir, b3dm_paths, tile_transforms, + tile_bounding_volumes, region_filter=None, + parent_transform=None, stats=None, depth=0): """ - 深度递归遍历瓦片,自动识别「子JSON」和「B3DM」 - 【修改】移除瓦片级别的区域过滤,完全依赖顶点级别的过滤 + 递归遍历tileset结构 """ - # 1. 计算当前瓦片的最终变换矩阵 + if stats is None: + stats = { + 'tiles_checked': 0, + 'b3dm_collected': 0, + 'b3dm_filtered': 0, + 'json_files': 0 + } + + stats['tiles_checked'] += 1 + + # 计算变换矩阵 current_transform = parent_transform if "transform" in tile_obj: tile_mat = tile_obj["transform"] if current_transform is None: current_transform = tile_mat else: - # 合并变换矩阵 mat1 = np.array(current_transform).reshape(4, 4) mat2 = np.array(tile_mat).reshape(4, 4) combined_mat = np.dot(mat1, mat2).flatten().tolist() current_transform = combined_mat - - # 2. 【修改】不再检查瓦片包围体,直接处理内容 - # 这样可以避免因粗略的包围体判断而漏掉部分在区域内的顶点 - # 3. 处理当前瓦片的内容 + # 处理当前瓦片内容 if "content" in tile_obj and "uri" in tile_obj["content"]: tile_uri = tile_obj["content"]["uri"] tile_abs_path = os.path.join(base_dir, tile_uri) if tile_uri.lower().endswith(".json"): - # 情况1:uri是子JSON文件 → 递归解析这个子JSON + # 子JSON文件 + stats['json_files'] += 1 if os.path.exists(tile_abs_path): - print(f"解析嵌套子JSON文件: {tile_abs_path}") - with open(tile_abs_path, "r", encoding="utf-8") as f: - sub_tileset = json.load(f) - sub_base_dir = os.path.dirname(tile_abs_path) - traverse_nested_tiles(sub_tileset["root"], sub_base_dir, b3dm_paths, tile_transforms, region_filter, current_transform) + try: + with open(tile_abs_path, "r", encoding="utf-8") as f: + sub_tileset = json.load(f) + sub_base_dir = os.path.dirname(tile_abs_path) + + traverse_nested_tiles( + sub_tileset["root"], sub_base_dir, b3dm_paths, tile_transforms, + tile_bounding_volumes, region_filter, current_transform, + stats, depth + 1 + ) + except Exception as e: + print(f"{' '*depth}[遍历] JSON解析失败: {tile_abs_path}: {e}") else: - print(f"嵌套子JSON文件不存在,跳过: {tile_abs_path}") + print(f"{' '*depth}[遍历] JSON文件不存在: {tile_abs_path}") elif tile_uri.lower().endswith(".b3dm"): - # 情况2:uri是B3DM文件 → 收集路径+对应transform矩阵 + # B3DM文件 if os.path.exists(tile_abs_path): - b3dm_paths.append(tile_abs_path) - tile_transforms.append(current_transform) - print(f"收集到B3DM文件: {tile_abs_path}") + bounding_volume = tile_obj.get("boundingVolume", {}) + + # 快速检查 + should_process = True + if region_filter: + should_process = region_filter.check_b3dm_file_quick( + tile_abs_path, current_transform, bounding_volume + ) + + if should_process: + b3dm_paths.append(tile_abs_path) + tile_transforms.append(current_transform) + tile_bounding_volumes.append(bounding_volume) + stats['b3dm_collected'] += 1 + + if region_filter and region_filter.debug: + print(f"{' '*depth}[遍历] 收集: {os.path.basename(tile_abs_path)}") + else: + stats['b3dm_filtered'] += 1 + if region_filter and region_filter.debug: + print(f"{' '*depth}[遍历] 过滤: {os.path.basename(tile_abs_path)}") else: - print(f"B3DM文件不存在,跳过: {tile_abs_path}") - - # 4. 递归遍历当前tile的子节点children + print(f"{' '*depth}[遍历] B3DM文件不存在: {tile_abs_path}") + + # 递归处理子节点 if "children" in tile_obj and len(tile_obj["children"]) > 0: for child_tile in tile_obj["children"]: - traverse_nested_tiles(child_tile, base_dir, b3dm_paths, tile_transforms, region_filter, current_transform) + traverse_nested_tiles( + child_tile, base_dir, b3dm_paths, tile_transforms, + tile_bounding_volumes, region_filter, current_transform, + stats, depth + 1 + ) + + return stats -def parse_tileset(tileset_path, region_coords=None): - """重构主解析函数,支持区域过滤+矩阵变换""" +# ========== 主解析函数 ========== +def parse_tileset(tileset_path, region_coords=None, enable_enhancement=True, debug=False): + """ + 主解析函数 + :param enable_enhancement: 是否启用点云增强 + :param debug: 调试模式 + :return: 点云数据 + """ + start_time = time.time() + if not os.path.exists(tileset_path): - raise FileNotFoundError(f"根tileset.json文件不存在: {tileset_path}") + raise FileNotFoundError(f"tileset.json不存在: {tileset_path}") - # 初始化区域过滤器(将在顶点级别使用) - region_filter = RegionFilter(region_coords) + print("=" * 60) + print("开始解析3D Tiles数据") + print("=" * 60) - # 读取根tileset.json + # 初始化过滤器 + region_filter = RegionFilter(region_coords, enable_tile_filter=True, debug=debug) + + # 初始化增强器 + enhancer = PointCloudEnhancer(strategy='balanced') if enable_enhancement else None + + # 读取根tileset with open(tileset_path, "r", encoding="utf-8") as f: tileset_json = json.load(f) root_dir = os.path.dirname(tileset_path) b3dm_paths = [] tile_transforms = [] - - print(f"开始遍历tileset结构...") - # 调用深度递归函数 - traverse_nested_tiles(tileset_json["root"], root_dir, b3dm_paths, tile_transforms, region_filter, None) - - print(f"\n遍历完成,共发现 {len(b3dm_paths)} 个B3DM文件:") - for i, b3dm_path in enumerate(b3dm_paths): - print(f" {i+1}. {os.path.basename(b3dm_path)}") - - # 批量解析所有B3DM文件,合并点云 + tile_bounding_volumes = [] + + print("[主解析] 开始遍历tileset结构...") + + # 遍历收集B3DM文件 + stats = traverse_nested_tiles( + tileset_json["root"], root_dir, b3dm_paths, tile_transforms, + tile_bounding_volumes, region_filter, None, + {'tiles_checked': 0, 'b3dm_collected': 0, + 'b3dm_filtered': 0, 'json_files': 0} + ) + + print(f"\n[主解析] 遍历完成:") + print(f" 检查瓦片: {stats['tiles_checked']}") + print(f" JSON入口: 1") + print(f" JSON文件: {stats['json_files']}") + print(f" 收集B3DM: {stats['b3dm_collected']}") + print(f" 过滤B3DM: {stats['b3dm_filtered']}") + + # 如果没有收集到文件,尝试放宽过滤 + if len(b3dm_paths) == 0 and region_coords: + print("\n[主解析] 警告: 未收集到B3DM文件,尝试禁用快速过滤...") + region_filter.enable_tile_filter = False + b3dm_paths.clear() + tile_transforms.clear() + tile_bounding_volumes.clear() + + stats = traverse_nested_tiles( + tileset_json["root"], root_dir, b3dm_paths, tile_transforms, + tile_bounding_volumes, region_filter, None, + {'tiles_checked': 0, 'b3dm_collected': 0, + 'b3dm_filtered': 0, 'json_files': 0} + ) + + print(f"[主解析] 重新扫描收集到 {len(b3dm_paths)} 个B3DM文件") + + # 解析B3DM文件 all_points = [] if len(b3dm_paths) == 0: - print("未提取到任何有效的B3DM文件!") + print("[主解析] 错误: 未找到有效的B3DM文件") return all_points - print(f"\n===== 开始解析B3DM文件 =====") + print(f"\n[主解析] 开始解析 {len(b3dm_paths)} 个B3DM文件...") + + total_points = 0 for i, (b3dm_path, transform_mat) in enumerate(zip(b3dm_paths, tile_transforms), 1): - print(f"解析文件 {i}/{len(b3dm_paths)}: {os.path.basename(b3dm_path)}") - points = parse_b3dm_to_points(b3dm_path, region_filter, transform_mat) + filename = os.path.basename(b3dm_path) + print(f" [{i}/{len(b3dm_paths)}] 解析: {filename}") + + points = parse_b3dm_to_points( + b3dm_path, + region_filter, + transform_mat, + enhancer, + min_points_threshold=300 + ) + if points: all_points.extend(points) - print(f" 提取到 {len(points)} 个点") + total_points += len(points) + print(f" 提取: {len(points)} 点") else: - print(f" 未提取到有效点") - - # 点云去重+优化 + print(f" 无有效点") + + # 点云处理 if all_points: - all_points = np.array(all_points) - original_count = len(all_points) - all_points = np.unique(all_points.round(decimals=6), axis=0) - print(f"\n最终提取点云数量: {len(all_points)} 个 (已去重, 去除了 {original_count - len(all_points)} 个重复点)") + all_points_array = np.array(all_points) + original_count = len(all_points_array) - # ========== 新增:输出整个地图文件的经纬度高程范围 ========== - print("\n" + "=" * 60) - print("地图文件总体范围统计:") - print("-" * 60) + # 去重 + all_points_array = np.unique(all_points_array.round(decimals=6), axis=0) - # 计算经纬度范围 - min_lon = np.min(all_points[:, 0]) - max_lon = np.max(all_points[:, 0]) - min_lat = np.min(all_points[:, 1]) - max_lat = np.max(all_points[:, 1]) - min_height = np.min(all_points[:, 2]) - max_height = np.max(all_points[:, 2]) - avg_height = np.mean(all_points[:, 2]) - std_height = np.std(all_points[:, 2]) + # 最终区域过滤(确保精确) + if region_filter: + filtered_points = region_filter.filter_points(all_points_array) + all_points_array = np.array(filtered_points) - # 计算中心点 - center_lon = (min_lon + max_lon) / 2 - center_lat = (min_lat + max_lat) / 2 - center_height = (min_height + max_height) / 2 + elapsed_time = time.time() - start_time - print(f"经度范围: {min_lon:.6f}° ~ {max_lon:.6f}° (跨度: {max_lon - min_lon:.6f}°)") - print(f"纬度范围: {min_lat:.6f}° ~ {max_lat:.6f}° (跨度: {max_lat - min_lat:.6f}°)") - print(f"高程范围: {min_height:.2f}m ~ {max_height:.2f}m (总高差: {max_height - min_height:.2f}m)") - print(f"平均高程: {avg_height:.2f}m (±{std_height:.2f}m)") - print(f"\n中心点坐标:") - print(f" 位置: ({center_lon:.6f}°, {center_lat:.6f}°)") - print(f" 高程: {center_height:.2f}m") + print(f"\n[主解析] 解析完成:") + print(f" 原始点数: {total_points}") + print(f" 去重后数: {len(all_points_array)}") + print(f" 处理时间: {elapsed_time:.1f}秒") + print(f" 点云密度: {len(all_points_array)/max(1, total_points)*100:.1f}%保留") - # 输出边界坐标(用于复制使用) - print(f"\n边界坐标:") - print(f" 西北角: ({min_lon:.6f}, {max_lat:.6f})") - print(f" 东北角: ({max_lon:.6f}, {max_lat:.6f})") - print(f" 西南角: ({min_lon:.6f}, {min_lat:.6f})") - print(f" 东南角: ({max_lon:.6f}, {min_lat:.6f})") - - # 如果有区域过滤,显示过滤效果 - if region_coords: - region_min_lon = min(coord[0] for coord in region_coords) - region_max_lon = max(coord[0] for coord in region_coords) - region_min_lat = min(coord[1] for coord in region_coords) - region_max_lat = max(coord[1] for coord in region_coords) - - print(f"\n区域过滤效果:") - print(f" 原始地图范围: Lon[{region_min_lon:.6f}, {region_max_lon:.6f}], Lat[{region_min_lat:.6f}, {region_max_lat:.6f}]") - print(f" 提取数据范围: Lon[{min_lon:.6f}, {max_lon:.6f}], Lat[{min_lat:.6f}, {max_lat:.6f}]") - - # 计算覆盖率 - region_width = region_max_lon - region_min_lon - region_height = region_max_lat - region_min_lat - extracted_width = max_lon - min_lon - extracted_height = max_lat - min_lat - - width_coverage = extracted_width / region_width * 100 if region_width > 0 else 0 - height_coverage = extracted_height / region_height * 100 if region_height > 0 else 0 - - print(f" 经度方向覆盖率: {width_coverage:.1f}%") - print(f" 纬度方向覆盖率: {height_coverage:.1f}%") - - print("=" * 60) - - return all_points + return all_points_array.tolist() + + print("[主解析] 错误: 未提取到任何点云数据") + return [] -def points_to_dem(points, output_dem_path, pixel_size=0.0001): - """将离散点云插值为DEM(GeoTIFF格式)- 使用Scipy插值优化版本""" +# ========== DEM生成函数 ========== +def points_to_dem(points, output_dem_path, pixel_size=None, quality='medium'): + """ + 将点云转换为DEM + :param quality: 质量等级 'low'|'medium'|'high' + """ if len(points) == 0: - raise ValueError("无有效点云数据,无法生成DEM") + raise ValueError("无点云数据,无法生成DEM") + + start_time = time.time() # 转换为numpy数组 points_array = np.array(points) @@ -583,74 +794,94 @@ def points_to_dem(points, output_dem_path, pixel_size=0.0001): lats = points_array[:, 1] heights = points_array[:, 2] + # 计算范围 min_lon, max_lon = lons.min(), lons.max() min_lat, max_lat = lats.min(), lats.max() - print(f"DEM范围: Lon[{min_lon:.6f}, {max_lon:.6f}], Lat[{min_lat:.6f}, {max_lat:.6f}]") - print(f"点云数量: {len(points)} 个") - print(f"高程范围: {heights.min():.2f} ~ {heights.max():.2f} 米") + print(f"[DEM生成] 点云范围:") + print(f" 经度: {min_lon:.6f}° ~ {max_lon:.6f}°") + print(f" 纬度: {min_lat:.6f}° ~ {max_lat:.6f}°") + print(f" 高程: {heights.min():.2f}m ~ {heights.max():.2f}m") + print(f" 点数: {len(points)}") + + # 自动确定像素大小 + if pixel_size is None: + area_km2 = ((max_lon - min_lon) * 111.32) * ((max_lat - min_lat) * 111.32) + + if quality == 'high': + pixel_size = 0.00005 # ~5米 + elif quality == 'medium': + pixel_size = 0.0001 # ~10米 + else: # low + pixel_size = 0.0002 # ~20米 + + # 根据点数调整 + if len(points) < 1000: + pixel_size = max(pixel_size, 0.0005) # 至少50米 + elif len(points) < 5000: + pixel_size = max(pixel_size, 0.0002) # 至少20米 # 计算网格尺寸 width = int((max_lon - min_lon) / pixel_size) + 1 height = int((max_lat - min_lat) / pixel_size) + 1 - # 限制网格大小,避免过大 - max_grid_size = 5000 # 最大网格尺寸 + # 限制最大网格大小 + max_grid_size = 10000 if width > max_grid_size or height > max_grid_size: - print(f"警告: 网格尺寸过大 ({width}x{height}),自动调整像素大小...") - # 重新计算像素大小 + print(f"[DEM生成] 警告: 网格过大({width}x{height}),调整像素大小") larger_dim = max(width, height) pixel_size = pixel_size * (larger_dim / max_grid_size) width = int((max_lon - min_lon) / pixel_size) + 1 height = int((max_lat - min_lat) / pixel_size) + 1 - print(f"调整后像素大小: {pixel_size:.6f}°") - print(f"DEM网格: {width}x{height} (像素大小: {pixel_size:.6f}°)") + print(f"[DEM生成] 网格设置:") + print(f" 像素大小: {pixel_size:.6f}° (~{pixel_size*111320:.1f}米)") + print(f" 网格尺寸: {width} × {height}") + print(f" 总像素数: {width * height:,}") - # 创建网格坐标 + # 创建网格 x_grid = np.linspace(min_lon, max_lon, width) - y_grid = np.linspace(max_lat, min_lat, height) # 纬度从上到下递减 + y_grid = np.linspace(max_lat, min_lat, height) # 纬度从上到下 xi, yi = np.meshgrid(x_grid, y_grid) - # 使用scipy进行插值 + # 插值 + print("[DEM生成] 开始插值计算...") from scipy.interpolate import griddata - print("开始插值计算...") - - # 方法1: 先尝试线性插值 try: + # 先尝试线性插值 zi = griddata((lons, lats), heights, (xi, yi), method='linear') nan_count = np.isnan(zi).sum() - nan_percent = nan_count / (width * height) * 100 - print(f"线性插值完成,空白区域: {nan_count} 像素 ({nan_percent:.1f}%)") - - # 如果有空白区域,使用最近邻方法填充 if nan_count > 0: - print("使用最近邻方法填充空白区域...") + print(f"[DEM生成] 线性插值空白: {nan_count} 像素 ({nan_count/(width*height)*100:.1f}%)") + + # 使用最近邻填充空白 zi_nn = griddata((lons, lats), heights, (xi, yi), method='nearest') mask = np.isnan(zi) zi[mask] = zi_nn[mask] - # 再次检查 + # 检查剩余空白 nan_count = np.isnan(zi).sum() if nan_count > 0: - print(f"仍有 {nan_count} 个空白像素,填充为最低高程值") + print(f"[DEM生成] 填充剩余空白: {nan_count} 像素") min_height = heights.min() zi[np.isnan(zi)] = min_height - except Exception as e: - print(f"线性插值失败: {e}") - print("尝试使用最近邻插值...") + print(f"[DEM生成] 插值失败: {e},使用最近邻插值") zi = griddata((lons, lats), heights, (xi, yi), method='nearest') # 创建GeoTIFF - print("创建GeoTIFF文件...") + print("[DEM生成] 创建GeoTIFF文件...") driver = gdal.GetDriverByName("GTiff") - dem_ds = driver.Create( - output_dem_path, width, height, 1, gdal.GDT_Float32, - options=["COMPRESS=LZW", "TILED=YES", "PREDICTOR=2", "ZLEVEL=9"] - ) + + # 压缩选项 + if quality == 'high': + options = ["COMPRESS=DEFLATE", "PREDICTOR=2", "ZLEVEL=9", "TILED=YES", "BIGTIFF=IF_SAFER"] + else: + options = ["COMPRESS=LZW", "TILED=YES"] + + dem_ds = driver.Create(output_dem_path, width, height, 1, gdal.GDT_Float32, options) if dem_ds is None: raise RuntimeError(f"无法创建DEM文件: {output_dem_path}") @@ -670,88 +901,155 @@ def points_to_dem(points, output_dem_path, pixel_size=0.0001): band = dem_ds.GetRasterBand(1) band.WriteArray(zi) band.SetNoDataValue(-9999.0) - band.SetDescription("Elevation") + band.SetDescription("Elevation (meters)") band.SetUnitType("meters") # 计算统计信息 - print("计算统计信息...") + print("[DEM生成] 计算统计信息...") band.FlushCache() band.ComputeStatistics(False) # 设置颜色表(可选) try: - import matplotlib.pyplot as plt - cmap = plt.cm.terrain - colors = cmap(np.linspace(0, 1, 256)) + from matplotlib.cm import terrain + colors = terrain(np.linspace(0, 1, 256)) colors = (colors[:, :3] * 255).astype(np.uint8) color_table = gdal.ColorTable() for i in range(256): - color_table.SetColorEntry(i, (colors[i, 0], colors[i, 1], colors[i, 2], 255)) + color_table.SetColorEntry(i, tuple(colors[i])) band.SetColorTable(color_table) band.SetColorInterpretation(gdal.GCI_PaletteIndex) except: - pass # 如果设置颜色表失败,继续 + pass dem_ds = None # 关闭文件 - print(f"DEM生成成功: {output_dem_path}") - print(f" 文件大小: {os.path.getsize(output_dem_path) / (1024*1024):.2f} MB") - - # 验证文件 + # 验证结果 if os.path.exists(output_dem_path): ds = gdal.Open(output_dem_path, gdal.GA_ReadOnly) if ds: band = ds.GetRasterBand(1) stats = band.GetStatistics(True, True) - print(f" DEM统计: 最小值={stats[0]:.2f}m, 最大值={stats[1]:.2f}m") - print(f" 平均值={stats[2]:.2f}m, 标准差={stats[3]:.2f}m") ds = None - else: - print("警告: DEM文件可能未成功生成") - -def generate_dem(REGION_COORDS=None, tileset_path=None, dem_path=None): - # 配置参数 - script_dir = os.path.dirname(os.path.abspath(__file__)) - if tileset_path: - TILESET_PATH = tileset_path - else: - TILESET_PATH = os.path.dirname(script_dir) + "/data/3dtiles/tileset.json" - if dem_path : - OUTPUT_DEM_PATH = dem_path - else : - OUTPUT_DEM_PATH = os.path.join(script_dir, f"o_dem_{uuid.uuid4().hex[:8]}.tif") - PIXEL_SIZE = 0.0001 + + print(f"[DEM生成] DEM统计:") + print(f" 最小值: {stats[0]:.2f}m") + print(f" 最大值: {stats[1]:.2f}m") + print(f" 平均值: {stats[2]:.2f}m") + print(f" 标准差: {stats[3]:.2f}m") - # 执行流程 + elapsed_time = time.time() - start_time + file_size_mb = os.path.getsize(output_dem_path) / (1024 * 1024) + + print(f"[DEM生成] 完成!") + print(f" 输出文件: {output_dem_path}") + print(f" 文件大小: {file_size_mb:.2f} MB") + print(f" 处理时间: {elapsed_time:.1f}秒") + + return output_dem_path + +# ========== 主函数 ========== +def generate_dem(tileset_path, dem_path=None, region_coords=None, + pixel_size=None, quality='medium', enable_enhancement=True, + debug=False): + """ + 生成DEM的主函数 + :param tileset_path: tileset.json路径 + :param output_path: 输出目录 + :param region_coords: 区域坐标 [(min_lon, min_lat), (max_lon, max_lat)] + :param pixel_size: DEM像素大小(度),None则自动确定 + :param quality: 质量等级 'low'|'medium'|'high' + :param enable_enhancement: 是否启用点云增强 + :param debug: 调试模式 + :return: DEM文件路径 + """ + print("=" * 60) + print("3D Tiles 到 DEM 转换器") print("=" * 60) - print("开始解析3D Tiles...") - if REGION_COORDS: - print(f"启用区域过滤: {REGION_COORDS}") - else: - print("未启用区域过滤,将处理所有数据") - points = parse_tileset(TILESET_PATH, REGION_COORDS) - print(f"解析完成,共提取点云: {len(points)}个") - - if len(points) > 0: - points_to_dem(points, OUTPUT_DEM_PATH, pixel_size=PIXEL_SIZE) - return OUTPUT_DEM_PATH - else: - print("无点云数据,无法生成DEM") + # 检查输入文件 + if not os.path.exists(tileset_path): + raise FileNotFoundError(f"输入文件不存在: {tileset_path}") + + # 设置输出路径 + if dem_path is None: + script_dir = os.path.dirname(os.path.abspath(__file__)) + dem_path = os.path.join(script_dir, f"o_dem_{uuid.uuid4().hex[:8]}.tif") + + # 解析点云 + print(f"[主函数] 开始解析3D Tiles数据...") + if region_coords: + print(f"[主函数] 区域过滤: {region_coords}") + + points = parse_tileset( + tileset_path, + region_coords=region_coords, + enable_enhancement=enable_enhancement, + debug=debug + ) + + if len(points) == 0: + print("[主函数] 错误: 未提取到点云数据") return None + + # 生成DEM + print(f"\n[主函数] 开始生成DEM...") + dem_path = points_to_dem( + points, + dem_path, + pixel_size=pixel_size, + quality=quality + ) + + print(f"\n[主函数] 转换完成!") + print(f" DEM文件: {dem_path}") + print(f" 点云数量: {len(points)}") + + return dem_path +# ========== 示例用法 ========== if __name__ == "__main__": - # 测试示例 - # 石棉县核心区域 - # SHIMIAN_CORE = [(100.22476304, 29.38340151), (110.32476304, 31.28340151)] + # 配置参数 + SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) + + # 1. 指定tileset.json路径 + # TILESET_PATH = os.path.join(SCRIPT_DIR, "../data/3dtiles/tileset.json") + TILESET_PATH = os.path.dirname(SCRIPT_DIR) + "/data/3dtiles/tileset.json" + + # 2. 可选:指定区域(经纬度) + # BEIJING_REGION = [(116.3, 39.9), (116.5, 40.1)] + # SHANGHAI_REGION = [(121.4, 31.1), (121.6, 31.3)] SHIMIAN_CORE = [(100.22476304, 29.18340151), (110.32476304, 31.28340151)] + SHIMIAN_CORE1 = [(102.216344, 29.376723), (102.218344, 29.378723)] + SHIMIAN_CORE2 = [(102.216344, 29.376723), (102.220344, 29.380723)] + SHIMIAN_CORE3 = [(102.216344, 29.376723), (102.222344, 29.382723)] + SHIMIAN_CORE4 = [(102.216344, 29.376723), (102.224344, 29.384723)] + REGION_COORDS = SHIMIAN_CORE3 # 不指定区域则处理全部数据 - # 可以根据需要启用或禁用区域过滤 - REGION_COORDS = SHIMIAN_CORE # 启用区域过滤 - # REGION_COORDS = None # 禁用区域过滤 + # 3. 可选:指定输出目录 + OUTPUT_DEM_PATH = os.path.join(SCRIPT_DIR, f"o_dem_{uuid.uuid4().hex[:8]}.tif") - dem_path = generate_dem(REGION_COORDS) - if dem_path: - print(f"\nDEM文件已生成: {dem_path}") \ No newline at end of file + # 4. 生成DEM + try: + dem_file = generate_dem( + tileset_path=TILESET_PATH, + dem_path=OUTPUT_DEM_PATH, + region_coords=REGION_COORDS, + quality='medium', # 质量等级 + enable_enhancement=True, # 启用点云增强 + debug=False # 调试模式 + ) + + if dem_file: + print(f"\nDEM生成成功!") + print(f" 文件位置: {dem_file}") + print(f" 文件大小: {os.path.getsize(dem_file) / (1024*1024):.2f} MB") + else: + print("\nDEM生成失败!") + + except Exception as e: + print(f"\n处理失败: {e}") + import traceback + traceback.print_exc() \ No newline at end of file diff --git a/b3dm/earthwork_api.py b/b3dm/earthwork_api.py index 246fb97..5345eb7 100644 --- a/b3dm/earthwork_api.py +++ b/b3dm/earthwork_api.py @@ -3,10 +3,10 @@ from sanic import Blueprint, Request, json from sanic.response import text from typing import Dict, Any import logging -from earthwork_calculator_point_cloud import EarthworkCalculatorPointCloud +from b3dm.earthwork_calculator_point_cloud import EarthworkCalculatorPointCloud # 导入计算模块 -from earthwork_calculator_3d_tiles import EarthworkCalculator3dTiles, AlgorithmType, EarthworkResult3dTiles -from tileset_data_source import TilesetDataSource +from b3dm.earthwork_calculator_3d_tiles import EarthworkCalculator3dTiles, AlgorithmType, EarthworkResult3dTiles +from b3dm.tileset_data_source import TilesetDataSource earthwork_bp = Blueprint("earthwork", url_prefix="") diff --git a/b3dm/terrain3d_analyzer_color.py b/b3dm/slope_aspect_img.py similarity index 99% rename from b3dm/terrain3d_analyzer_color.py rename to b3dm/slope_aspect_img.py index 6dfa19c..922c126 100644 --- a/b3dm/terrain3d_analyzer_color.py +++ b/b3dm/slope_aspect_img.py @@ -1,4 +1,4 @@ -# terrain3d_analyzer_color13 +# slope_aspect_img13 import numpy as np import matplotlib.pyplot as plt from scipy.ndimage import sobel diff --git a/b3dm/terrain_api.py b/b3dm/terrain_api.py index b22e906..70e8733 100644 --- a/b3dm/terrain_api.py +++ b/b3dm/terrain_api.py @@ -7,7 +7,8 @@ import time import uuid import urllib.parse import threading -from terrain_calculator import TerrainCalculator +import os +from b3dm.terrain_calculator import TerrainCalculator terrain_bp = Blueprint("terrain", url_prefix="") MINIO_SUB_PATH = "slopeAspectPng" @@ -54,7 +55,6 @@ class PointItem(BaseModel): z: float = Field(..., description="z坐标") class PointRequest(BaseModel): - """使用Pydantic HttpUrl类型的严格点批量请求模型""" points: List[PointItem] = Field(..., description="点列表") url: str = Field(..., description="URL地址") @@ -63,6 +63,9 @@ class PointRequest(BaseModel): if len(v) > 10: raise ValueError("批量处理最多支持10个点") return v + +class PreloadRequest(BaseModel): + url: str = Field(..., description="URL地址") class AnalysisConfig(BaseModel): """分析配置""" @@ -154,7 +157,43 @@ async def calculate_aspect1(request: Request): "error": f"服务器内部错误: {str(e)}", "success": False }, status=500) - + +@terrain_bp.post("/api/v1/calculate/preload3dTiles") +async def preload_3dtiles(request: Request): + """预加载3dtiles地图数据""" + try: + data = request.json + if not data: + return json({"error": "请求体不能为空"}, status=400) + + # 验证输入 + try: + vector = PreloadRequest(**data) + except Exception as e: + return json({"error": f"输入验证失败: {str(e)}"}, status=400) + + # 创建并启动线程 + script_dir = os.path.dirname(os.path.abspath(__file__)) + thread1 = threading.Thread(target=TerrainCalculator.preload_3dtiles, args=(vector.url)) + # 启动线程 + thread1.start() + url_prefix = extract_and_rebuild_url(vector.url) + return json({ + "success": True, + "data": f"{script_dir}/data_3dtiles", + "request": { + "input_vector": vector.model_dump(), + "timestamp": time.time() + } + }) + + except Exception as e: + logger.error(f"坡向计算API错误: {e}") + return json({ + "error": f"服务器内部错误: {str(e)}", + "success": False + }, status=500) + @terrain_bp.post("/api/v1/calculate/slopeAspect") async def calculate_slopeAspect(request: Request): """生成坡向坡度俯视图""" diff --git a/b3dm/terrain_calculator.py b/b3dm/terrain_calculator.py index 5f45082..8b5017a 100644 --- a/b3dm/terrain_calculator.py +++ b/b3dm/terrain_calculator.py @@ -3,20 +3,40 @@ from typing import List, Tuple, Dict, Any import logging import os import uuid -from data_3dtiles_manager import MinIO3DTilesManager -import data_3dtiles_to_dem -import terrain3d_analyzer_color +from b3dm.data_3dtiles_manager import MinIO3DTilesManager +import b3dm.data_3dtiles_to_dem as data_3dtiles_to_dem +import b3dm.slope_aspect_img as slope_aspect_img logger = logging.getLogger(__name__) +ENDPOINT_URL = "222.212.85.86:9000" +ACCESS_KEY = "WuRenJi" +SECRET_KEY = "WRJ@2024" + + class TerrainCalculator: """地形坡度和坡向计算器""" + def preload_3dtiles(url) : + # 下载3dtiles地图数据 + manager = MinIO3DTilesManager( + endpoint_url=ENDPOINT_URL, + access_key=ACCESS_KEY, + secret_key=SECRET_KEY, + secure=False + ) + script_dir = os.path.dirname(os.path.abspath(__file__)) + success, entry_local_path = manager.download_full_tileset( + tileset_url=url, + save_dir=f"{script_dir}/data_3dtiles", + region_filter=None + ) + if not success : + logger.info(f"下载地图数据失败: {url}") + return "下载地图数据失败", None + def generate_slopeAspect_3d_overlook(region_coords, url, overall_3d_png_name, minio_sub_path) : # 下载3dtiles地图数据 - ENDPOINT_URL = "222.212.85.86:9000" - ACCESS_KEY = "WuRenJi" - SECRET_KEY = "WRJ@2024" manager = MinIO3DTilesManager( endpoint_url=ENDPOINT_URL, access_key=ACCESS_KEY, @@ -35,14 +55,14 @@ class TerrainCalculator: tileset_path = entry_local_path dem_path = os.path.join(script_dir, f"o_dem_{uuid.uuid4().hex[:8]}.tif") - data_3dtiles_to_dem.generate_dem(region_coords, tileset_path, dem_path) + data_3dtiles_to_dem.generate_dem(tileset_path, dem_path, region_coords) if not os.path.exists(dem_path) : logger.info(f"生成坡度坡向俯视图失败: {url},{region_coords}") return "生成坡度坡向俯视图失败", None overall_3d_png_path = os.path.join(script_dir, overall_3d_png_name) - terrain3d_analyzer_color.read_slope_aspect_by_dem(dem_path, overall_3d_png_path) + slope_aspect_img.read_slope_aspect_by_dem(dem_path, overall_3d_png_path) logger.info(f"生成成功: {url},{region_coords},{overall_3d_png_path}") entry_bucket, _ = manager.parse_minio_url(url);