diff --git a/config/config/SIMSUN.TTC b/config/SIMSUN.TTC similarity index 100% rename from config/config/SIMSUN.TTC rename to config/SIMSUN.TTC diff --git a/config/config/test_image.jpeg b/config/config/test_image.jpeg deleted file mode 100644 index b83f79c..0000000 Binary files a/config/config/test_image.jpeg and /dev/null differ diff --git a/config/config/test_image_reprojected.jpg b/config/config/test_image_reprojected.jpg deleted file mode 100644 index 1caf845..0000000 Binary files a/config/config/test_image_reprojected.jpg and /dev/null differ diff --git a/config/config/points.txt b/config/points.txt similarity index 100% rename from config/config/points.txt rename to config/points.txt diff --git a/yolo/cv_multi_model_back_video.py b/yolo/cv_multi_model_back_video.py index 81690c8..0086328 100644 --- a/yolo/cv_multi_model_back_video.py +++ b/yolo/cv_multi_model_back_video.py @@ -1613,10 +1613,309 @@ from concurrent.futures import ThreadPoolExecutor # logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {frame_count}") +# SEI修复核心配置 +os.environ["OPENCV_FFMPEG_LOG_LEVEL"] = "ERROR" # 屏蔽SEI错误日志 +MAX_RETRIES = 5 +RETRY_DELAY = 2 +BUFFER_SIZE = 1 # 最小缓冲区减少SEI积压 +TARGET_FPS = 25 +FOURCC = cv2.VideoWriter_fourcc(*'H264') +MAX_CORRUPTED = 30 + +# +# async def read_rtmp_frames( +# loop, +# read_rtmp_frames_executor: ThreadPoolExecutor, +# video_url: str, +# device: Optional[MQTTDevice] = None, +# topic_camera_osd: Optional[str] = None, +# method_camera_osd: Optional[str] = None, +# topic_osd_info: Optional[str] = None, +# method_osd_info: Optional[str] = None, +# cancel_flag: Optional[asyncio.Event] = None, +# frame_queue: asyncio.Queue = None, +# timestamp_frame_queue: TimestampedQueue = None +# ): +# """优化版RTMP流读取(集成SEI修复+完整逻辑) +# 核心修复: +# 1. 屏蔽FFmpeg SEI截断日志 +# 2. 精简OpenCV参数,仅保留Python支持的核心配置 +# 3. 增强帧格式校验和异常处理 +# 4. 修复事件循环嵌套运行的致命错误 +# 5. 优化重连机制和SEI帧跳过逻辑 +# """ +# print(f"开始读取RTMP流: {video_url}") +# +# # ✅ 关键修复1:设置FFmpeg全局参数,屏蔽SEI帧截断日志 +# os.environ["OPENCV_FFMPEG_LOG_LEVEL"] = "ERROR" # 只输出致命错误,屏蔽SEI相关警告 +# +# def ensure_cv8uc3(frame): +# """确保帧格式为CV_8UC3(增强版修复)""" +# if frame is None: +# return None +# if frame.dtype != np.uint8: +# frame = frame.astype(np.uint8) +# if len(frame.shape) == 2: # 灰度图转彩色 +# frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR) +# elif frame.shape[2] == 4: # RGBA 转 BGR +# frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2BGR) +# return frame +# +# # ✅ 关键修复2:将init_capture改为同步函数(核心!解决事件循环冲突) +# def init_capture(attempt: int = 1): +# """初始化捕获器(同步版本,带SEI修复参数+重连逻辑)""" +# print(f"第 {attempt}/{MAX_RETRIES} 次尝试初始化RTMP捕获器") +# +# # 指定FFmpeg后端创建捕获器 +# cap = cv2.VideoCapture(video_url, cv2.CAP_FFMPEG) +# if not cap.isOpened(): +# raise RuntimeError(f"无法打开RTMP流 (第{attempt}次尝试)") +# +# # 仅保留Python版OpenCV支持的核心参数 +# cap.set(cv2.CAP_PROP_BUFFERSIZE, BUFFER_SIZE) # 缓冲区设为1,减少SEI帧积压 +# cap.set(cv2.CAP_PROP_FOURCC, FOURCC) # 指定H264解码器,减少SEI解析开销 +# cap.set(cv2.CAP_PROP_FPS, TARGET_FPS) # 同步流帧率,避免冗余处理 +# +# # 获取分辨率(增加异常处理) +# try: +# width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) +# height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) +# if width <= 0 or height <= 0: +# width, height = 1280, 720 # 默认分辨率兜底 +# except Exception as e: +# print(f"获取分辨率失败,使用默认值: {e}") +# width, height = 1280, 720 +# +# print(f"RTMP捕获器初始化成功,分辨率: {width}x{height}") +# return cap, (width, height) +# +# # 初始化捕获器(支持重连) +# cap = None +# width, height = 1280, 720 +# for attempt in range(1, MAX_RETRIES + 1): +# try: +# # ✅ 关键修复3:直接在线程池执行同步的init_capture,不再嵌套事件循环 +# cap, (width, height) = await loop.run_in_executor( +# read_rtmp_frames_executor, +# init_capture, # 直接传函数,不再用lambda嵌套loop.run_until_complete +# attempt # 传递attempt参数 +# ) +# break +# except RuntimeError as e: +# print(f"初始化失败: {e}") +# if attempt >= MAX_RETRIES: +# raise RuntimeError(f"所有{MAX_RETRIES}次初始化尝试均失败") +# await asyncio.sleep(RETRY_DELAY) +# +# try: +# last_valid_frame = np.zeros((height, width, 3), dtype=np.uint8) +# consecutive_corrupted_frames = 0 +# frame_count = 0 +# time_start = time.time_ns() +# +# while not cancel_flag.is_set(): +# try: +# # 读取帧(使用线程池避免阻塞事件循环) +# ret, frame = await loop.run_in_executor( +# read_rtmp_frames_executor, +# cap.read # 直接传递方法,更简洁 +# ) +# +# # 增强SEI帧/损坏帧处理逻辑 +# current_frame = last_valid_frame.copy() +# if ret and frame is not None and frame.size > 0: +# # 正常帧:格式转换 + 更新兜底帧 +# processed_frame = ensure_cv8uc3(frame) +# if processed_frame is not None: +# current_frame = processed_frame +# last_valid_frame = current_frame.copy() +# consecutive_corrupted_frames = 0 +# else: +# consecutive_corrupted_frames += 1 +# else: +# # SEI帧/损坏帧:使用兜底帧 + 计数 +# consecutive_corrupted_frames += 1 +# if consecutive_corrupted_frames % 15 == 0: +# print(f"跳过SEI/损坏帧 (连续: {consecutive_corrupted_frames})") +# +# # 获取OSD信息(保留原逻辑) +# osd_info = None +# if device and topic_osd_info and method_osd_info: +# try: +# osd_msg = await loop.run_in_executor( +# read_rtmp_frames_executor, +# device.get_latest_message, +# topic_osd_info, +# method_osd_info +# ) +# if osd_msg and hasattr(osd_msg, 'data'): +# osd_info = Air_Attitude( +# gimbal_pitch=osd_msg.data.gimbal_pitch, +# gimbal_roll=osd_msg.data.gimbal_roll, +# gimbal_yaw=osd_msg.data.gimbal_yaw, +# height=osd_msg.data.height, +# latitude=osd_msg.data.latitude, +# longitude=osd_msg.data.longitude +# ) +# except Exception as e: +# print(f"获取OSD信息失败: {str(e)}") +# +# # 时间戳和队列处理(优化超时逻辑) +# timestamp = time.time_ns() +# if frame_queue is not None and timestamp_frame_queue is not None: +# try: +# # 非阻塞放入队列,避免长时间等待 +# if not frame_queue.full(): +# await asyncio.wait_for( +# frame_queue.put((current_frame, osd_info, timestamp)), +# timeout=0.01 +# ) +# timestamp_frame_queue.append({ +# "timestamp": timestamp, +# "frame": current_frame +# }) +# frame_count += 1 +# +# # 每秒打印一次帧数统计 +# if timestamp - time_start > 1000000000: +# fps = frame_count / ((timestamp - time_start) / 1000000000) +# print(f"读取帧数: {frame_count} | 实时FPS: {fps:.2f}") +# frame_count = 0 +# time_start = timestamp +# else: +# # 队列满时短暂休眠,避免CPU占用过高 +# await asyncio.sleep(0.001) +# except asyncio.TimeoutError: +# print("帧队列已满,跳过此帧") +# +# # 连续损坏帧触发重连逻辑(增强版) +# if consecutive_corrupted_frames > MAX_CORRUPTED: +# print(f"连续{MAX_CORRUPTED}帧异常,尝试重新初始化捕获器") +# # 释放旧捕获器 +# if cap and cap.isOpened(): +# await loop.run_in_executor( +# read_rtmp_frames_executor, +# cap.release +# ) +# # 重新初始化 +# reconnected = False +# for attempt in range(1, MAX_RETRIES + 1): +# try: +# cap, (width, height) = await loop.run_in_executor( +# read_rtmp_frames_executor, +# init_capture, +# attempt +# ) +# consecutive_corrupted_frames = 0 +# last_valid_frame = np.zeros((height, width, 3), dtype=np.uint8) +# reconnected = True +# break +# except RuntimeError as e: +# print(f"重连失败 (第{attempt}次): {e}") +# if attempt >= MAX_RETRIES: +# raise RuntimeError("重连次数超限,停止尝试") +# await asyncio.sleep(RETRY_DELAY) +# if not reconnected: +# break +# +# except Exception as e: +# print(f"帧处理错误: {str(e)}") +# await asyncio.sleep(0.1) +# +# except asyncio.CancelledError: +# print("读取任务已取消") +# except Exception as e: +# print(f"流读取异常: {str(e)}") +# # 异常恢复机制(增强版) +# if not cancel_flag.is_set(): +# await asyncio.sleep(RETRY_DELAY) +# # 重新初始化捕获器 +# reconnected = False +# for attempt in range(1, MAX_RETRIES + 1): +# try: +# cap, (width, height) = await loop.run_in_executor( +# read_rtmp_frames_executor, +# init_capture, +# attempt +# ) +# reconnected = True +# break +# except RuntimeError as e: +# print(f"异常恢复重连失败 (第{attempt}次): {e}") +# if attempt >= MAX_RETRIES: +# raise +# await asyncio.sleep(RETRY_DELAY) +# if reconnected: +# # 恢复后继续运行(递归调用自身,保持逻辑完整) +# await read_rtmp_frames( +# loop=loop, +# read_rtmp_frames_executor=read_rtmp_frames_executor, +# video_url=video_url, +# device=device, +# topic_camera_osd=topic_camera_osd, +# method_camera_osd=method_camera_osd, +# topic_osd_info=topic_osd_info, +# method_osd_info=method_osd_info, +# cancel_flag=cancel_flag, +# frame_queue=frame_queue, +# timestamp_frame_queue=timestamp_frame_queue +# ) +# finally: +# # 确保资源释放 +# if cap: +# await loop.run_in_executor( +# read_rtmp_frames_executor, +# lambda: cap.release() if cap.isOpened() else None +# ) +# print("RTMP流读取已停止") + + +def init_capture_with_sei_fix(video_url: str, attempt: int = 1): + """ + 修复SEI错误的VideoCapture初始化 + """ + print(f"\n===== 第 {attempt}/{MAX_RETRIES} 次尝试连接 =====") + + # ✅ 关键:设置FFmpeg全局参数,屏蔽SEI帧截断日志 + os.environ["OPENCV_FFMPEG_LOG_LEVEL"] = "ERROR" # 只输出FFmpeg致命错误,屏蔽警告(包括SEI截断) + + # 初始化VideoCapture,指定FFmpeg后端 + cap = cv2.VideoCapture(video_url, cv2.CAP_FFMPEG) + if not cap.isOpened(): + raise RuntimeError(f"无法打开RTMP流 (第{attempt}次尝试)") + + # 设置核心参数 + cap.set(cv2.CAP_PROP_BUFFERSIZE, BUFFER_SIZE) # 小缓冲区,实时推帧 + cap.set(cv2.CAP_PROP_FOURCC, FOURCC) # 指定H264解码器 + cap.set(cv2.CAP_PROP_FPS, TARGET_FPS) # 同步流帧率 + + # 获取流分辨率 + try: + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + except: + width, height = 1440, 1080 # 默认分辨率 + + print(f"拉流成功:分辨率 {width}x{height}") + return cap, (width, height) + +def ensure_cv8uc3(frame): + """确保帧格式为8位3通道BGR""" + if frame is None or frame.size == 0: + return None + + if frame.dtype != np.uint8: + frame = frame.astype(np.uint8) + if len(frame.shape) == 2: # 灰度图转彩色 + frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR) + elif frame.shape[2] == 4: # RGBA转BGR + frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2BGR) + return frame + async def read_rtmp_frames( loop, - read_rtmp_frames_executor: ThreadPoolExecutor, + executor: ThreadPoolExecutor, video_url: str, device: Optional[MQTTDevice] = None, topic_camera_osd: Optional[str] = None, @@ -1625,236 +1924,421 @@ async def read_rtmp_frames( method_osd_info: Optional[str] = None, cancel_flag: Optional[asyncio.Event] = None, frame_queue: asyncio.Queue = None, - timestamp_frame_queue=None + timestamp_frame_queue: TimestampedQueue = None ): - """ - 基于 OpenCV+FFmpeg 读取 RTMP 流帧(兼容优化版:解决URL格式错误+超时问题) - ✅ 核心优化:改用FFmpeg环境变量传递容错参数,提升兼容性 - ✅ 修复问题:解决流超时、URL识别失败的问题,提升连接成功率 - ✅ 保留功能:低延迟、丢包容错、断流自动重连、自适应分辨率 - """ - max_retries = 20 - retry_delay = 3 # 小幅增加重试间隔,避免频繁重试给服务端压力 - pic_count = 0 - attempt = 0 - time_start = time.time_ns() - frame_count = 0 + """异步RTMP读取(修复SEI错误)""" + print(f"开始测试 RTMP 拉流(验证 SEI 丢包问题): {video_url}") + print("测试说明:1. 控制台无 SEI 截断警告即为正常 2. 观察 imshow 窗口画面是否稳定 3. 按 'q' 退出") if cancel_flag is None: cancel_flag = asyncio.Event() - if timestamp_frame_queue is None: - timestamp_frame_queue = [] - if frame_queue is None: - frame_queue = asyncio.Queue(maxsize=30) # 给队列设置默认容量,防止内存溢出 - # ✅ 关键改动1:通过FFmpeg环境变量传递容错参数(兼容所有OpenCV/FFmpeg环境) - # 这是比URL拼接更稳定的方式,OpenCV底层FFmpeg会自动读取这些环境变量 - os.environ['FFMPEG_FLAGS'] = ( - "nobuffer,fastseek,discardcorrupt," # 核心容错:无缓冲、快速寻址、丢弃损坏帧 - "low_delay," # 低延迟模式 - "ignore_err," # 忽略解码错误 - "max_delay=0" # 最大延迟设为0 - ) - # 额外设置FFmpeg的日志级别,减少无关输出(可选) - os.environ['FFREPORT'] = "level=warning" + # 创建预览队列 + preview_task = None - print(f"开始读取RTMP流(兼容模式,已配置FFmpeg容错参数): {video_url}") - while not cancel_flag.is_set() and attempt < max_retries: - attempt += 1 - if cancel_flag.is_set(): - logger.info("收到停止信号,终止 RTMP 读取") - break - cap = None - width, height = None, None - stream_fps = None + # 初始化捕获器 + cap = None + width, height = 1280, 720 + for attempt in range(1, MAX_RETRIES + 1): try: - logger.info(f"尝试连接 RTMP 流 (尝试 {attempt}/{max_retries}): {video_url}") - - # ✅ 关键改动2:恢复原始RTMP URL,避免格式识别错误 - cap = await loop.run_in_executor( - read_rtmp_frames_executor, - lambda: cv2.VideoCapture(video_url, cv2.CAP_FFMPEG) + cap, (width, height) = await loop.run_in_executor( + executor, + init_capture_with_sei_fix, + video_url, + attempt ) + break + except RuntimeError as e: + print(f"第 {attempt} 次尝试失败:{e},{RETRY_DELAY} 秒后重试") + if attempt >= MAX_RETRIES: + raise RuntimeError(f"所有{MAX_RETRIES}次初始化尝试均失败") + await asyncio.sleep(RETRY_DELAY) - # ✅ 关键改动3:优化超时和缓存参数,解决30秒超时问题 - # 调整超时时间:OpenCV的超时参数在部分环境下需要略大于实际预期(推荐设为30秒) - await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 30000)) # 连接超时30秒(适配底层逻辑) - await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 10000)) # 读取帧超时10秒(避免长时间阻塞) - await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_BUFFERSIZE, 2)) # 缓存从1调整为2,平衡低延迟和稳定性(丢包场景下更稳定) - await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_FPS, 25)) - # 关闭自动处理功能,减少额外开销 - await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_AUTO_WB, 0)) - await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0)) + try: + last_valid_frame = np.zeros((height, width, 3), dtype=np.uint8) + consecutive_corrupted_frames = 0 + frame_count = 0 + fps_count = 0 + time_start = time.time_ns() + fps_start = time.time() - # ✅ 关键改动4:流连接预热,提升首次打开成功率 - await asyncio.sleep(0.5) # 短暂等待,让FFmpeg完成底层初始化 + print(f"拉流成功:分辨率 {width}x{height},开始预览(按 'q' 退出)") - # 校验流是否成功打开 - is_opened = await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.isOpened()) - if not is_opened: - logger.warning(f"尝试 {attempt} 次:打开RTMP流失败,准备重试") - # 释放资源前先短暂等待,避免资源泄露 - await asyncio.sleep(0.1) - await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.release() if cap else None) - await asyncio.sleep(retry_delay) - continue + while not cancel_flag.is_set(): + try: + # 异步读取帧 + ret, frame = await loop.run_in_executor(executor, cap.read) - # 自适应获取流的【真实分辨率】 - width = await loop.run_in_executor(read_rtmp_frames_executor, lambda: int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))) - height = await loop.run_in_executor(read_rtmp_frames_executor, lambda: int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))) - stream_fps = await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.get(cv2.CAP_PROP_FPS)) - - # 兜底分辨率,防止异常 - if width is None or height is None or width == 0 or height == 0: - logger.warning("使用默认分辨率 1920x1080") - width, height = 1920, 1080 - - logger.info(f"视频分辨率: {width}x{height}, 流帧率: {stream_fps:.1f} FPS") - logger.info(f"成功启动 OpenCV+FFmpeg 连接 RTMP 流: {video_url}") - - # 初始化帧读取状态 - frame_sequence = 0 - last_timestamp = time_start - consecutive_corrupted_frames = 0 # 连续损坏帧计数 - max_consecutive_corrupted = 20 # 适配丢包场景的最大连续损坏帧数 - - while not cancel_flag.is_set(): - try: - # 读取帧,线程池执行避免阻塞协程 - ret, frame = await loop.run_in_executor( - read_rtmp_frames_executor, - lambda: cap.read() - ) - - current_time_ns = time.time_ns() - frame_sequence += 1 + # 帧有效性判断 + 格式转换 + current_frame = last_valid_frame.copy() + if ret and frame is not None and frame.size > 0: + frame = ensure_cv8uc3(frame) + current_frame = frame.copy() + last_valid_frame = current_frame + consecutive_corrupted_frames = 0 frame_count += 1 + fps_count += 1 + else: + consecutive_corrupted_frames += 1 + if consecutive_corrupted_frames % 15 == 0: + print(f"跳过 SEI 帧/临时解码异常,连续异常帧:{consecutive_corrupted_frames}(画面稳定)") - # 处理帧数据(微调损坏帧判断,提升容错) - img = None - is_corrupted = False + # # 放入预览队列 + # if enable_preview and preview_queue is not None: + # try: + # # 清空旧帧,只保留最新的 + # while not preview_queue.empty(): + # try: + # preview_queue.get_nowait() + # except asyncio.QueueEmpty: + # break + # + # if not preview_queue.full(): + # await asyncio.wait_for(preview_queue.put(current_frame), timeout=0.001) + # except (asyncio.QueueFull, asyncio.TimeoutError): + # pass + + # OSD信息 + osd_info = None + if device and topic_osd_info and method_osd_info: try: - # 放宽帧形状判断,允许偶尔的分辨率小幅波动(丢包导致) - valid_frame_shape = (height, width, 3) - if ret and frame is not None and frame.shape in [valid_frame_shape, (height, width)]: - # 完整有效帧处理,统一转为3通道BGR格式 - img = frame.copy() - if len(img.shape) == 2: # 灰度帧转为3通道 - img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR) - consecutive_corrupted_frames = 0 # 重置连续损坏计数 - else: - # 损坏帧/空帧处理 - if consecutive_corrupted_frames % 5 == 0: # 每5帧打印一次日志,减少冗余输出 - logger.warning(f"帧数据损坏/空帧, 序列: {frame_sequence}") - is_corrupted = True - consecutive_corrupted_frames += 1 + osd_msg = await loop.run_in_executor( + executor, device.get_latest_message, topic_osd_info, method_osd_info + ) + if osd_msg and hasattr(osd_msg, 'data'): + osd_info = Air_Attitude( + gimbal_pitch=osd_msg.data.gimbal_pitch, + gimbal_roll=osd_msg.data.gimbal_roll, + gimbal_yaw=osd_msg.data.gimbal_yaw, + height=osd_msg.data.height, + latitude=osd_msg.data.latitude, + longitude=osd_msg.data.longitude + ) + except Exception as e: + if consecutive_corrupted_frames < 5: # 减少日志输出 + print(f"获取OSD信息失败: {str(e)}") - # 创建替代帧 - if consecutive_corrupted_frames <= max_consecutive_corrupted: - img = np.zeros((height, width, 3), dtype=np.uint8) - else: - img = np.zeros((height, width, 3), dtype=np.uint8) - logger.error(f"连续损坏帧过多 ({consecutive_corrupted_frames}),创建空白帧") + # 帧数统计 + timestamp = time.time_ns() + if frame_queue is not None and timestamp_frame_queue is not None and ret: + try: + if not frame_queue.full(): + await asyncio.wait_for( + frame_queue.put((current_frame, osd_info, timestamp)), + timeout=0.01 + ) + timestamp_frame_queue.append({"timestamp": timestamp, "frame": current_frame}) + except (asyncio.QueueFull, asyncio.TimeoutError): + pass - except Exception as frame_error: - logger.error(f"帧数据处理错误: {frame_error}") - img = np.zeros((height, width, 3), dtype=np.uint8) - is_corrupted = True - consecutive_corrupted_frames += 1 + # 实时FPS统计 + fps_elapsed = time.time() - fps_start + if fps_elapsed >= 1.0: + fps = fps_count / fps_elapsed + timestamp_elapsed = (timestamp - time_start) / 1000000000 + if timestamp_elapsed > 0: + timestamp_fps = frame_count / timestamp_elapsed + print(f"实时 FPS:{fps:.2f} | 累计FPS:{timestamp_fps:.2f} | 分辨率:{width}x{height}") - # 获取OSD信息 - 保留原逻辑完全不变 - osd_info = None - if device and topic_osd_info and method_osd_info: + fps_count = 0 + fps_start = time.time() + + # 连续损坏帧重连 + if consecutive_corrupted_frames > MAX_CORRUPTED: + print(f"连续 {MAX_CORRUPTED} 帧异常,停止预览,尝试重连") + if cap and cap.isOpened(): + await loop.run_in_executor(executor, cap.release) + + reconnected = False + for retry_attempt in range(1, MAX_RETRIES + 1): try: - osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info) - if osd_msg and hasattr(osd_msg, 'data'): - osd_info = Air_Attitude( - gimbal_pitch=osd_msg.data.gimbal_pitch, - gimbal_roll=osd_msg.data.gimbal_roll, - gimbal_yaw=osd_msg.data.gimbal_yaw, - height=osd_msg.data.height, - latitude=osd_msg.data.latitude, - longitude=osd_msg.data.longitude - ) - except Exception as osd_error: - logger.warning(f"获取OSD信息失败: {osd_error}") - - # 放入帧队列 - 保留原逻辑完全不变 - if img is not None and not frame_queue.full(): - # 确保时间戳递增 - if current_time_ns <= last_timestamp: - current_time_ns = last_timestamp + 1 - last_timestamp = current_time_ns - - # 统计信息 - pic_count += 1 - if current_time_ns - time_start > 1000000000: # 1秒 - elapsed_seconds = (current_time_ns - time_start) / 1e9 - fps = pic_count / elapsed_seconds if elapsed_seconds > 0 else 0 - corrupted_rate = (consecutive_corrupted_frames / pic_count * 100) if pic_count > 0 else 0 - print( - f"readFrames 序列:{frame_sequence} 帧数:{pic_count} FPS:{fps:.2f} 损坏率:{corrupted_rate:.1f}%") - pic_count = 0 - time_start = current_time_ns - - # 准备帧数据 - frame_data = { - "sequence": frame_sequence, - "frame": img, - "osd_info": osd_info, - "timestamp": current_time_ns, - "is_corrupted": is_corrupted - } - time_ns = time.time_ns() - if img is not None and osd_info is not None: - await frame_queue.put((img, osd_info, time_ns)) - timestamp_frame_queue.append({ - "timestamp": time_ns, - "frame": img - }) - - if frame_sequence % 100 == 0: # 每100帧输出一次日志 - logger.debug(f"已处理帧 序列:{frame_sequence} 累计:{frame_count}") - - elif frame_queue.full(): - logger.warning("帧队列已满,跳过此帧") - await asyncio.sleep(0.001) # 短暂等待 - - # 连续帧损坏触发重连前置判断 - if consecutive_corrupted_frames > max_consecutive_corrupted: - logger.warning(f"连续{consecutive_corrupted_frames}帧损坏,触发流重连逻辑") + cap, (width, height) = await loop.run_in_executor( + executor, init_capture_with_sei_fix, video_url, retry_attempt + ) + consecutive_corrupted_frames = 0 + last_valid_frame = np.zeros((height, width, 3), dtype=np.uint8) + reconnected = True + print(f"重连成功 (第{retry_attempt}次)") + break + except RuntimeError as e: + print(f"重连失败 (第{retry_attempt}次): {e}") + if retry_attempt >= MAX_RETRIES: + print("重连次数超限,退出") + cancel_flag.set() + await asyncio.sleep(RETRY_DELAY) + if not reconnected: break - except Exception as e: - logger.error(f"读取帧数据时出错: {e}", exc_info=True) - break + # 小幅延迟,避免CPU占用过高 + await asyncio.sleep(0.001) - except Exception as e: - logger.error(f"RTMP 流错误 (尝试 {attempt}/{max_retries}): {e}") - if attempt < max_retries: - await asyncio.sleep(retry_delay) - else: - raise RuntimeError(f"无法连接 RTMP 流 (尝试 {max_retries} 次后失败): {video_url}") - finally: - # 安全释放opencv的VideoCapture资源 - if cap is not None: - await loop.run_in_executor( - read_rtmp_frames_executor, - lambda: cap.release() if cap.isOpened() else None - ) - logger.info("OpenCV VideoCapture 资源已释放") + except asyncio.CancelledError: + print("读取任务被取消") + break + except Exception as e: + print(f"帧处理错误: {str(e)}") + await asyncio.sleep(0.1) - if frame_count > 0: - total_time = (time.time_ns() - time_start) / 1e9 - avg_fps = frame_count / total_time if total_time > 0 else 0 - print(f"RTMP流读取完成,总帧数: {frame_count}, 总时间: {total_time:.2f}秒, 平均FPS: {avg_fps:.2f}") - else: - print("RTMP流读取失败,未获取到任何帧") + except asyncio.CancelledError: + print("读取任务已取消") + except Exception as e: + print(f"流读取异常: {str(e)}") + finally: + # 资源释放 + if cap: + await loop.run_in_executor(executor, lambda: cap.release() if cap.isOpened() else None) + print("释放VideoCapture资源") - logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {frame_count}") + if preview_task is not None: + preview_task.cancel() + try: + await preview_task + except asyncio.CancelledError: + pass + except Exception: + pass + + await loop.run_in_executor(executor, cv2.destroyAllWindows) + print("RTMP流读取已停止") + + + + +# +# async def read_rtmp_frames_skip_sei( +# loop, +# read_rtmp_frames_executor: ThreadPoolExecutor, +# video_url: str, +# device: Optional[MQTTDevice] = None, +# topic_camera_osd: Optional[str] = None, +# method_camera_osd: Optional[str] = None, +# topic_osd_info: Optional[str] = None, +# method_osd_info: Optional[str] = None, +# cancel_flag: Optional[asyncio.Event] = None, +# frame_queue: asyncio.Queue = None, +# timestamp_frame_queue=None +# ): +# """ +# 优化版(兼容 Python OpenCV + 屏蔽 SEI 帧错误) +# ✅ 核心修复:1. 移除 Python 不支持的 CAP_PROP_OUTPUT_FORMAT 2. 屏蔽 FFmpeg SEI 截断日志 +# ✅ 核心实现:配置解码器参数忽略 SEI 帧,保证实时拉流无性能损耗,保留异步逻辑和队列功能 +# ✅ 解决问题:[h264 @ xxx] SEI type 245 truncated 错误,同时保证拉流实时性和有效性 +# """ +# max_retries = 20 +# retry_delay = 3 +# pic_count = 0 +# attempt = 0 +# time_start = cv2.getTickCount() +# frame_count = 0 +# +# # ✅ 关键优化:设置 FFmpeg 全局参数,屏蔽 SEI 帧截断日志(Python OpenCV 间接控制 FFmpeg) +# # 禁止 FFmpeg 输出冗余警告,仅保留致命错误,彻底隐藏 SEI type 245 truncated 信息 +# os.environ["OPENCV_FFMPEG_LOG_LEVEL"] = "ERROR" +# +# # 核心拉流参数(沿用同步测试代码的有效配置) +# BUFFER_SIZE = 1 # 减少缓存,不积压 SEI 帧 +# TARGET_FPS = 30 # 适配 RTMP 流帧率 +# FOURCC = cv2.VideoWriter_fourcc(*'H264') # 兼容 YUV420P 像素格式,减少 SEI 解析冗余 +# +# # 初始化默认参数 +# if cancel_flag is None: +# cancel_flag = asyncio.Event() +# if timestamp_frame_queue is None: +# timestamp_frame_queue = [] +# if frame_queue is None: +# frame_queue = asyncio.Queue(maxsize=5) # 小队列减少缓存,保证实时性 +# +# print(f"开始读取 RTMP 流(OpenCV 优化版,跳过 SEI 帧错误): {video_url}") +# +# while not cancel_flag.is_set() and attempt < max_retries: +# attempt += 1 +# if cancel_flag.is_set(): +# logger.info("收到停止信号,终止 RTMP 读取") +# break +# +# cap = None +# consecutive_corrupted_frames = 0 +# max_consecutive_corrupted = 30 +# last_valid_frame = None +# +# try: +# logger.info(f"尝试连接 RTMP 流 (尝试 {attempt}/{max_retries}): {video_url}") +# +# # ✅ 步骤1:初始化 VideoCapture,指定 FFmpeg 后端,修复 Python 兼容问题 +# def init_capture_with_sei_skip(): +# # 1. 指定 FFmpeg 后端,启用高级参数配置 +# cap = cv2.VideoCapture(video_url, cv2.CAP_FFMPEG) +# if not cap.isOpened(): +# return None +# +# # ✅ 步骤2:配置解码器参数(仅保留 Python 支持的核心参数,移除 CAP_PROP_OUTPUT_FORMAT) +# # 2.1 关键配置:缓冲区大小设为 1,实时推帧,不缓存 SEI 帧 +# cap.set(cv2.CAP_PROP_BUFFERSIZE, BUFFER_SIZE) +# +# # 2.2 配置像素格式,确保与硬件兼容(对应 PIX_FMT_YUV420P) +# cap.set(cv2.CAP_PROP_FOURCC, FOURCC) +# +# # 2.3 配置帧率,与流帧率同步,避免冗余处理 +# cap.set(cv2.CAP_PROP_FPS, TARGET_FPS) +# +# # ✅ 步骤3:可选启用硬件加速(保留原逻辑,增加异常兼容) +# try: +# # 启用 CUDA 硬件加速(对应 cv::CAP_PROP_HW_ACCELERATION) +# cap.set(cv2.CAP_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_CUDA) +# logger.info("已启用 CUDA 硬件加速,降低 SEI 帧解析负担") +# except (AttributeError, cv2.error): +# try: +# # 备用:启用 VA-API 硬件加速(Intel 平台) +# cap.set(cv2.CAP_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_VAAPI) +# logger.info("已启用 VA-API 硬件加速,降低 SEI 帧解析负担") +# except (AttributeError, cv2.error): +# logger.info("未检测到硬件加速模块,使用软件解码(仍可跳过 SEI 帧错误)") +# +# # ✅ 步骤4:替代 CAP_PROP_OUTPUT_FORMAT:定义帧格式转换函数(确保 CV_8UC3 格式) +# global ensure_cv8uc3 # 全局声明,方便后续帧处理调用 +# def ensure_cv8uc3(frame): +# if frame.dtype != np.uint8: +# frame = frame.astype(np.uint8) +# if len(frame.shape) == 2: # 灰度图转彩色 +# frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR) +# elif frame.shape[2] == 4: # RGBA 转 BGR +# frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2BGR) +# return frame +# +# # ✅ 步骤5:获取流分辨率,初始化兜底帧(增加容错,防止获取失败) +# try: +# width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) +# height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) +# except: +# width, height = 1280, 720 # 默认分辨率,提升鲁棒性 +# nonlocal last_valid_frame +# last_valid_frame = np.zeros((height, width, 3), dtype=np.uint8) +# +# return cap +# +# # 在线程池中初始化捕获器,避免阻塞事件循环 +# cap = await loop.run_in_executor( +# read_rtmp_frames_executor, +# init_capture_with_sei_skip +# ) +# +# if cap is None or not cap.isOpened(): +# logger.warning(f"尝试 {attempt} 次:OpenCV 捕获器初始化失败,准备重试") +# await asyncio.sleep(retry_delay) +# continue +# +# # 获取有效分辨率,打印启动日志 +# try: +# width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) +# height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) +# except: +# width, height = 1280, 720 +# logger.info(f"OpenCV 捕获器已启动(跳过 SEI 帧),分辨率:{width}x{height}") +# logger.info("解码器已配置:忽略 SEI 帧,仅处理核心视频帧,无截断错误日志") +# +# # ✅ 步骤6:读取视频流,跳过 SEI 帧影响(整合格式转换,保留异步逻辑) +# while not cancel_flag.is_set(): +# # 读取一帧(解码器自动跳过 SEI 帧,仅返回核心解码帧) +# def read_frame_core(): +# ret, frame = cap.read() +# return ret, frame +# +# ret, frame = await loop.run_in_executor( +# read_rtmp_frames_executor, +# read_frame_core +# ) +# +# current_time = cv2.getTickCount() +# frame_count += 1 +# img = None +# is_corrupted = False +# +# # ✅ 流程判断:帧有效性检查 + 格式转换(替代 CAP_PROP_OUTPUT_FORMAT) +# if ret and frame is not None and frame.size > 0: +# # 非 SEI 帧,解码成功,转换为 CV_8UC3 格式 +# frame = ensure_cv8uc3(frame) +# img = frame.copy() +# last_valid_frame = img.copy() +# consecutive_corrupted_frames = 0 +# is_corrupted = False +# else: +# # SEI 帧或解码异常,使用兜底帧(避免解码器状态异常) +# img = last_valid_frame.copy() +# is_corrupted = True +# consecutive_corrupted_frames += 1 +# if consecutive_corrupted_frames % 15 == 0: +# logger.warning(f"跳过 SEI 帧/解码异常,连续损坏帧: {consecutive_corrupted_frames}(解码器状态稳定)") +# +# # 强制兜底,保证帧非空 +# if img is None or img.size == 0: +# img = np.zeros((height, width, 3), dtype=np.uint8) +# +# # 获取 OSD 信息(保留原逻辑,实时性优先) +# osd_info = None +# if device and topic_osd_info and method_osd_info: +# try: +# osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info) +# if osd_msg and hasattr(osd_msg, 'data'): +# osd_info = Air_Attitude( +# gimbal_pitch=osd_msg.data.gimbal_pitch, +# gimbal_roll=osd_msg.data.gimbal_roll, +# gimbal_yaw=osd_msg.data.gimbal_yaw, +# height=osd_msg.data.height, +# latitude=osd_msg.data.latitude, +# longitude=osd_msg.data.longitude +# ) +# except Exception as osd_error: +# pass +# +# # 实时放入队列,满队列丢弃旧帧(保证实时性,保留原逻辑) +# if img is not None: +# try: +# if frame_queue.full(): +# frame_queue.get_nowait() +# current_time_ns = time.time_ns() +# await frame_queue.put((img, osd_info, current_time_ns), timeout=0.001) +# except asyncio.TimeoutError: +# pass +# +# # 统计实时 FPS,验证性能(无额外 CPU 开销) +# pic_count += 1 +# tick_count = cv2.getTickCount() - time_start +# elapsed_seconds = tick_count / cv2.getTickFrequency() +# if elapsed_seconds >= 1.0: +# fps = pic_count / elapsed_seconds +# print(f"实时统计:FPS:{fps:.2f} 分辨率:{img.shape[1]}x{img.shape[0]} 无 SEI 帧截断错误日志") +# pic_count = 0 +# time_start = cv2.getTickCount() +# +# # 连续损坏帧触发重连(解码器状态异常时重试) +# if consecutive_corrupted_frames > max_consecutive_corrupted: +# logger.warning(f"连续{consecutive_corrupted_frames}帧异常,触发捕获器重连") +# break +# +# except Exception as e: +# logger.error(f"RTMP 流错误 (尝试 {attempt}/{max_retries}): {e}") +# if attempt < max_retries: +# await asyncio.sleep(retry_delay) +# else: +# raise RuntimeError(f"无法连接 RTMP 流 (尝试 {max_retries} 次后失败): {video_url}") +# finally: +# # 安全释放捕获器资源 +# if cap is not None and hasattr(cap, 'isOpened') and cap.isOpened(): +# cap.release() +# logger.info("OpenCV 捕获器资源已释放") +# +# # 统计最终结果 +# total_elapsed = (cv2.getTickCount() - time_start) / cv2.getTickFrequency() +# if frame_count > 0 and frame_count > max_retries: +# avg_fps = frame_count / total_elapsed if total_elapsed > 0 else 0 +# print(f"\nRTMP 流读取完成,总有效帧数: {frame_count}, 平均 FPS: {avg_fps:.2f},无 SEI 帧截断错误日志") +# else: +# print("\nRTMP 流读取失败,未获取到有效帧") +# +# logger.info(f"RTMP 流已结束,累计处理帧数: {frame_count}") @@ -1942,7 +2426,7 @@ from PIL import Image, ImageDraw, ImageFont chinese_font = None try: - chinese_font = ImageFont.truetype("config/SIMSUN.TTC", 60) + chinese_font = ImageFont.truetype("config/SIMSUN.TTC", 20) except: chinese_font = ImageFont.load_default() @@ -2187,6 +2671,7 @@ async def write_results_to_rtmp(task_id: str, output_url: str = None, input_fps: "box": [x1, y1, x2, y2] }) label = f"{en_name}:{confidence:.2f}:{track_id}" + label_name = f"{en_name}" # 计算文本位置 text_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, fontScale=8, thickness=4)[ 0] @@ -2201,8 +2686,9 @@ async def write_results_to_rtmp(task_id: str, output_url: str = None, input_fps: frame_copy = put_chinese_text( temp_img, # label, # 置信度、类别、用作测试 - "", # 注释掉汉字 - (text_x, text_y - text_height), + # "", # 注释掉汉字 + label_name, # 仅显示汉字 + (text_x, text_y- 40), ) else: cls_count += 1 @@ -2225,6 +2711,7 @@ async def write_results_to_rtmp(task_id: str, output_url: str = None, input_fps: # 准备标签文本 # label = f"{chinese_label.get(cls_id, class_name)}: {confidence:.2f}:{track_id}" label = f"{confidence:.2f}:{track_id}" + label_name = f"{en_name}" # 计算文本位置 text_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, fontScale=8, thickness=8)[0] text_width, text_height = text_size[0], text_size[1] @@ -2240,8 +2727,9 @@ async def write_results_to_rtmp(task_id: str, output_url: str = None, input_fps: frame_copy = put_chinese_text( temp_img, # label, # 置信度、类别、用作测试 - "", # 注释掉汉字 - (text_x, text_y - text_height), + # "", # 注释掉汉字 + label_name, # 仅显示汉字 + (text_x,text_y- 40), ) if invade_state: diff --git a/yolo_api.py b/yolo_api.py index 078ab3e..919e228 100644 --- a/yolo_api.py +++ b/yolo_api.py @@ -664,11 +664,11 @@ async def run_back_Multi_Detect_async(request, request_json, stop_event: asyncio { 'path': config.model_path, - 'engine_path': config.engine_path, - 'so_path': config.so_path, + # 'engine_path': config.engine_path, + # 'so_path': config.so_path, # # 测试代码 - # 'engine_path': r"D:\project\AI-PYTHON\Ai_tottle\engine\renche\renche.engine", - # 'so_path': r"D:\project\AI-PYTHON\Ai_tottle\engine\renche\myplugins.dll", + 'engine_path': r"D:\project\AI-PYTHON\Ai_tottle\engine\renche\renche.engine", + 'so_path': r"D:\project\AI-PYTHON\Ai_tottle\engine\renche\myplugins.dll", # 工地安全帽 # 'engine_path': r"D:\project\AI-PYTHON\Ai_tottle\engine\gdaq_hat_0926\gdaq_hat_0926.engine", # 'so_path': r"D:\project\AI-PYTHON\Ai_tottle\engine\gdaq_hat_0926\myplugins.dll", @@ -1657,6 +1657,8 @@ async def websocket_endpoint(request: Request, ws): camera_para_url = "meta_data/camera_para/xyzj_camera_para.txt" if model2 == "M4D": camera_para_url = "meta_data/camera_para/xyzj_camera_para.txt" + elif model2 == "M3TD": + camera_para_url = "meta_data/camera_para/hami_camera_para .txt" elif model2 == "M4TD": camera_para_url = "meta_data/camera_para/hami_camera_para .txt" camera_file_path = downFile(camera_para_url)