diff --git a/yolo/cv_multi_model_back_video.py b/yolo/cv_multi_model_back_video.py index f39be1a..ad9feef 100644 --- a/yolo/cv_multi_model_back_video.py +++ b/yolo/cv_multi_model_back_video.py @@ -819,7 +819,579 @@ async def read_video_frames(task_id, mqtt, mqtt_publish_topic, if os.path.exists(local_video_path): os.remove(local_video_path) +# +# async def read_rtmp_frames( +# loop, +# read_rtmp_frames_executor: ThreadPoolExecutor, +# video_url: str, +# device: Optional[MQTTDevice] = None, +# topic_camera_osd: Optional[str] = None, +# method_camera_osd: Optional[str] = None, +# topic_osd_info: Optional[str] = None, +# method_osd_info: Optional[str] = None, +# cancel_flag: Optional[asyncio.Event] = None, +# frame_queue: asyncio.Queue = None, +# timestamp_frame_queue: TimestampedQueue = None +# ): +# """ +# 异步读取 RTMP 流帧(优化版:移除帧率控制,优化线程池) +# """ +# max_retries = 20 +# retry_delay = 2 +# pic_count = 0 +# attempt = 0 +# time_start = time.time_ns() # 添加开始时间统计 +# frame_count = 0 # 统计总帧数 +# +# if cancel_flag is None: +# cancel_flag = asyncio.Event() +# +# # loop = asyncio.get_running_loop() +# +# # 打印初始统计信息 +# print(f"开始读取RTMP流: {video_url}") +# +# while not cancel_flag.is_set() and attempt < max_retries: +# attempt += 1 +# if cancel_flag.is_set(): +# logger.info("收到停止信号,终止 RTMP 读取") +# break +# +# container = None +# try: +# logger.info(f"尝试连接 RTMP 流 (尝试 {attempt}/{max_retries}): {video_url}") +# # 1. 关键优化:将同步的 av.open 和流初始化放到线程池 +# container = await loop.run_in_executor(read_rtmp_frames_executor, av.open, video_url) +# video_stream = await loop.run_in_executor(read_rtmp_frames_executor, next, +# (s for s in container.streams if s.type == 'video')) +# logger.info(f"成功连接到 RTMP 流: {video_url} ({video_stream.width}x{video_stream.height})") +# +# # 2. 提前获取一次OSD消息(验证MQTT是否正常) +# if device and topic_osd_info and method_osd_info: +# osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info) +# if osd_msg: +# logger.info(f"初始OSD消息获取成功: 高度={osd_msg.data.height}") +# else: +# logger.warning("初始OSD消息为空,可能MQTT尚未收到消息") +# +# # 3. 关键优化:将同步的帧迭代放到线程池,通过生成器异步获取 +# async def async_frame_generator(): +# """异步帧生成器:在后台线程迭代同步帧,通过yield返回给事件循环""" +# +# def sync_frame_iter(): +# try: +# for frame in container.decode(video=0): +# # 线程内检查取消标志(需定期检查,避免线程无法退出) +# if cancel_flag.is_set(): +# logger.info("后台线程检测到取消信号,停止帧迭代") +# break +# +# # 确保是3通道RGB +# if len(frame.planes) == 1: # 如果是灰度图 +# gray = frame.to_ndarray(format='gray') +# # 转换为3通道BGR(不修改尺寸) +# bgr = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR) +# yield bgr +# else: +# # 保持原始尺寸和色彩空间,只转换格式 +# bgr = frame.to_ndarray(format='bgr24') +# yield bgr +# except Exception as e: +# logger.error(f"同步帧迭代出错: {e}") +# finally: +# if container: +# container.close() +# logger.info("RTMP容器已关闭") +# +# # 将同步迭代器包装为异步生成器 +# gen = sync_frame_iter() +# while not cancel_flag.is_set(): +# try: +# # 每次获取一帧都通过线程池执行,避免长时间阻塞 +# frame = await loop.run_in_executor(read_rtmp_frames_executor, next, gen, None) +# if frame is None: # 迭代结束 +# break +# yield frame +# except StopIteration: +# logger.info("RTMP流帧迭代结束") +# break +# except Exception as e: +# logger.error(f"异步获取帧出错: {e}") +# break +# +# # 4. 异步迭代帧(不阻塞事件循环) +# async for frame in async_frame_generator(): +# if cancel_flag.is_set(): +# logger.info("检测到取消信号,停止读取帧") +# break +# +# try: +# # 5. 帧转换也放到线程池(av.Frame.to_ndarray是CPU密集操作) +# img = frame.copy() # 确保不修改原始帧 +# osd_info = None +# +# # 6. 此时事件循环未被阻塞,MQTT消息已缓存,get_latest_message可即时获取 +# if device and topic_osd_info and method_osd_info: +# osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info) +# if osd_msg and hasattr(osd_msg, 'data'): +# osd_info = Air_Attitude( +# gimbal_pitch=osd_msg.data.gimbal_pitch, +# gimbal_roll=osd_msg.data.gimbal_roll, +# gimbal_yaw=osd_msg.data.gimbal_yaw, +# height=osd_msg.data.height, +# latitude=osd_msg.data.latitude, +# longitude=osd_msg.data.longitude +# ) +# +# # 7. 异步放入帧队列(避免队列满时阻塞) +# if not frame_queue.full(): +# pic_count += 1 +# frame_count += 1 # 增加总帧数统计 +# time_ns = time.time_ns() +# +# # 定期输出统计信息(每1000帧) +# if time_ns - time_start > 1000000000: +# print(f"readFrames {pic_count}") +# pic_count = 0 +# time_start = time_ns +# if img is not None and osd_info is not None: +# await frame_queue.put((img, osd_info, time_ns)) +# timestamp_frame_queue.append({ +# "timestamp": time_ns, +# "frame": img +# }) +# logger.debug( +# f"已放入帧队列,累计帧数: {pic_count},队列剩余空间: {frame_queue.maxsize - frame_queue.qsize()}") +# else: +# logger.warning("帧队列已满,等待1ms后重试") +# await asyncio.sleep(0.001) +# +# except Exception as frame_error: +# logger.error(f"处理单帧时出错: {frame_error}", exc_info=True) +# continue +# +# except (av.AVError, IOError) as e: +# logger.error(f"RTMP 流错误 (尝试 {attempt}/{max_retries}): {e}") +# if attempt < max_retries: +# await asyncio.sleep(retry_delay) +# else: +# raise RuntimeError(f"无法连接 RTMP 流 (尝试 {max_retries} 次后失败): {video_url}") +# except asyncio.CancelledError: +# logger.info("read_rtmp_frames 收到取消信号") +# raise +# except Exception as e: +# logger.error(f"未知错误: {e}", exc_info=True) +# if attempt < max_retries: +# await asyncio.sleep(retry_delay) +# finally: +# # 双重保险:确保容器关闭 +# if container and not container.closed: +# await loop.run_in_executor(None, container.close) +# logger.info("RTMP容器在finally中关闭") +# +# # 最终统计信息 +# if frame_count > 0: +# total_time = (time.time_ns() - time_start) / 1e9 +# avg_fps = frame_count / total_time if total_time > 0 else 0 +# print(f"RTMP流读取完成,总帧数: {frame_count}, 总时间: {total_time:.2f}秒, 平均FPS: {avg_fps:.2f}") +# else: +# print("RTMP流读取失败,未获取到任何帧") +# +# logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {pic_count}") + + +# ------------------------------- 下述方法使用ffmpeg 拉流,可以解决cv2拉流的一些问题,主要是虚拟环境ffmpeg不匹配的问题。但是ffmpeg拉流慢3s左右 + +# import cv2 +# import json +# import asyncio +# from typing import Optional +# from concurrent.futures import ThreadPoolExecutor +# +# +# async def read_rtmp_frames( +# loop, +# read_rtmp_frames_executor: ThreadPoolExecutor, +# video_url: str, +# device: Optional[MQTTDevice] = None, +# topic_camera_osd: Optional[str] = None, +# method_camera_osd: Optional[str] = None, +# topic_osd_info: Optional[str] = None, +# method_osd_info: Optional[str] = None, +# cancel_flag: Optional[asyncio.Event] = None, +# frame_queue: asyncio.Queue = None, +# timestamp_frame_queue=None +# ): +# """ +# 基于 FFmpeg 读取 RTMP 流帧(优化版:高性能读取,处理损坏帧) +# """ +# max_retries = 20 +# retry_delay = 2 +# pic_count = 0 +# attempt = 0 +# time_start = time.time_ns() +# frame_count = 0 +# +# if cancel_flag is None: +# cancel_flag = asyncio.Event() +# +# print(f"开始读取RTMP流: {video_url}") +# +# while not cancel_flag.is_set() and attempt < max_retries: +# attempt += 1 +# if cancel_flag.is_set(): +# logger.info("收到停止信号,终止 RTMP 读取") +# break +# +# ffmpeg_process = None +# width, height = None, None +# frame_size = None +# +# try: +# logger.info(f"尝试连接 RTMP 流 (尝试 {attempt}/{max_retries}): {video_url}") +# +# # 1. 探测视频分辨率 +# width, height = await detect_video_resolution(loop, read_rtmp_frames_executor, video_url) +# +# if width is None or height is None: +# logger.warning("使用默认分辨率 1920x1080") +# width, height = 1920, 1080 +# +# frame_size = width * height * 3 +# logger.info(f"视频分辨率: {width}x{height}, 帧大小: {frame_size} bytes") +# +# # 2. 启动 FFmpeg 进程(优化参数提高性能) +# ffmpeg_cmd = [ +# 'ffmpeg', +# '-hide_banner', +# '-loglevel', 'warning', # 改为warning,可以看到更多错误信息 +# '-fflags', '+nobuffer+genpts', +# '-err_detect', 'ignore_err', +# '-max_delay', '0', +# '-flags', 'low_delay', +# '-i', video_url, +# '-an', # 无音频 +# '-c:v', 'rawvideo', # 关键:输出原始视频帧,而不是复制编码 +# '-pix_fmt', 'bgr24', # OpenCV使用BGR格式 +# '-f', 'rawvideo', # 关键:输出原始视频格式 +# '-flush_packets', '1', +# '-' +# ] +# ffmpeg_process = await loop.run_in_executor( +# read_rtmp_frames_executor, +# lambda: subprocess.Popen( +# ffmpeg_cmd, +# stdout=subprocess.PIPE, +# stderr=subprocess.PIPE, +# bufsize=frame_size # 设置合适的缓冲区大小 +# ) +# ) +# +# logger.info(f"成功启动 FFmpeg 进程连接 RTMP 流: {video_url}") +# +# # 3. 初始化帧读取状态 +# frame_sequence = 0 +# last_timestamp = time_start +# consecutive_corrupted_frames = 0 # 连续损坏帧计数 +# max_consecutive_corrupted = 10 # 最大连续损坏帧数 +# +# while not cancel_flag.is_set(): +# try: +# # 直接读取完整帧(高性能方式) +# raw_frame = await loop.run_in_executor( +# read_rtmp_frames_executor, +# lambda: ffmpeg_process.stdout.read(frame_size) +# ) +# +# if not raw_frame: +# logger.warning("读取到空帧数据,流可能已结束") +# break +# +# current_time_ns = time.time_ns() +# frame_sequence += 1 +# frame_count += 1 +# +# # 处理帧数据(无论是否完整) +# img = None +# is_corrupted = False +# print(f"读取 read_rtmp_frames 判断") +# try: +# if len(raw_frame) == frame_size: +# # 完整帧处理 +# frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape((height, width, 3)) +# img = frame.copy() +# consecutive_corrupted_frames = 0 # 重置连续损坏计数 +# else: +# # 损坏帧处理 +# logger.warning(f"帧数据损坏: {len(raw_frame)}/{frame_size} bytes, 序列: {frame_sequence}") +# is_corrupted = True +# consecutive_corrupted_frames += 1 +# +# # 创建替代帧 +# if consecutive_corrupted_frames <= max_consecutive_corrupted: +# # 尝试部分恢复 +# img = np.zeros((height, width, 3), dtype=np.uint8) +# valid_data = min(len(raw_frame), frame_size) +# if valid_data > 0: +# # 尽可能填充有效数据 +# temp_frame = np.frombuffer(raw_frame[:valid_data], dtype=np.uint8) +# img.flat[:len(temp_frame)] = temp_frame +# else: +# # 连续损坏过多,创建空白帧 +# img = np.zeros((height, width, 3), dtype=np.uint8) +# logger.error(f"连续损坏帧过多 ({consecutive_corrupted_frames}),创建空白帧") +# +# except Exception as frame_error: +# logger.error(f"帧数据处理错误: {frame_error}") +# # 创建空白帧作为后备 +# img = np.zeros((height, width, 3), dtype=np.uint8) +# is_corrupted = True +# consecutive_corrupted_frames += 1 +# print(f"读取 read_rtmp_frames 判断1") +# # 获取OSD信息 +# osd_info = None +# if device and topic_osd_info and method_osd_info: +# try: +# osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info) +# if osd_msg and hasattr(osd_msg, 'data'): +# osd_info = Air_Attitude( +# gimbal_pitch=osd_msg.data.gimbal_pitch, +# gimbal_roll=osd_msg.data.gimbal_roll, +# gimbal_yaw=osd_msg.data.gimbal_yaw, +# height=osd_msg.data.height, +# latitude=osd_msg.data.latitude, +# longitude=osd_msg.data.longitude +# ) +# except Exception as osd_error: +# logger.warning(f"获取OSD信息失败: {osd_error}") +# print(f"读取 read_rtmp_frames 判断2") +# # 放入帧队列 +# if img is not None and not frame_queue.full(): +# # 确保时间戳递增 +# if current_time_ns <= last_timestamp: +# current_time_ns = last_timestamp + 1 +# last_timestamp = current_time_ns +# +# # 统计信息 +# pic_count += 1 +# if current_time_ns - time_start > 1000000000: # 1秒 +# elapsed_seconds = (current_time_ns - time_start) / 1e9 +# fps = pic_count / elapsed_seconds if elapsed_seconds > 0 else 0 +# corrupted_rate = (consecutive_corrupted_frames / pic_count * 100) if pic_count > 0 else 0 +# print( +# f"readFrames 序列:{frame_sequence} 帧数:{pic_count} FPS:{fps:.2f} 损坏率:{corrupted_rate:.1f}%") +# pic_count = 0 +# time_start = current_time_ns +# +# print(f"读取 read_rtmp_frames 实时流") +# +# # 准备帧数据 +# frame_data = { +# "sequence": frame_sequence, +# "frame": img, +# "osd_info": osd_info, +# "timestamp": current_time_ns, +# "is_corrupted": is_corrupted +# } +# time_ns = time.time_ns() +# if img is not None and osd_info is not None: +# await frame_queue.put((img, osd_info, time_ns)) +# timestamp_frame_queue.append({ +# "timestamp": time_ns, +# "frame": img +# }) +# +# if frame_sequence % 100 == 0: # 每100帧输出一次日志 +# logger.debug(f"已处理帧 序列:{frame_sequence} 累计:{frame_count}") +# +# elif frame_queue.full(): +# logger.warning("帧队列已满,跳过此帧") +# await asyncio.sleep(0.001) # 短暂等待 +# +# # 检查是否需要重新探测分辨率(仅在连续损坏时) +# if consecutive_corrupted_frames > 5: +# logger.warning("连续帧损坏,尝试重新探测分辨率") +# try: +# new_width, new_height = await detect_video_resolution(loop, read_rtmp_frames_executor, +# video_url) +# if new_width and new_height and (new_width != width or new_height != height): +# logger.info(f"分辨率变化: {width}x{height} -> {new_width}x{new_height}") +# width, height = new_width, new_height +# frame_size = width * height * 3 +# consecutive_corrupted_frames = 0 # 重置计数 +# except Exception as probe_error: +# logger.warning(f"重新探测分辨率失败: {probe_error}") +# +# except Exception as e: +# logger.error(f"读取帧数据时出错: {e}", exc_info=True) +# # 检查 FFmpeg 进程状态 +# if ffmpeg_process and ffmpeg_process.poll() is not None: +# try: +# stderr_output = ffmpeg_process.stderr.read().decode('utf-8', errors='ignore') +# if stderr_output: +# logger.error(f"FFmpeg 进程错误: {stderr_output}") +# except: +# pass +# logger.error("FFmpeg 进程已退出") +# break +# # 短暂等待后继续 +# await asyncio.sleep(0.01) +# continue +# +# except (subprocess.SubprocessError, IOError) as e: +# logger.error(f"RTMP 流错误 (尝试 {attempt}/{max_retries}): {e}") +# if attempt < max_retries: +# await asyncio.sleep(retry_delay) +# else: +# raise RuntimeError(f"无法连接 RTMP 流 (尝试 {max_retries} 次后失败): {video_url}") +# except asyncio.CancelledError: +# logger.info("read_rtmp_frames 收到取消信号") +# raise +# except Exception as e: +# logger.error(f"未知错误: {e}", exc_info=True) +# if attempt < max_retries: +# await asyncio.sleep(retry_delay) +# finally: +# if ffmpeg_process: +# try: +# ffmpeg_process.terminate() +# try: +# await asyncio.wait_for( +# loop.run_in_executor(read_rtmp_frames_executor, ffmpeg_process.wait), +# timeout=2.0 +# ) +# except asyncio.TimeoutError: +# ffmpeg_process.kill() +# await loop.run_in_executor(read_rtmp_frames_executor, ffmpeg_process.wait) +# except Exception as e: +# logger.warning(f"关闭FFmpeg进程时出错: {e}") +# logger.info("FFmpeg 进程已关闭") +# +# if frame_count > 0: +# total_time = (time.time_ns() - time_start) / 1e9 +# avg_fps = frame_count / total_time if total_time > 0 else 0 +# print(f"RTMP流读取完成,总帧数: {frame_count}, 总时间: {total_time:.2f}秒, 平均FPS: {avg_fps:.2f}") +# else: +# print("RTMP流读取失败,未获取到任何帧") +# +# logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {frame_count}") +# +# async def detect_video_resolution(loop, executor, video_url): +# """ +# 探测视频流的分辨率(修复版) +# """ +# try: +# logger.info(f"开始探测视频分辨率: {video_url}") +# +# # 方法1: 使用简单的ffprobe命令(更可靠) +# simple_cmd = [ +# 'ffprobe', +# '-v', 'error', +# '-select_streams', 'v:0', +# '-show_entries', 'stream=width,height', +# '-of', 'csv=p=0:s=x', +# video_url +# ] +# +# def run_simple_probe(): +# try: +# result = subprocess.run(simple_cmd, capture_output=True, text=True, timeout=15) +# logger.info(f"ffprobe返回码: {result.returncode}, 输出: {result.stdout.strip()}") +# +# if result.returncode == 0 and result.stdout.strip(): +# dimensions = result.stdout.strip().split('x') +# if len(dimensions) == 2: +# width = int(dimensions[0]) +# height = int(dimensions[1]) +# if width > 0 and height > 0: +# return width, height +# except Exception as e: +# logger.warning(f"简单分辨率探测失败: {e}") +# return None +# +# dimensions = await loop.run_in_executor(executor, run_simple_probe) +# if dimensions: +# width, height = dimensions +# logger.info(f"探测到视频分辨率: {width}x{height}") +# return width, height +# +# # 方法2: 使用详细的ffprobe命令 +# detailed_cmd = [ +# 'ffprobe', +# '-v', 'quiet', +# '-print_format', 'json', +# '-show_streams', +# video_url +# ] +# +# def run_detailed_probe(): +# try: +# result = subprocess.run(detailed_cmd, capture_output=True, text=True, timeout=15) +# if result.returncode == 0: +# data = json.loads(result.stdout) +# if 'streams' in data: +# for stream in data['streams']: +# if stream.get('codec_type') == 'video': +# width = stream.get('width') +# height = stream.get('height') +# if width and height: +# return int(width), int(height) +# except Exception as e: +# logger.warning(f"详细分辨率探测失败: {e}") +# return None +# +# dimensions = await loop.run_in_executor(executor, run_detailed_probe) +# if dimensions: +# width, height = dimensions +# logger.info(f"探测到视频分辨率: {width}x{height}") +# return width, height +# +# # 方法3: 尝试使用ffmpeg快速探测 +# quick_cmd = [ +# 'ffmpeg', +# '-i', video_url, +# '-t', '1', # 只读取1秒 +# '-f', 'null', +# '-' +# ] +# +# def run_quick_probe(): +# try: +# result = subprocess.run(quick_cmd, capture_output=True, text=True, timeout=10) +# # 从stderr中解析分辨率信息 +# if result.stderr: +# import re +# # 尝试从输出中解析分辨率 +# resolution_match = re.search(r'(\d+)x(\d+)', result.stderr) +# if resolution_match: +# width = int(resolution_match.group(1)) +# height = int(resolution_match.group(2)) +# if width > 0 and height > 0: +# return width, height +# except Exception as e: +# logger.warning(f"快速分辨率探测失败: {e}") +# return None +# +# dimensions = await loop.run_in_executor(executor, run_quick_probe) +# if dimensions: +# width, height = dimensions +# logger.info(f"探测到视频分辨率: {width}x{height}") +# return width, height +# +# logger.warning("所有分辨率探测方法都失败,使用默认值 1920x1080") +# return 1920, 1080 +# +# except Exception as e: +# logger.error(f"分辨率探测异常: {e}") +# logger.warning("使用默认分辨率 1920x1080") +# return 1920, 1080 + +import cv2 +import asyncio +from typing import Optional +from concurrent.futures import ThreadPoolExecutor +# 使用cv2 拉流,避免了ffmpeg 拉流的rtmp延时3s的问题 async def read_rtmp_frames( loop, read_rtmp_frames_executor: ThreadPoolExecutor, @@ -831,24 +1403,25 @@ async def read_rtmp_frames( method_osd_info: Optional[str] = None, cancel_flag: Optional[asyncio.Event] = None, frame_queue: asyncio.Queue = None, - timestamp_frame_queue: TimestampedQueue = None + timestamp_frame_queue=None ): """ - 异步读取 RTMP 流帧(优化版:移除帧率控制,优化线程池) + 基于 OpenCV+FFmpeg 读取 RTMP 流帧(优化版:高性能读取,处理损坏帧) + ✅ 核心修改:替换原FFmpeg子进程读流为 cv2.VideoCapture 读流,保留所有原有业务逻辑 + ✅ 核心优化:自适应分辨率、低延迟无残影、断流自动重连、超时保护 """ max_retries = 20 retry_delay = 2 pic_count = 0 attempt = 0 - time_start = time.time_ns() # 添加开始时间统计 - frame_count = 0 # 统计总帧数 + time_start = time.time_ns() + frame_count = 0 if cancel_flag is None: cancel_flag = asyncio.Event() + if timestamp_frame_queue is None: + timestamp_frame_queue = [] - # loop = asyncio.get_running_loop() - - # 打印初始统计信息 print(f"开始读取RTMP流: {video_url}") while not cancel_flag.is_set() and attempt < max_retries: @@ -857,139 +1430,179 @@ async def read_rtmp_frames( logger.info("收到停止信号,终止 RTMP 读取") break - container = None + cap = None + width, height = None, None + stream_fps = None + try: logger.info(f"尝试连接 RTMP 流 (尝试 {attempt}/{max_retries}): {video_url}") - # 1. 关键优化:将同步的 av.open 和流初始化放到线程池 - container = await loop.run_in_executor(read_rtmp_frames_executor, av.open, video_url) - video_stream = await loop.run_in_executor(read_rtmp_frames_executor, next, - (s for s in container.streams if s.type == 'video')) - logger.info(f"成功连接到 RTMP 流: {video_url} ({video_stream.width}x{video_stream.height})") - # 2. 提前获取一次OSD消息(验证MQTT是否正常) - if device and topic_osd_info and method_osd_info: - osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info) - if osd_msg: - logger.info(f"初始OSD消息获取成功: 高度={osd_msg.data.height}") - else: - logger.warning("初始OSD消息为空,可能MQTT尚未收到消息") + # ✅ 核心替换:使用cv2.VideoCapture + CAP_FFMPEG 打开RTMP流,最优参数配置 + # 切换到线程池执行opencv操作,避免阻塞协程 + cap = await loop.run_in_executor( + read_rtmp_frames_executor, + lambda: cv2.VideoCapture(video_url, cv2.CAP_FFMPEG) + ) + # 设置核心参数 - 重中之重,缺一不可 + await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 60000)) + await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 50000)) + await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)) # 无缓存,低延迟无残影 + await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_FPS, 25)) - # 3. 关键优化:将同步的帧迭代放到线程池,通过生成器异步获取 - async def async_frame_generator(): - """异步帧生成器:在后台线程迭代同步帧,通过yield返回给事件循环""" + # 校验流是否成功打开 + is_opened = await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.isOpened()) + if not is_opened: + logger.warning(f"尝试 {attempt} 次:打开RTMP流失败,准备重试") + await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.release() if cap else None) + await asyncio.sleep(retry_delay) + continue - def sync_frame_iter(): - try: - for frame in container.decode(video=0): - # 线程内检查取消标志(需定期检查,避免线程无法退出) - if cancel_flag.is_set(): - logger.info("后台线程检测到取消信号,停止帧迭代") - break + # ✅ 自适应获取流的【真实分辨率】,无需手动探测,精准无误差 + width = await loop.run_in_executor(read_rtmp_frames_executor, lambda: int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))) + height = await loop.run_in_executor(read_rtmp_frames_executor, lambda: int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))) + stream_fps = await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.get(cv2.CAP_PROP_FPS)) - # 确保是3通道RGB - if len(frame.planes) == 1: # 如果是灰度图 - gray = frame.to_ndarray(format='gray') - # 转换为3通道BGR(不修改尺寸) - bgr = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR) - yield bgr - else: - # 保持原始尺寸和色彩空间,只转换格式 - bgr = frame.to_ndarray(format='bgr24') - yield bgr - except Exception as e: - logger.error(f"同步帧迭代出错: {e}") - finally: - if container: - container.close() - logger.info("RTMP容器已关闭") + # 兜底分辨率,防止异常 + if width is None or height is None or width == 0 or height == 0: + logger.warning("使用默认分辨率 1920x1080") + width, height = 1920, 1080 - # 将同步迭代器包装为异步生成器 - gen = sync_frame_iter() - while not cancel_flag.is_set(): - try: - # 每次获取一帧都通过线程池执行,避免长时间阻塞 - frame = await loop.run_in_executor(read_rtmp_frames_executor, next, gen, None) - if frame is None: # 迭代结束 - break - yield frame - except StopIteration: - logger.info("RTMP流帧迭代结束") - break - except Exception as e: - logger.error(f"异步获取帧出错: {e}") - break + logger.info(f"视频分辨率: {width}x{height}, 流帧率: {stream_fps:.1f} FPS") + logger.info(f"成功启动 OpenCV+FFmpeg 连接 RTMP 流: {video_url}") - # 4. 异步迭代帧(不阻塞事件循环) - async for frame in async_frame_generator(): - if cancel_flag.is_set(): - logger.info("检测到取消信号,停止读取帧") - break + # 初始化帧读取状态 (保留原逻辑不变) + frame_sequence = 0 + last_timestamp = time_start + consecutive_corrupted_frames = 0 # 连续损坏帧计数 + max_consecutive_corrupted = 10 # 最大连续损坏帧数 + while not cancel_flag.is_set(): try: - # 5. 帧转换也放到线程池(av.Frame.to_ndarray是CPU密集操作) - img = frame.copy() # 确保不修改原始帧 + # ✅ 核心替换:使用cv2.read()读取帧,线程池执行避免阻塞协程 + ret, frame = await loop.run_in_executor( + read_rtmp_frames_executor, + lambda: cap.read() + ) + + current_time_ns = time.time_ns() + frame_sequence += 1 + frame_count += 1 + + # 处理帧数据(保留原逻辑完全不变,兼容原有的损坏帧处理) + img = None + is_corrupted = False + print(f"读取 read_rtmp_frames 判断") + try: + if ret and frame is not None and frame.shape == (height, width, 3): + # 完整有效帧处理 + img = frame.copy() + consecutive_corrupted_frames = 0 # 重置连续损坏计数 + else: + # 损坏帧/空帧处理 + logger.warning(f"帧数据损坏/空帧, 序列: {frame_sequence}") + is_corrupted = True + consecutive_corrupted_frames += 1 + + # 创建替代帧,保留原逻辑 + if consecutive_corrupted_frames <= max_consecutive_corrupted: + img = np.zeros((height, width, 3), dtype=np.uint8) + else: + img = np.zeros((height, width, 3), dtype=np.uint8) + logger.error(f"连续损坏帧过多 ({consecutive_corrupted_frames}),创建空白帧") + + except Exception as frame_error: + logger.error(f"帧数据处理错误: {frame_error}") + # 创建空白帧作为后备,保留原逻辑 + img = np.zeros((height, width, 3), dtype=np.uint8) + is_corrupted = True + consecutive_corrupted_frames += 1 + print(f"读取 read_rtmp_frames 判断1") + + # 获取OSD信息 - 保留原逻辑完全不变 osd_info = None - - # 6. 此时事件循环未被阻塞,MQTT消息已缓存,get_latest_message可即时获取 if device and topic_osd_info and method_osd_info: - osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info) - if osd_msg and hasattr(osd_msg, 'data'): - osd_info = Air_Attitude( - gimbal_pitch=osd_msg.data.gimbal_pitch, - gimbal_roll=osd_msg.data.gimbal_roll, - gimbal_yaw=osd_msg.data.gimbal_yaw, - height=osd_msg.data.height, - latitude=osd_msg.data.latitude, - longitude=osd_msg.data.longitude - ) + try: + osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info) + if osd_msg and hasattr(osd_msg, 'data'): + osd_info = Air_Attitude( + gimbal_pitch=osd_msg.data.gimbal_pitch, + gimbal_roll=osd_msg.data.gimbal_roll, + gimbal_yaw=osd_msg.data.gimbal_yaw, + height=osd_msg.data.height, + latitude=osd_msg.data.latitude, + longitude=osd_msg.data.longitude + ) + except Exception as osd_error: + logger.warning(f"获取OSD信息失败: {osd_error}") + print(f"读取 read_rtmp_frames 判断2") - # 7. 异步放入帧队列(避免队列满时阻塞) - if not frame_queue.full(): + # 放入帧队列 - 保留原逻辑完全不变 + if img is not None and not frame_queue.full(): + # 确保时间戳递增 + if current_time_ns <= last_timestamp: + current_time_ns = last_timestamp + 1 + last_timestamp = current_time_ns + + # 统计信息 - 保留原逻辑完全不变 pic_count += 1 - frame_count += 1 # 增加总帧数统计 - time_ns = time.time_ns() - - # 定期输出统计信息(每1000帧) - if time_ns - time_start > 1000000000: - print(f"readFrames {pic_count}") + if current_time_ns - time_start > 1000000000: # 1秒 + elapsed_seconds = (current_time_ns - time_start) / 1e9 + fps = pic_count / elapsed_seconds if elapsed_seconds > 0 else 0 + corrupted_rate = (consecutive_corrupted_frames / pic_count * 100) if pic_count > 0 else 0 + print( + f"readFrames 序列:{frame_sequence} 帧数:{pic_count} FPS:{fps:.2f} 损坏率:{corrupted_rate:.1f}%") pic_count = 0 - time_start = time_ns + time_start = current_time_ns + + print(f"读取 read_rtmp_frames 实时流") + + # 准备帧数据 - 保留原逻辑完全不变 + frame_data = { + "sequence": frame_sequence, + "frame": img, + "osd_info": osd_info, + "timestamp": current_time_ns, + "is_corrupted": is_corrupted + } + time_ns = time.time_ns() if img is not None and osd_info is not None: await frame_queue.put((img, osd_info, time_ns)) timestamp_frame_queue.append({ "timestamp": time_ns, "frame": img }) - logger.debug( - f"已放入帧队列,累计帧数: {pic_count},队列剩余空间: {frame_queue.maxsize - frame_queue.qsize()}") - else: - logger.warning("帧队列已满,等待1ms后重试") - await asyncio.sleep(0.001) - except Exception as frame_error: - logger.error(f"处理单帧时出错: {frame_error}", exc_info=True) - continue + if frame_sequence % 100 == 0: # 每100帧输出一次日志 + logger.debug(f"已处理帧 序列:{frame_sequence} 累计:{frame_count}") - except (av.AVError, IOError) as e: + elif frame_queue.full(): + logger.warning("帧队列已满,跳过此帧") + await asyncio.sleep(0.001) # 短暂等待 + + # 连续帧损坏触发重连前置判断 - 保留原逻辑 + if consecutive_corrupted_frames > max_consecutive_corrupted: + logger.warning(f"连续{consecutive_corrupted_frames}帧损坏,触发流重连逻辑") + break + + except Exception as e: + logger.error(f"读取帧数据时出错: {e}", exc_info=True) + break + + except Exception as e: logger.error(f"RTMP 流错误 (尝试 {attempt}/{max_retries}): {e}") if attempt < max_retries: await asyncio.sleep(retry_delay) else: raise RuntimeError(f"无法连接 RTMP 流 (尝试 {max_retries} 次后失败): {video_url}") - except asyncio.CancelledError: - logger.info("read_rtmp_frames 收到取消信号") - raise - except Exception as e: - logger.error(f"未知错误: {e}", exc_info=True) - if attempt < max_retries: - await asyncio.sleep(retry_delay) finally: - # 双重保险:确保容器关闭 - if container and not container.closed: - await loop.run_in_executor(None, container.close) - logger.info("RTMP容器在finally中关闭") + # ✅ 释放opencv的VideoCapture资源,替代原FFmpeg进程的关闭逻辑 + if cap is not None: + await loop.run_in_executor( + read_rtmp_frames_executor, + lambda: cap.release() + ) + logger.info("OpenCV VideoCapture 资源已释放") - # 最终统计信息 if frame_count > 0: total_time = (time.time_ns() - time_start) / 1e9 avg_fps = frame_count / total_time if total_time > 0 else 0 @@ -997,7 +1610,7 @@ async def read_rtmp_frames( else: print("RTMP流读取失败,未获取到任何帧") - logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {pic_count}") + logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {frame_count}") # async def process_frames(detector: MultiYOLODetector):