Compare commits

...

2 Commits

View File

@ -819,7 +819,579 @@ async def read_video_frames(task_id, mqtt, mqtt_publish_topic,
if os.path.exists(local_video_path):
os.remove(local_video_path)
#
# async def read_rtmp_frames(
# loop,
# read_rtmp_frames_executor: ThreadPoolExecutor,
# video_url: str,
# device: Optional[MQTTDevice] = None,
# topic_camera_osd: Optional[str] = None,
# method_camera_osd: Optional[str] = None,
# topic_osd_info: Optional[str] = None,
# method_osd_info: Optional[str] = None,
# cancel_flag: Optional[asyncio.Event] = None,
# frame_queue: asyncio.Queue = None,
# timestamp_frame_queue: TimestampedQueue = None
# ):
# """
# 异步读取 RTMP 流帧(优化版:移除帧率控制,优化线程池)
# """
# max_retries = 20
# retry_delay = 2
# pic_count = 0
# attempt = 0
# time_start = time.time_ns() # 添加开始时间统计
# frame_count = 0 # 统计总帧数
#
# if cancel_flag is None:
# cancel_flag = asyncio.Event()
#
# # loop = asyncio.get_running_loop()
#
# # 打印初始统计信息
# print(f"开始读取RTMP流: {video_url}")
#
# while not cancel_flag.is_set() and attempt < max_retries:
# attempt += 1
# if cancel_flag.is_set():
# logger.info("收到停止信号,终止 RTMP 读取")
# break
#
# container = None
# try:
# logger.info(f"尝试连接 RTMP 流 (尝试 {attempt}/{max_retries}): {video_url}")
# # 1. 关键优化:将同步的 av.open 和流初始化放到线程池
# container = await loop.run_in_executor(read_rtmp_frames_executor, av.open, video_url)
# video_stream = await loop.run_in_executor(read_rtmp_frames_executor, next,
# (s for s in container.streams if s.type == 'video'))
# logger.info(f"成功连接到 RTMP 流: {video_url} ({video_stream.width}x{video_stream.height})")
#
# # 2. 提前获取一次OSD消息验证MQTT是否正常
# if device and topic_osd_info and method_osd_info:
# osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info)
# if osd_msg:
# logger.info(f"初始OSD消息获取成功: 高度={osd_msg.data.height}")
# else:
# logger.warning("初始OSD消息为空可能MQTT尚未收到消息")
#
# # 3. 关键优化:将同步的帧迭代放到线程池,通过生成器异步获取
# async def async_frame_generator():
# """异步帧生成器在后台线程迭代同步帧通过yield返回给事件循环"""
#
# def sync_frame_iter():
# try:
# for frame in container.decode(video=0):
# # 线程内检查取消标志(需定期检查,避免线程无法退出)
# if cancel_flag.is_set():
# logger.info("后台线程检测到取消信号,停止帧迭代")
# break
#
# # 确保是3通道RGB
# if len(frame.planes) == 1: # 如果是灰度图
# gray = frame.to_ndarray(format='gray')
# # 转换为3通道BGR不修改尺寸
# bgr = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)
# yield bgr
# else:
# # 保持原始尺寸和色彩空间,只转换格式
# bgr = frame.to_ndarray(format='bgr24')
# yield bgr
# except Exception as e:
# logger.error(f"同步帧迭代出错: {e}")
# finally:
# if container:
# container.close()
# logger.info("RTMP容器已关闭")
#
# # 将同步迭代器包装为异步生成器
# gen = sync_frame_iter()
# while not cancel_flag.is_set():
# try:
# # 每次获取一帧都通过线程池执行,避免长时间阻塞
# frame = await loop.run_in_executor(read_rtmp_frames_executor, next, gen, None)
# if frame is None: # 迭代结束
# break
# yield frame
# except StopIteration:
# logger.info("RTMP流帧迭代结束")
# break
# except Exception as e:
# logger.error(f"异步获取帧出错: {e}")
# break
#
# # 4. 异步迭代帧(不阻塞事件循环)
# async for frame in async_frame_generator():
# if cancel_flag.is_set():
# logger.info("检测到取消信号,停止读取帧")
# break
#
# try:
# # 5. 帧转换也放到线程池av.Frame.to_ndarray是CPU密集操作
# img = frame.copy() # 确保不修改原始帧
# osd_info = None
#
# # 6. 此时事件循环未被阻塞MQTT消息已缓存get_latest_message可即时获取
# if device and topic_osd_info and method_osd_info:
# osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info)
# if osd_msg and hasattr(osd_msg, 'data'):
# osd_info = Air_Attitude(
# gimbal_pitch=osd_msg.data.gimbal_pitch,
# gimbal_roll=osd_msg.data.gimbal_roll,
# gimbal_yaw=osd_msg.data.gimbal_yaw,
# height=osd_msg.data.height,
# latitude=osd_msg.data.latitude,
# longitude=osd_msg.data.longitude
# )
#
# # 7. 异步放入帧队列(避免队列满时阻塞)
# if not frame_queue.full():
# pic_count += 1
# frame_count += 1 # 增加总帧数统计
# time_ns = time.time_ns()
#
# # 定期输出统计信息每1000帧
# if time_ns - time_start > 1000000000:
# print(f"readFrames {pic_count}")
# pic_count = 0
# time_start = time_ns
# if img is not None and osd_info is not None:
# await frame_queue.put((img, osd_info, time_ns))
# timestamp_frame_queue.append({
# "timestamp": time_ns,
# "frame": img
# })
# logger.debug(
# f"已放入帧队列,累计帧数: {pic_count},队列剩余空间: {frame_queue.maxsize - frame_queue.qsize()}")
# else:
# logger.warning("帧队列已满等待1ms后重试")
# await asyncio.sleep(0.001)
#
# except Exception as frame_error:
# logger.error(f"处理单帧时出错: {frame_error}", exc_info=True)
# continue
#
# except (av.AVError, IOError) as e:
# logger.error(f"RTMP 流错误 (尝试 {attempt}/{max_retries}): {e}")
# if attempt < max_retries:
# await asyncio.sleep(retry_delay)
# else:
# raise RuntimeError(f"无法连接 RTMP 流 (尝试 {max_retries} 次后失败): {video_url}")
# except asyncio.CancelledError:
# logger.info("read_rtmp_frames 收到取消信号")
# raise
# except Exception as e:
# logger.error(f"未知错误: {e}", exc_info=True)
# if attempt < max_retries:
# await asyncio.sleep(retry_delay)
# finally:
# # 双重保险:确保容器关闭
# if container and not container.closed:
# await loop.run_in_executor(None, container.close)
# logger.info("RTMP容器在finally中关闭")
#
# # 最终统计信息
# if frame_count > 0:
# total_time = (time.time_ns() - time_start) / 1e9
# avg_fps = frame_count / total_time if total_time > 0 else 0
# print(f"RTMP流读取完成总帧数: {frame_count}, 总时间: {total_time:.2f}秒, 平均FPS: {avg_fps:.2f}")
# else:
# print("RTMP流读取失败未获取到任何帧")
#
# logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {pic_count}")
# ------------------------------- 下述方法使用ffmpeg 拉流可以解决cv2拉流的一些问题主要是虚拟环境ffmpeg不匹配的问题。但是ffmpeg拉流慢3s左右
# import cv2
# import json
# import asyncio
# from typing import Optional
# from concurrent.futures import ThreadPoolExecutor
#
#
# async def read_rtmp_frames(
# loop,
# read_rtmp_frames_executor: ThreadPoolExecutor,
# video_url: str,
# device: Optional[MQTTDevice] = None,
# topic_camera_osd: Optional[str] = None,
# method_camera_osd: Optional[str] = None,
# topic_osd_info: Optional[str] = None,
# method_osd_info: Optional[str] = None,
# cancel_flag: Optional[asyncio.Event] = None,
# frame_queue: asyncio.Queue = None,
# timestamp_frame_queue=None
# ):
# """
# 基于 FFmpeg 读取 RTMP 流帧(优化版:高性能读取,处理损坏帧)
# """
# max_retries = 20
# retry_delay = 2
# pic_count = 0
# attempt = 0
# time_start = time.time_ns()
# frame_count = 0
#
# if cancel_flag is None:
# cancel_flag = asyncio.Event()
#
# print(f"开始读取RTMP流: {video_url}")
#
# while not cancel_flag.is_set() and attempt < max_retries:
# attempt += 1
# if cancel_flag.is_set():
# logger.info("收到停止信号,终止 RTMP 读取")
# break
#
# ffmpeg_process = None
# width, height = None, None
# frame_size = None
#
# try:
# logger.info(f"尝试连接 RTMP 流 (尝试 {attempt}/{max_retries}): {video_url}")
#
# # 1. 探测视频分辨率
# width, height = await detect_video_resolution(loop, read_rtmp_frames_executor, video_url)
#
# if width is None or height is None:
# logger.warning("使用默认分辨率 1920x1080")
# width, height = 1920, 1080
#
# frame_size = width * height * 3
# logger.info(f"视频分辨率: {width}x{height}, 帧大小: {frame_size} bytes")
#
# # 2. 启动 FFmpeg 进程(优化参数提高性能)
# ffmpeg_cmd = [
# 'ffmpeg',
# '-hide_banner',
# '-loglevel', 'warning', # 改为warning可以看到更多错误信息
# '-fflags', '+nobuffer+genpts',
# '-err_detect', 'ignore_err',
# '-max_delay', '0',
# '-flags', 'low_delay',
# '-i', video_url,
# '-an', # 无音频
# '-c:v', 'rawvideo', # 关键:输出原始视频帧,而不是复制编码
# '-pix_fmt', 'bgr24', # OpenCV使用BGR格式
# '-f', 'rawvideo', # 关键:输出原始视频格式
# '-flush_packets', '1',
# '-'
# ]
# ffmpeg_process = await loop.run_in_executor(
# read_rtmp_frames_executor,
# lambda: subprocess.Popen(
# ffmpeg_cmd,
# stdout=subprocess.PIPE,
# stderr=subprocess.PIPE,
# bufsize=frame_size # 设置合适的缓冲区大小
# )
# )
#
# logger.info(f"成功启动 FFmpeg 进程连接 RTMP 流: {video_url}")
#
# # 3. 初始化帧读取状态
# frame_sequence = 0
# last_timestamp = time_start
# consecutive_corrupted_frames = 0 # 连续损坏帧计数
# max_consecutive_corrupted = 10 # 最大连续损坏帧数
#
# while not cancel_flag.is_set():
# try:
# # 直接读取完整帧(高性能方式)
# raw_frame = await loop.run_in_executor(
# read_rtmp_frames_executor,
# lambda: ffmpeg_process.stdout.read(frame_size)
# )
#
# if not raw_frame:
# logger.warning("读取到空帧数据,流可能已结束")
# break
#
# current_time_ns = time.time_ns()
# frame_sequence += 1
# frame_count += 1
#
# # 处理帧数据(无论是否完整)
# img = None
# is_corrupted = False
# print(f"读取 read_rtmp_frames 判断")
# try:
# if len(raw_frame) == frame_size:
# # 完整帧处理
# frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape((height, width, 3))
# img = frame.copy()
# consecutive_corrupted_frames = 0 # 重置连续损坏计数
# else:
# # 损坏帧处理
# logger.warning(f"帧数据损坏: {len(raw_frame)}/{frame_size} bytes, 序列: {frame_sequence}")
# is_corrupted = True
# consecutive_corrupted_frames += 1
#
# # 创建替代帧
# if consecutive_corrupted_frames <= max_consecutive_corrupted:
# # 尝试部分恢复
# img = np.zeros((height, width, 3), dtype=np.uint8)
# valid_data = min(len(raw_frame), frame_size)
# if valid_data > 0:
# # 尽可能填充有效数据
# temp_frame = np.frombuffer(raw_frame[:valid_data], dtype=np.uint8)
# img.flat[:len(temp_frame)] = temp_frame
# else:
# # 连续损坏过多,创建空白帧
# img = np.zeros((height, width, 3), dtype=np.uint8)
# logger.error(f"连续损坏帧过多 ({consecutive_corrupted_frames}),创建空白帧")
#
# except Exception as frame_error:
# logger.error(f"帧数据处理错误: {frame_error}")
# # 创建空白帧作为后备
# img = np.zeros((height, width, 3), dtype=np.uint8)
# is_corrupted = True
# consecutive_corrupted_frames += 1
# print(f"读取 read_rtmp_frames 判断1")
# # 获取OSD信息
# osd_info = None
# if device and topic_osd_info and method_osd_info:
# try:
# osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info)
# if osd_msg and hasattr(osd_msg, 'data'):
# osd_info = Air_Attitude(
# gimbal_pitch=osd_msg.data.gimbal_pitch,
# gimbal_roll=osd_msg.data.gimbal_roll,
# gimbal_yaw=osd_msg.data.gimbal_yaw,
# height=osd_msg.data.height,
# latitude=osd_msg.data.latitude,
# longitude=osd_msg.data.longitude
# )
# except Exception as osd_error:
# logger.warning(f"获取OSD信息失败: {osd_error}")
# print(f"读取 read_rtmp_frames 判断2")
# # 放入帧队列
# if img is not None and not frame_queue.full():
# # 确保时间戳递增
# if current_time_ns <= last_timestamp:
# current_time_ns = last_timestamp + 1
# last_timestamp = current_time_ns
#
# # 统计信息
# pic_count += 1
# if current_time_ns - time_start > 1000000000: # 1秒
# elapsed_seconds = (current_time_ns - time_start) / 1e9
# fps = pic_count / elapsed_seconds if elapsed_seconds > 0 else 0
# corrupted_rate = (consecutive_corrupted_frames / pic_count * 100) if pic_count > 0 else 0
# print(
# f"readFrames 序列:{frame_sequence} 帧数:{pic_count} FPS:{fps:.2f} 损坏率:{corrupted_rate:.1f}%")
# pic_count = 0
# time_start = current_time_ns
#
# print(f"读取 read_rtmp_frames 实时流")
#
# # 准备帧数据
# frame_data = {
# "sequence": frame_sequence,
# "frame": img,
# "osd_info": osd_info,
# "timestamp": current_time_ns,
# "is_corrupted": is_corrupted
# }
# time_ns = time.time_ns()
# if img is not None and osd_info is not None:
# await frame_queue.put((img, osd_info, time_ns))
# timestamp_frame_queue.append({
# "timestamp": time_ns,
# "frame": img
# })
#
# if frame_sequence % 100 == 0: # 每100帧输出一次日志
# logger.debug(f"已处理帧 序列:{frame_sequence} 累计:{frame_count}")
#
# elif frame_queue.full():
# logger.warning("帧队列已满,跳过此帧")
# await asyncio.sleep(0.001) # 短暂等待
#
# # 检查是否需要重新探测分辨率(仅在连续损坏时)
# if consecutive_corrupted_frames > 5:
# logger.warning("连续帧损坏,尝试重新探测分辨率")
# try:
# new_width, new_height = await detect_video_resolution(loop, read_rtmp_frames_executor,
# video_url)
# if new_width and new_height and (new_width != width or new_height != height):
# logger.info(f"分辨率变化: {width}x{height} -> {new_width}x{new_height}")
# width, height = new_width, new_height
# frame_size = width * height * 3
# consecutive_corrupted_frames = 0 # 重置计数
# except Exception as probe_error:
# logger.warning(f"重新探测分辨率失败: {probe_error}")
#
# except Exception as e:
# logger.error(f"读取帧数据时出错: {e}", exc_info=True)
# # 检查 FFmpeg 进程状态
# if ffmpeg_process and ffmpeg_process.poll() is not None:
# try:
# stderr_output = ffmpeg_process.stderr.read().decode('utf-8', errors='ignore')
# if stderr_output:
# logger.error(f"FFmpeg 进程错误: {stderr_output}")
# except:
# pass
# logger.error("FFmpeg 进程已退出")
# break
# # 短暂等待后继续
# await asyncio.sleep(0.01)
# continue
#
# except (subprocess.SubprocessError, IOError) as e:
# logger.error(f"RTMP 流错误 (尝试 {attempt}/{max_retries}): {e}")
# if attempt < max_retries:
# await asyncio.sleep(retry_delay)
# else:
# raise RuntimeError(f"无法连接 RTMP 流 (尝试 {max_retries} 次后失败): {video_url}")
# except asyncio.CancelledError:
# logger.info("read_rtmp_frames 收到取消信号")
# raise
# except Exception as e:
# logger.error(f"未知错误: {e}", exc_info=True)
# if attempt < max_retries:
# await asyncio.sleep(retry_delay)
# finally:
# if ffmpeg_process:
# try:
# ffmpeg_process.terminate()
# try:
# await asyncio.wait_for(
# loop.run_in_executor(read_rtmp_frames_executor, ffmpeg_process.wait),
# timeout=2.0
# )
# except asyncio.TimeoutError:
# ffmpeg_process.kill()
# await loop.run_in_executor(read_rtmp_frames_executor, ffmpeg_process.wait)
# except Exception as e:
# logger.warning(f"关闭FFmpeg进程时出错: {e}")
# logger.info("FFmpeg 进程已关闭")
#
# if frame_count > 0:
# total_time = (time.time_ns() - time_start) / 1e9
# avg_fps = frame_count / total_time if total_time > 0 else 0
# print(f"RTMP流读取完成总帧数: {frame_count}, 总时间: {total_time:.2f}秒, 平均FPS: {avg_fps:.2f}")
# else:
# print("RTMP流读取失败未获取到任何帧")
#
# logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {frame_count}")
#
# async def detect_video_resolution(loop, executor, video_url):
# """
# 探测视频流的分辨率(修复版)
# """
# try:
# logger.info(f"开始探测视频分辨率: {video_url}")
#
# # 方法1: 使用简单的ffprobe命令更可靠
# simple_cmd = [
# 'ffprobe',
# '-v', 'error',
# '-select_streams', 'v:0',
# '-show_entries', 'stream=width,height',
# '-of', 'csv=p=0:s=x',
# video_url
# ]
#
# def run_simple_probe():
# try:
# result = subprocess.run(simple_cmd, capture_output=True, text=True, timeout=15)
# logger.info(f"ffprobe返回码: {result.returncode}, 输出: {result.stdout.strip()}")
#
# if result.returncode == 0 and result.stdout.strip():
# dimensions = result.stdout.strip().split('x')
# if len(dimensions) == 2:
# width = int(dimensions[0])
# height = int(dimensions[1])
# if width > 0 and height > 0:
# return width, height
# except Exception as e:
# logger.warning(f"简单分辨率探测失败: {e}")
# return None
#
# dimensions = await loop.run_in_executor(executor, run_simple_probe)
# if dimensions:
# width, height = dimensions
# logger.info(f"探测到视频分辨率: {width}x{height}")
# return width, height
#
# # 方法2: 使用详细的ffprobe命令
# detailed_cmd = [
# 'ffprobe',
# '-v', 'quiet',
# '-print_format', 'json',
# '-show_streams',
# video_url
# ]
#
# def run_detailed_probe():
# try:
# result = subprocess.run(detailed_cmd, capture_output=True, text=True, timeout=15)
# if result.returncode == 0:
# data = json.loads(result.stdout)
# if 'streams' in data:
# for stream in data['streams']:
# if stream.get('codec_type') == 'video':
# width = stream.get('width')
# height = stream.get('height')
# if width and height:
# return int(width), int(height)
# except Exception as e:
# logger.warning(f"详细分辨率探测失败: {e}")
# return None
#
# dimensions = await loop.run_in_executor(executor, run_detailed_probe)
# if dimensions:
# width, height = dimensions
# logger.info(f"探测到视频分辨率: {width}x{height}")
# return width, height
#
# # 方法3: 尝试使用ffmpeg快速探测
# quick_cmd = [
# 'ffmpeg',
# '-i', video_url,
# '-t', '1', # 只读取1秒
# '-f', 'null',
# '-'
# ]
#
# def run_quick_probe():
# try:
# result = subprocess.run(quick_cmd, capture_output=True, text=True, timeout=10)
# # 从stderr中解析分辨率信息
# if result.stderr:
# import re
# # 尝试从输出中解析分辨率
# resolution_match = re.search(r'(\d+)x(\d+)', result.stderr)
# if resolution_match:
# width = int(resolution_match.group(1))
# height = int(resolution_match.group(2))
# if width > 0 and height > 0:
# return width, height
# except Exception as e:
# logger.warning(f"快速分辨率探测失败: {e}")
# return None
#
# dimensions = await loop.run_in_executor(executor, run_quick_probe)
# if dimensions:
# width, height = dimensions
# logger.info(f"探测到视频分辨率: {width}x{height}")
# return width, height
#
# logger.warning("所有分辨率探测方法都失败,使用默认值 1920x1080")
# return 1920, 1080
#
# except Exception as e:
# logger.error(f"分辨率探测异常: {e}")
# logger.warning("使用默认分辨率 1920x1080")
# return 1920, 1080
import cv2
import asyncio
from typing import Optional
from concurrent.futures import ThreadPoolExecutor
# 使用cv2 拉流避免了ffmpeg 拉流的rtmp延时3s的问题
async def read_rtmp_frames(
loop,
read_rtmp_frames_executor: ThreadPoolExecutor,
@ -831,24 +1403,25 @@ async def read_rtmp_frames(
method_osd_info: Optional[str] = None,
cancel_flag: Optional[asyncio.Event] = None,
frame_queue: asyncio.Queue = None,
timestamp_frame_queue: TimestampedQueue = None
timestamp_frame_queue=None
):
"""
异步读取 RTMP 流帧优化版移除帧率控制优化线程池
基于 OpenCV+FFmpeg 读取 RTMP 流帧优化版高性能读取处理损坏帧
核心修改替换原FFmpeg子进程读流为 cv2.VideoCapture 读流保留所有原有业务逻辑
核心优化自适应分辨率低延迟无残影断流自动重连超时保护
"""
max_retries = 20
retry_delay = 2
pic_count = 0
attempt = 0
time_start = time.time_ns() # 添加开始时间统计
frame_count = 0 # 统计总帧数
time_start = time.time_ns()
frame_count = 0
if cancel_flag is None:
cancel_flag = asyncio.Event()
if timestamp_frame_queue is None:
timestamp_frame_queue = []
# loop = asyncio.get_running_loop()
# 打印初始统计信息
print(f"开始读取RTMP流: {video_url}")
while not cancel_flag.is_set() and attempt < max_retries:
@ -857,81 +1430,98 @@ async def read_rtmp_frames(
logger.info("收到停止信号,终止 RTMP 读取")
break
container = None
cap = None
width, height = None, None
stream_fps = None
try:
logger.info(f"尝试连接 RTMP 流 (尝试 {attempt}/{max_retries}): {video_url}")
# 1. 关键优化:将同步的 av.open 和流初始化放到线程池
container = await loop.run_in_executor(read_rtmp_frames_executor, av.open, video_url)
video_stream = await loop.run_in_executor(read_rtmp_frames_executor, next,
(s for s in container.streams if s.type == 'video'))
logger.info(f"成功连接到 RTMP 流: {video_url} ({video_stream.width}x{video_stream.height})")
# 2. 提前获取一次OSD消息验证MQTT是否正常
if device and topic_osd_info and method_osd_info:
osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info)
if osd_msg:
logger.info(f"初始OSD消息获取成功: 高度={osd_msg.data.height}")
else:
logger.warning("初始OSD消息为空可能MQTT尚未收到消息")
# ✅ 核心替换使用cv2.VideoCapture + CAP_FFMPEG 打开RTMP流最优参数配置
# 切换到线程池执行opencv操作避免阻塞协程
cap = await loop.run_in_executor(
read_rtmp_frames_executor,
lambda: cv2.VideoCapture(video_url, cv2.CAP_FFMPEG)
)
# 设置核心参数 - 重中之重,缺一不可
await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 60000))
await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 50000))
await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)) # 无缓存,低延迟无残影
await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_FPS, 25))
# 3. 关键优化:将同步的帧迭代放到线程池,通过生成器异步获取
async def async_frame_generator():
"""异步帧生成器在后台线程迭代同步帧通过yield返回给事件循环"""
# 校验流是否成功打开
is_opened = await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.isOpened())
if not is_opened:
logger.warning(f"尝试 {attempt}打开RTMP流失败准备重试")
await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.release() if cap else None)
await asyncio.sleep(retry_delay)
continue
def sync_frame_iter():
try:
for frame in container.decode(video=0):
# 线程内检查取消标志(需定期检查,避免线程无法退出)
if cancel_flag.is_set():
logger.info("后台线程检测到取消信号,停止帧迭代")
break
# ✅ 自适应获取流的【真实分辨率】,无需手动探测,精准无误差
width = await loop.run_in_executor(read_rtmp_frames_executor, lambda: int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)))
height = await loop.run_in_executor(read_rtmp_frames_executor, lambda: int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
stream_fps = await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.get(cv2.CAP_PROP_FPS))
# 确保是3通道RGB
if len(frame.planes) == 1: # 如果是灰度图
gray = frame.to_ndarray(format='gray')
# 转换为3通道BGR不修改尺寸
bgr = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)
yield bgr
else:
# 保持原始尺寸和色彩空间,只转换格式
bgr = frame.to_ndarray(format='bgr24')
yield bgr
except Exception as e:
logger.error(f"同步帧迭代出错: {e}")
finally:
if container:
container.close()
logger.info("RTMP容器已关闭")
# 兜底分辨率,防止异常
if width is None or height is None or width == 0 or height == 0:
logger.warning("使用默认分辨率 1920x1080")
width, height = 1920, 1080
logger.info(f"视频分辨率: {width}x{height}, 流帧率: {stream_fps:.1f} FPS")
logger.info(f"成功启动 OpenCV+FFmpeg 连接 RTMP 流: {video_url}")
# 初始化帧读取状态 (保留原逻辑不变)
frame_sequence = 0
last_timestamp = time_start
consecutive_corrupted_frames = 0 # 连续损坏帧计数
max_consecutive_corrupted = 10 # 最大连续损坏帧数
# 将同步迭代器包装为异步生成器
gen = sync_frame_iter()
while not cancel_flag.is_set():
try:
# 每次获取一帧都通过线程池执行,避免长时间阻塞
frame = await loop.run_in_executor(read_rtmp_frames_executor, next, gen, None)
if frame is None: # 迭代结束
break
yield frame
except StopIteration:
logger.info("RTMP流帧迭代结束")
break
except Exception as e:
logger.error(f"异步获取帧出错: {e}")
break
# ✅ 核心替换使用cv2.read()读取帧,线程池执行避免阻塞协程
ret, frame = await loop.run_in_executor(
read_rtmp_frames_executor,
lambda: cap.read()
)
# 4. 异步迭代帧(不阻塞事件循环)
async for frame in async_frame_generator():
if cancel_flag.is_set():
logger.info("检测到取消信号,停止读取帧")
break
current_time_ns = time.time_ns()
frame_sequence += 1
frame_count += 1
# 处理帧数据(保留原逻辑完全不变,兼容原有的损坏帧处理)
img = None
is_corrupted = False
print(f"读取 read_rtmp_frames 判断")
try:
# 5. 帧转换也放到线程池av.Frame.to_ndarray是CPU密集操作
img = frame.copy() # 确保不修改原始帧
osd_info = None
if ret and frame is not None and frame.shape == (height, width, 3):
# 完整有效帧处理
img = frame.copy()
consecutive_corrupted_frames = 0 # 重置连续损坏计数
else:
# 损坏帧/空帧处理
logger.warning(f"帧数据损坏/空帧, 序列: {frame_sequence}")
is_corrupted = True
consecutive_corrupted_frames += 1
# 6. 此时事件循环未被阻塞MQTT消息已缓存get_latest_message可即时获取
# 创建替代帧,保留原逻辑
if consecutive_corrupted_frames <= max_consecutive_corrupted:
img = np.zeros((height, width, 3), dtype=np.uint8)
else:
img = np.zeros((height, width, 3), dtype=np.uint8)
logger.error(f"连续损坏帧过多 ({consecutive_corrupted_frames}),创建空白帧")
except Exception as frame_error:
logger.error(f"帧数据处理错误: {frame_error}")
# 创建空白帧作为后备,保留原逻辑
img = np.zeros((height, width, 3), dtype=np.uint8)
is_corrupted = True
consecutive_corrupted_frames += 1
print(f"读取 read_rtmp_frames 判断1")
# 获取OSD信息 - 保留原逻辑完全不变
osd_info = None
if device and topic_osd_info and method_osd_info:
try:
osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info)
if osd_msg and hasattr(osd_msg, 'data'):
osd_info = Air_Attitude(
@ -942,54 +1532,77 @@ async def read_rtmp_frames(
latitude=osd_msg.data.latitude,
longitude=osd_msg.data.longitude
)
except Exception as osd_error:
logger.warning(f"获取OSD信息失败: {osd_error}")
print(f"读取 read_rtmp_frames 判断2")
# 7. 异步放入帧队列(避免队列满时阻塞)
if not frame_queue.full():
# 放入帧队列 - 保留原逻辑完全不变
if img is not None and not frame_queue.full():
# 确保时间戳递增
if current_time_ns <= last_timestamp:
current_time_ns = last_timestamp + 1
last_timestamp = current_time_ns
# 统计信息 - 保留原逻辑完全不变
pic_count += 1
frame_count += 1 # 增加总帧数统计
time_ns = time.time_ns()
# 定期输出统计信息每1000帧
if time_ns - time_start > 1000000000:
print(f"readFrames {pic_count}")
if current_time_ns - time_start > 1000000000: # 1秒
elapsed_seconds = (current_time_ns - time_start) / 1e9
fps = pic_count / elapsed_seconds if elapsed_seconds > 0 else 0
corrupted_rate = (consecutive_corrupted_frames / pic_count * 100) if pic_count > 0 else 0
print(
f"readFrames 序列:{frame_sequence} 帧数:{pic_count} FPS:{fps:.2f} 损坏率:{corrupted_rate:.1f}%")
pic_count = 0
time_start = time_ns
time_start = current_time_ns
print(f"读取 read_rtmp_frames 实时流")
# 准备帧数据 - 保留原逻辑完全不变
frame_data = {
"sequence": frame_sequence,
"frame": img,
"osd_info": osd_info,
"timestamp": current_time_ns,
"is_corrupted": is_corrupted
}
time_ns = time.time_ns()
if img is not None and osd_info is not None:
await frame_queue.put((img, osd_info, time_ns))
timestamp_frame_queue.append({
"timestamp": time_ns,
"frame": img
})
logger.debug(
f"已放入帧队列,累计帧数: {pic_count},队列剩余空间: {frame_queue.maxsize - frame_queue.qsize()}")
else:
logger.warning("帧队列已满等待1ms后重试")
await asyncio.sleep(0.001)
except Exception as frame_error:
logger.error(f"处理单帧时出错: {frame_error}", exc_info=True)
continue
if frame_sequence % 100 == 0: # 每100帧输出一次日志
logger.debug(f"已处理帧 序列:{frame_sequence} 累计:{frame_count}")
except (av.AVError, IOError) as e:
elif frame_queue.full():
logger.warning("帧队列已满,跳过此帧")
await asyncio.sleep(0.001) # 短暂等待
# 连续帧损坏触发重连前置判断 - 保留原逻辑
if consecutive_corrupted_frames > max_consecutive_corrupted:
logger.warning(f"连续{consecutive_corrupted_frames}帧损坏,触发流重连逻辑")
break
except Exception as e:
logger.error(f"读取帧数据时出错: {e}", exc_info=True)
break
except Exception as e:
logger.error(f"RTMP 流错误 (尝试 {attempt}/{max_retries}): {e}")
if attempt < max_retries:
await asyncio.sleep(retry_delay)
else:
raise RuntimeError(f"无法连接 RTMP 流 (尝试 {max_retries} 次后失败): {video_url}")
except asyncio.CancelledError:
logger.info("read_rtmp_frames 收到取消信号")
raise
except Exception as e:
logger.error(f"未知错误: {e}", exc_info=True)
if attempt < max_retries:
await asyncio.sleep(retry_delay)
finally:
# 双重保险:确保容器关闭
if container and not container.closed:
await loop.run_in_executor(None, container.close)
logger.info("RTMP容器在finally中关闭")
# ✅ 释放opencv的VideoCapture资源替代原FFmpeg进程的关闭逻辑
if cap is not None:
await loop.run_in_executor(
read_rtmp_frames_executor,
lambda: cap.release()
)
logger.info("OpenCV VideoCapture 资源已释放")
# 最终统计信息
if frame_count > 0:
total_time = (time.time_ns() - time_start) / 1e9
avg_fps = frame_count / total_time if total_time > 0 else 0
@ -997,7 +1610,7 @@ async def read_rtmp_frames(
else:
print("RTMP流读取失败未获取到任何帧")
logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {pic_count}")
logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {frame_count}")
# async def process_frames(detector: MultiYOLODetector):