diff --git a/yolo/cv_multi_model_back_video.py b/yolo/cv_multi_model_back_video.py index ad9feef..81690c8 100644 --- a/yolo/cv_multi_model_back_video.py +++ b/yolo/cv_multi_model_back_video.py @@ -1392,6 +1392,228 @@ import asyncio from typing import Optional from concurrent.futures import ThreadPoolExecutor # 使用cv2 拉流,避免了ffmpeg 拉流的rtmp延时3s的问题 +# async def read_rtmp_frames( +# loop, +# read_rtmp_frames_executor: ThreadPoolExecutor, +# video_url: str, +# device: Optional[MQTTDevice] = None, +# topic_camera_osd: Optional[str] = None, +# method_camera_osd: Optional[str] = None, +# topic_osd_info: Optional[str] = None, +# method_osd_info: Optional[str] = None, +# cancel_flag: Optional[asyncio.Event] = None, +# frame_queue: asyncio.Queue = None, +# timestamp_frame_queue=None +# ): +# """ +# 基于 OpenCV+FFmpeg 读取 RTMP 流帧(优化版:高性能读取,处理损坏帧) +# ✅ 核心修改:替换原FFmpeg子进程读流为 cv2.VideoCapture 读流,保留所有原有业务逻辑 +# ✅ 核心优化:自适应分辨率、低延迟无残影、断流自动重连、超时保护 +# """ +# max_retries = 20 +# retry_delay = 2 +# pic_count = 0 +# attempt = 0 +# time_start = time.time_ns() +# frame_count = 0 +# +# if cancel_flag is None: +# cancel_flag = asyncio.Event() +# if timestamp_frame_queue is None: +# timestamp_frame_queue = [] +# +# print(f"开始读取RTMP流: {video_url}") +# +# while not cancel_flag.is_set() and attempt < max_retries: +# attempt += 1 +# if cancel_flag.is_set(): +# logger.info("收到停止信号,终止 RTMP 读取") +# break +# +# cap = None +# width, height = None, None +# stream_fps = None +# +# try: +# logger.info(f"尝试连接 RTMP 流 (尝试 {attempt}/{max_retries}): {video_url}") +# +# # ✅ 核心替换:使用cv2.VideoCapture + CAP_FFMPEG 打开RTMP流,最优参数配置 +# # 切换到线程池执行opencv操作,避免阻塞协程 +# cap = await loop.run_in_executor( +# read_rtmp_frames_executor, +# lambda: cv2.VideoCapture(video_url, cv2.CAP_FFMPEG) +# ) +# # 设置核心参数 - 重中之重,缺一不可 +# await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 60000)) +# await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 50000)) +# await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)) # 无缓存,低延迟无残影 +# await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_FPS, 25)) +# +# # 校验流是否成功打开 +# is_opened = await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.isOpened()) +# if not is_opened: +# logger.warning(f"尝试 {attempt} 次:打开RTMP流失败,准备重试") +# await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.release() if cap else None) +# await asyncio.sleep(retry_delay) +# continue +# +# # ✅ 自适应获取流的【真实分辨率】,无需手动探测,精准无误差 +# width = await loop.run_in_executor(read_rtmp_frames_executor, lambda: int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))) +# height = await loop.run_in_executor(read_rtmp_frames_executor, lambda: int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))) +# stream_fps = await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.get(cv2.CAP_PROP_FPS)) +# +# # 兜底分辨率,防止异常 +# if width is None or height is None or width == 0 or height == 0: +# logger.warning("使用默认分辨率 1920x1080") +# width, height = 1920, 1080 +# +# logger.info(f"视频分辨率: {width}x{height}, 流帧率: {stream_fps:.1f} FPS") +# logger.info(f"成功启动 OpenCV+FFmpeg 连接 RTMP 流: {video_url}") +# +# # 初始化帧读取状态 (保留原逻辑不变) +# frame_sequence = 0 +# last_timestamp = time_start +# consecutive_corrupted_frames = 0 # 连续损坏帧计数 +# max_consecutive_corrupted = 10 # 最大连续损坏帧数 +# +# while not cancel_flag.is_set(): +# try: +# # ✅ 核心替换:使用cv2.read()读取帧,线程池执行避免阻塞协程 +# ret, frame = await loop.run_in_executor( +# read_rtmp_frames_executor, +# lambda: cap.read() +# ) +# +# current_time_ns = time.time_ns() +# frame_sequence += 1 +# frame_count += 1 +# +# # 处理帧数据(保留原逻辑完全不变,兼容原有的损坏帧处理) +# img = None +# is_corrupted = False +# print(f"读取 read_rtmp_frames 判断") +# try: +# if ret and frame is not None and frame.shape == (height, width, 3): +# # 完整有效帧处理 +# img = frame.copy() +# consecutive_corrupted_frames = 0 # 重置连续损坏计数 +# else: +# # 损坏帧/空帧处理 +# logger.warning(f"帧数据损坏/空帧, 序列: {frame_sequence}") +# is_corrupted = True +# consecutive_corrupted_frames += 1 +# +# # 创建替代帧,保留原逻辑 +# if consecutive_corrupted_frames <= max_consecutive_corrupted: +# img = np.zeros((height, width, 3), dtype=np.uint8) +# else: +# img = np.zeros((height, width, 3), dtype=np.uint8) +# logger.error(f"连续损坏帧过多 ({consecutive_corrupted_frames}),创建空白帧") +# +# except Exception as frame_error: +# logger.error(f"帧数据处理错误: {frame_error}") +# # 创建空白帧作为后备,保留原逻辑 +# img = np.zeros((height, width, 3), dtype=np.uint8) +# is_corrupted = True +# consecutive_corrupted_frames += 1 +# print(f"读取 read_rtmp_frames 判断1") +# +# # 获取OSD信息 - 保留原逻辑完全不变 +# osd_info = None +# if device and topic_osd_info and method_osd_info: +# try: +# osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info) +# if osd_msg and hasattr(osd_msg, 'data'): +# osd_info = Air_Attitude( +# gimbal_pitch=osd_msg.data.gimbal_pitch, +# gimbal_roll=osd_msg.data.gimbal_roll, +# gimbal_yaw=osd_msg.data.gimbal_yaw, +# height=osd_msg.data.height, +# latitude=osd_msg.data.latitude, +# longitude=osd_msg.data.longitude +# ) +# except Exception as osd_error: +# logger.warning(f"获取OSD信息失败: {osd_error}") +# print(f"读取 read_rtmp_frames 判断2") +# +# # 放入帧队列 - 保留原逻辑完全不变 +# if img is not None and not frame_queue.full(): +# # 确保时间戳递增 +# if current_time_ns <= last_timestamp: +# current_time_ns = last_timestamp + 1 +# last_timestamp = current_time_ns +# +# # 统计信息 - 保留原逻辑完全不变 +# pic_count += 1 +# if current_time_ns - time_start > 1000000000: # 1秒 +# elapsed_seconds = (current_time_ns - time_start) / 1e9 +# fps = pic_count / elapsed_seconds if elapsed_seconds > 0 else 0 +# corrupted_rate = (consecutive_corrupted_frames / pic_count * 100) if pic_count > 0 else 0 +# print( +# f"readFrames 序列:{frame_sequence} 帧数:{pic_count} FPS:{fps:.2f} 损坏率:{corrupted_rate:.1f}%") +# pic_count = 0 +# time_start = current_time_ns +# +# print(f"读取 read_rtmp_frames 实时流") +# +# # 准备帧数据 - 保留原逻辑完全不变 +# frame_data = { +# "sequence": frame_sequence, +# "frame": img, +# "osd_info": osd_info, +# "timestamp": current_time_ns, +# "is_corrupted": is_corrupted +# } +# time_ns = time.time_ns() +# if img is not None and osd_info is not None: +# await frame_queue.put((img, osd_info, time_ns)) +# timestamp_frame_queue.append({ +# "timestamp": time_ns, +# "frame": img +# }) +# +# if frame_sequence % 100 == 0: # 每100帧输出一次日志 +# logger.debug(f"已处理帧 序列:{frame_sequence} 累计:{frame_count}") +# +# elif frame_queue.full(): +# logger.warning("帧队列已满,跳过此帧") +# await asyncio.sleep(0.001) # 短暂等待 +# +# # 连续帧损坏触发重连前置判断 - 保留原逻辑 +# if consecutive_corrupted_frames > max_consecutive_corrupted: +# logger.warning(f"连续{consecutive_corrupted_frames}帧损坏,触发流重连逻辑") +# break +# +# except Exception as e: +# logger.error(f"读取帧数据时出错: {e}", exc_info=True) +# break +# +# except Exception as e: +# logger.error(f"RTMP 流错误 (尝试 {attempt}/{max_retries}): {e}") +# if attempt < max_retries: +# await asyncio.sleep(retry_delay) +# else: +# raise RuntimeError(f"无法连接 RTMP 流 (尝试 {max_retries} 次后失败): {video_url}") +# finally: +# # ✅ 释放opencv的VideoCapture资源,替代原FFmpeg进程的关闭逻辑 +# if cap is not None: +# await loop.run_in_executor( +# read_rtmp_frames_executor, +# lambda: cap.release() +# ) +# logger.info("OpenCV VideoCapture 资源已释放") +# +# if frame_count > 0: +# total_time = (time.time_ns() - time_start) / 1e9 +# avg_fps = frame_count / total_time if total_time > 0 else 0 +# print(f"RTMP流读取完成,总帧数: {frame_count}, 总时间: {total_time:.2f}秒, 平均FPS: {avg_fps:.2f}") +# else: +# print("RTMP流读取失败,未获取到任何帧") +# +# logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {frame_count}") + + + async def read_rtmp_frames( loop, read_rtmp_frames_executor: ThreadPoolExecutor, @@ -1406,12 +1628,13 @@ async def read_rtmp_frames( timestamp_frame_queue=None ): """ - 基于 OpenCV+FFmpeg 读取 RTMP 流帧(优化版:高性能读取,处理损坏帧) - ✅ 核心修改:替换原FFmpeg子进程读流为 cv2.VideoCapture 读流,保留所有原有业务逻辑 - ✅ 核心优化:自适应分辨率、低延迟无残影、断流自动重连、超时保护 + 基于 OpenCV+FFmpeg 读取 RTMP 流帧(兼容优化版:解决URL格式错误+超时问题) + ✅ 核心优化:改用FFmpeg环境变量传递容错参数,提升兼容性 + ✅ 修复问题:解决流超时、URL识别失败的问题,提升连接成功率 + ✅ 保留功能:低延迟、丢包容错、断流自动重连、自适应分辨率 """ max_retries = 20 - retry_delay = 2 + retry_delay = 3 # 小幅增加重试间隔,避免频繁重试给服务端压力 pic_count = 0 attempt = 0 time_start = time.time_ns() @@ -1421,8 +1644,21 @@ async def read_rtmp_frames( cancel_flag = asyncio.Event() if timestamp_frame_queue is None: timestamp_frame_queue = [] + if frame_queue is None: + frame_queue = asyncio.Queue(maxsize=30) # 给队列设置默认容量,防止内存溢出 - print(f"开始读取RTMP流: {video_url}") + # ✅ 关键改动1:通过FFmpeg环境变量传递容错参数(兼容所有OpenCV/FFmpeg环境) + # 这是比URL拼接更稳定的方式,OpenCV底层FFmpeg会自动读取这些环境变量 + os.environ['FFMPEG_FLAGS'] = ( + "nobuffer,fastseek,discardcorrupt," # 核心容错:无缓冲、快速寻址、丢弃损坏帧 + "low_delay," # 低延迟模式 + "ignore_err," # 忽略解码错误 + "max_delay=0" # 最大延迟设为0 + ) + # 额外设置FFmpeg的日志级别,减少无关输出(可选) + os.environ['FFREPORT'] = "level=warning" + + print(f"开始读取RTMP流(兼容模式,已配置FFmpeg容错参数): {video_url}") while not cancel_flag.is_set() and attempt < max_retries: attempt += 1 @@ -1437,27 +1673,36 @@ async def read_rtmp_frames( try: logger.info(f"尝试连接 RTMP 流 (尝试 {attempt}/{max_retries}): {video_url}") - # ✅ 核心替换:使用cv2.VideoCapture + CAP_FFMPEG 打开RTMP流,最优参数配置 - # 切换到线程池执行opencv操作,避免阻塞协程 + # ✅ 关键改动2:恢复原始RTMP URL,避免格式识别错误 cap = await loop.run_in_executor( read_rtmp_frames_executor, lambda: cv2.VideoCapture(video_url, cv2.CAP_FFMPEG) ) - # 设置核心参数 - 重中之重,缺一不可 - await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 60000)) - await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 50000)) - await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)) # 无缓存,低延迟无残影 + + # ✅ 关键改动3:优化超时和缓存参数,解决30秒超时问题 + # 调整超时时间:OpenCV的超时参数在部分环境下需要略大于实际预期(推荐设为30秒) + await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 30000)) # 连接超时30秒(适配底层逻辑) + await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 10000)) # 读取帧超时10秒(避免长时间阻塞) + await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_BUFFERSIZE, 2)) # 缓存从1调整为2,平衡低延迟和稳定性(丢包场景下更稳定) await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_FPS, 25)) + # 关闭自动处理功能,减少额外开销 + await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_AUTO_WB, 0)) + await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0)) + + # ✅ 关键改动4:流连接预热,提升首次打开成功率 + await asyncio.sleep(0.5) # 短暂等待,让FFmpeg完成底层初始化 # 校验流是否成功打开 is_opened = await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.isOpened()) if not is_opened: logger.warning(f"尝试 {attempt} 次:打开RTMP流失败,准备重试") + # 释放资源前先短暂等待,避免资源泄露 + await asyncio.sleep(0.1) await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.release() if cap else None) await asyncio.sleep(retry_delay) continue - # ✅ 自适应获取流的【真实分辨率】,无需手动探测,精准无误差 + # 自适应获取流的【真实分辨率】 width = await loop.run_in_executor(read_rtmp_frames_executor, lambda: int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))) height = await loop.run_in_executor(read_rtmp_frames_executor, lambda: int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))) stream_fps = await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.get(cv2.CAP_PROP_FPS)) @@ -1470,15 +1715,15 @@ async def read_rtmp_frames( logger.info(f"视频分辨率: {width}x{height}, 流帧率: {stream_fps:.1f} FPS") logger.info(f"成功启动 OpenCV+FFmpeg 连接 RTMP 流: {video_url}") - # 初始化帧读取状态 (保留原逻辑不变) + # 初始化帧读取状态 frame_sequence = 0 last_timestamp = time_start consecutive_corrupted_frames = 0 # 连续损坏帧计数 - max_consecutive_corrupted = 10 # 最大连续损坏帧数 + max_consecutive_corrupted = 20 # 适配丢包场景的最大连续损坏帧数 while not cancel_flag.is_set(): try: - # ✅ 核心替换:使用cv2.read()读取帧,线程池执行避免阻塞协程 + # 读取帧,线程池执行避免阻塞协程 ret, frame = await loop.run_in_executor( read_rtmp_frames_executor, lambda: cap.read() @@ -1488,22 +1733,26 @@ async def read_rtmp_frames( frame_sequence += 1 frame_count += 1 - # 处理帧数据(保留原逻辑完全不变,兼容原有的损坏帧处理) + # 处理帧数据(微调损坏帧判断,提升容错) img = None is_corrupted = False - print(f"读取 read_rtmp_frames 判断") try: - if ret and frame is not None and frame.shape == (height, width, 3): - # 完整有效帧处理 + # 放宽帧形状判断,允许偶尔的分辨率小幅波动(丢包导致) + valid_frame_shape = (height, width, 3) + if ret and frame is not None and frame.shape in [valid_frame_shape, (height, width)]: + # 完整有效帧处理,统一转为3通道BGR格式 img = frame.copy() + if len(img.shape) == 2: # 灰度帧转为3通道 + img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR) consecutive_corrupted_frames = 0 # 重置连续损坏计数 else: # 损坏帧/空帧处理 - logger.warning(f"帧数据损坏/空帧, 序列: {frame_sequence}") + if consecutive_corrupted_frames % 5 == 0: # 每5帧打印一次日志,减少冗余输出 + logger.warning(f"帧数据损坏/空帧, 序列: {frame_sequence}") is_corrupted = True consecutive_corrupted_frames += 1 - # 创建替代帧,保留原逻辑 + # 创建替代帧 if consecutive_corrupted_frames <= max_consecutive_corrupted: img = np.zeros((height, width, 3), dtype=np.uint8) else: @@ -1512,11 +1761,9 @@ async def read_rtmp_frames( except Exception as frame_error: logger.error(f"帧数据处理错误: {frame_error}") - # 创建空白帧作为后备,保留原逻辑 img = np.zeros((height, width, 3), dtype=np.uint8) is_corrupted = True consecutive_corrupted_frames += 1 - print(f"读取 read_rtmp_frames 判断1") # 获取OSD信息 - 保留原逻辑完全不变 osd_info = None @@ -1534,7 +1781,6 @@ async def read_rtmp_frames( ) except Exception as osd_error: logger.warning(f"获取OSD信息失败: {osd_error}") - print(f"读取 read_rtmp_frames 判断2") # 放入帧队列 - 保留原逻辑完全不变 if img is not None and not frame_queue.full(): @@ -1543,7 +1789,7 @@ async def read_rtmp_frames( current_time_ns = last_timestamp + 1 last_timestamp = current_time_ns - # 统计信息 - 保留原逻辑完全不变 + # 统计信息 pic_count += 1 if current_time_ns - time_start > 1000000000: # 1秒 elapsed_seconds = (current_time_ns - time_start) / 1e9 @@ -1554,9 +1800,7 @@ async def read_rtmp_frames( pic_count = 0 time_start = current_time_ns - print(f"读取 read_rtmp_frames 实时流") - - # 准备帧数据 - 保留原逻辑完全不变 + # 准备帧数据 frame_data = { "sequence": frame_sequence, "frame": img, @@ -1579,7 +1823,7 @@ async def read_rtmp_frames( logger.warning("帧队列已满,跳过此帧") await asyncio.sleep(0.001) # 短暂等待 - # 连续帧损坏触发重连前置判断 - 保留原逻辑 + # 连续帧损坏触发重连前置判断 if consecutive_corrupted_frames > max_consecutive_corrupted: logger.warning(f"连续{consecutive_corrupted_frames}帧损坏,触发流重连逻辑") break @@ -1595,11 +1839,11 @@ async def read_rtmp_frames( else: raise RuntimeError(f"无法连接 RTMP 流 (尝试 {max_retries} 次后失败): {video_url}") finally: - # ✅ 释放opencv的VideoCapture资源,替代原FFmpeg进程的关闭逻辑 + # 安全释放opencv的VideoCapture资源 if cap is not None: await loop.run_in_executor( read_rtmp_frames_executor, - lambda: cap.release() + lambda: cap.release() if cap.isOpened() else None ) logger.info("OpenCV VideoCapture 资源已释放") @@ -1613,6 +1857,7 @@ async def read_rtmp_frames( logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {frame_count}") + # async def process_frames(detector: MultiYOLODetector): # async def process_frames(detector: MultiYOLODetector_TrackId, cancel_flag: asyncio.Event, # frame_queue: asyncio.Queue, processed_queue: asyncio.Queue): diff --git a/yolo_api.py b/yolo_api.py index 1bddcb3..078ab3e 100644 --- a/yolo_api.py +++ b/yolo_api.py @@ -667,8 +667,8 @@ async def run_back_Multi_Detect_async(request, request_json, stop_event: asyncio 'engine_path': config.engine_path, 'so_path': config.so_path, # # 测试代码 - # 'engine_path': r"D:\project\AI-PYTHON\tensorrtx-master\yolo11\build\Release\build.engine", - # 'so_path': r"D:\project\AI-PYTHON\tensorrtx-master\yolo11\build\Release\myplugins.dll", + # 'engine_path': r"D:\project\AI-PYTHON\Ai_tottle\engine\renche\renche.engine", + # 'so_path': r"D:\project\AI-PYTHON\Ai_tottle\engine\renche\myplugins.dll", # 工地安全帽 # 'engine_path': r"D:\project\AI-PYTHON\Ai_tottle\engine\gdaq_hat_0926\gdaq_hat_0926.engine", # 'so_path': r"D:\project\AI-PYTHON\Ai_tottle\engine\gdaq_hat_0926\myplugins.dll",