Compare commits
No commits in common. "eedca6cd50a0515199a7fdc70a6c6f946264452c" and "5c865a4418c67bf89aca7be44ca7291479766a2b" have entirely different histories.
eedca6cd50
...
5c865a4418
@ -819,579 +819,7 @@ async def read_video_frames(task_id, mqtt, mqtt_publish_topic,
|
|||||||
if os.path.exists(local_video_path):
|
if os.path.exists(local_video_path):
|
||||||
os.remove(local_video_path)
|
os.remove(local_video_path)
|
||||||
|
|
||||||
#
|
|
||||||
# async def read_rtmp_frames(
|
|
||||||
# loop,
|
|
||||||
# read_rtmp_frames_executor: ThreadPoolExecutor,
|
|
||||||
# video_url: str,
|
|
||||||
# device: Optional[MQTTDevice] = None,
|
|
||||||
# topic_camera_osd: Optional[str] = None,
|
|
||||||
# method_camera_osd: Optional[str] = None,
|
|
||||||
# topic_osd_info: Optional[str] = None,
|
|
||||||
# method_osd_info: Optional[str] = None,
|
|
||||||
# cancel_flag: Optional[asyncio.Event] = None,
|
|
||||||
# frame_queue: asyncio.Queue = None,
|
|
||||||
# timestamp_frame_queue: TimestampedQueue = None
|
|
||||||
# ):
|
|
||||||
# """
|
|
||||||
# 异步读取 RTMP 流帧(优化版:移除帧率控制,优化线程池)
|
|
||||||
# """
|
|
||||||
# max_retries = 20
|
|
||||||
# retry_delay = 2
|
|
||||||
# pic_count = 0
|
|
||||||
# attempt = 0
|
|
||||||
# time_start = time.time_ns() # 添加开始时间统计
|
|
||||||
# frame_count = 0 # 统计总帧数
|
|
||||||
#
|
|
||||||
# if cancel_flag is None:
|
|
||||||
# cancel_flag = asyncio.Event()
|
|
||||||
#
|
|
||||||
# # loop = asyncio.get_running_loop()
|
|
||||||
#
|
|
||||||
# # 打印初始统计信息
|
|
||||||
# print(f"开始读取RTMP流: {video_url}")
|
|
||||||
#
|
|
||||||
# while not cancel_flag.is_set() and attempt < max_retries:
|
|
||||||
# attempt += 1
|
|
||||||
# if cancel_flag.is_set():
|
|
||||||
# logger.info("收到停止信号,终止 RTMP 读取")
|
|
||||||
# break
|
|
||||||
#
|
|
||||||
# container = None
|
|
||||||
# try:
|
|
||||||
# logger.info(f"尝试连接 RTMP 流 (尝试 {attempt}/{max_retries}): {video_url}")
|
|
||||||
# # 1. 关键优化:将同步的 av.open 和流初始化放到线程池
|
|
||||||
# container = await loop.run_in_executor(read_rtmp_frames_executor, av.open, video_url)
|
|
||||||
# video_stream = await loop.run_in_executor(read_rtmp_frames_executor, next,
|
|
||||||
# (s for s in container.streams if s.type == 'video'))
|
|
||||||
# logger.info(f"成功连接到 RTMP 流: {video_url} ({video_stream.width}x{video_stream.height})")
|
|
||||||
#
|
|
||||||
# # 2. 提前获取一次OSD消息(验证MQTT是否正常)
|
|
||||||
# if device and topic_osd_info and method_osd_info:
|
|
||||||
# osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info)
|
|
||||||
# if osd_msg:
|
|
||||||
# logger.info(f"初始OSD消息获取成功: 高度={osd_msg.data.height}")
|
|
||||||
# else:
|
|
||||||
# logger.warning("初始OSD消息为空,可能MQTT尚未收到消息")
|
|
||||||
#
|
|
||||||
# # 3. 关键优化:将同步的帧迭代放到线程池,通过生成器异步获取
|
|
||||||
# async def async_frame_generator():
|
|
||||||
# """异步帧生成器:在后台线程迭代同步帧,通过yield返回给事件循环"""
|
|
||||||
#
|
|
||||||
# def sync_frame_iter():
|
|
||||||
# try:
|
|
||||||
# for frame in container.decode(video=0):
|
|
||||||
# # 线程内检查取消标志(需定期检查,避免线程无法退出)
|
|
||||||
# if cancel_flag.is_set():
|
|
||||||
# logger.info("后台线程检测到取消信号,停止帧迭代")
|
|
||||||
# break
|
|
||||||
#
|
|
||||||
# # 确保是3通道RGB
|
|
||||||
# if len(frame.planes) == 1: # 如果是灰度图
|
|
||||||
# gray = frame.to_ndarray(format='gray')
|
|
||||||
# # 转换为3通道BGR(不修改尺寸)
|
|
||||||
# bgr = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)
|
|
||||||
# yield bgr
|
|
||||||
# else:
|
|
||||||
# # 保持原始尺寸和色彩空间,只转换格式
|
|
||||||
# bgr = frame.to_ndarray(format='bgr24')
|
|
||||||
# yield bgr
|
|
||||||
# except Exception as e:
|
|
||||||
# logger.error(f"同步帧迭代出错: {e}")
|
|
||||||
# finally:
|
|
||||||
# if container:
|
|
||||||
# container.close()
|
|
||||||
# logger.info("RTMP容器已关闭")
|
|
||||||
#
|
|
||||||
# # 将同步迭代器包装为异步生成器
|
|
||||||
# gen = sync_frame_iter()
|
|
||||||
# while not cancel_flag.is_set():
|
|
||||||
# try:
|
|
||||||
# # 每次获取一帧都通过线程池执行,避免长时间阻塞
|
|
||||||
# frame = await loop.run_in_executor(read_rtmp_frames_executor, next, gen, None)
|
|
||||||
# if frame is None: # 迭代结束
|
|
||||||
# break
|
|
||||||
# yield frame
|
|
||||||
# except StopIteration:
|
|
||||||
# logger.info("RTMP流帧迭代结束")
|
|
||||||
# break
|
|
||||||
# except Exception as e:
|
|
||||||
# logger.error(f"异步获取帧出错: {e}")
|
|
||||||
# break
|
|
||||||
#
|
|
||||||
# # 4. 异步迭代帧(不阻塞事件循环)
|
|
||||||
# async for frame in async_frame_generator():
|
|
||||||
# if cancel_flag.is_set():
|
|
||||||
# logger.info("检测到取消信号,停止读取帧")
|
|
||||||
# break
|
|
||||||
#
|
|
||||||
# try:
|
|
||||||
# # 5. 帧转换也放到线程池(av.Frame.to_ndarray是CPU密集操作)
|
|
||||||
# img = frame.copy() # 确保不修改原始帧
|
|
||||||
# osd_info = None
|
|
||||||
#
|
|
||||||
# # 6. 此时事件循环未被阻塞,MQTT消息已缓存,get_latest_message可即时获取
|
|
||||||
# if device and topic_osd_info and method_osd_info:
|
|
||||||
# osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info)
|
|
||||||
# if osd_msg and hasattr(osd_msg, 'data'):
|
|
||||||
# osd_info = Air_Attitude(
|
|
||||||
# gimbal_pitch=osd_msg.data.gimbal_pitch,
|
|
||||||
# gimbal_roll=osd_msg.data.gimbal_roll,
|
|
||||||
# gimbal_yaw=osd_msg.data.gimbal_yaw,
|
|
||||||
# height=osd_msg.data.height,
|
|
||||||
# latitude=osd_msg.data.latitude,
|
|
||||||
# longitude=osd_msg.data.longitude
|
|
||||||
# )
|
|
||||||
#
|
|
||||||
# # 7. 异步放入帧队列(避免队列满时阻塞)
|
|
||||||
# if not frame_queue.full():
|
|
||||||
# pic_count += 1
|
|
||||||
# frame_count += 1 # 增加总帧数统计
|
|
||||||
# time_ns = time.time_ns()
|
|
||||||
#
|
|
||||||
# # 定期输出统计信息(每1000帧)
|
|
||||||
# if time_ns - time_start > 1000000000:
|
|
||||||
# print(f"readFrames {pic_count}")
|
|
||||||
# pic_count = 0
|
|
||||||
# time_start = time_ns
|
|
||||||
# if img is not None and osd_info is not None:
|
|
||||||
# await frame_queue.put((img, osd_info, time_ns))
|
|
||||||
# timestamp_frame_queue.append({
|
|
||||||
# "timestamp": time_ns,
|
|
||||||
# "frame": img
|
|
||||||
# })
|
|
||||||
# logger.debug(
|
|
||||||
# f"已放入帧队列,累计帧数: {pic_count},队列剩余空间: {frame_queue.maxsize - frame_queue.qsize()}")
|
|
||||||
# else:
|
|
||||||
# logger.warning("帧队列已满,等待1ms后重试")
|
|
||||||
# await asyncio.sleep(0.001)
|
|
||||||
#
|
|
||||||
# except Exception as frame_error:
|
|
||||||
# logger.error(f"处理单帧时出错: {frame_error}", exc_info=True)
|
|
||||||
# continue
|
|
||||||
#
|
|
||||||
# except (av.AVError, IOError) as e:
|
|
||||||
# logger.error(f"RTMP 流错误 (尝试 {attempt}/{max_retries}): {e}")
|
|
||||||
# if attempt < max_retries:
|
|
||||||
# await asyncio.sleep(retry_delay)
|
|
||||||
# else:
|
|
||||||
# raise RuntimeError(f"无法连接 RTMP 流 (尝试 {max_retries} 次后失败): {video_url}")
|
|
||||||
# except asyncio.CancelledError:
|
|
||||||
# logger.info("read_rtmp_frames 收到取消信号")
|
|
||||||
# raise
|
|
||||||
# except Exception as e:
|
|
||||||
# logger.error(f"未知错误: {e}", exc_info=True)
|
|
||||||
# if attempt < max_retries:
|
|
||||||
# await asyncio.sleep(retry_delay)
|
|
||||||
# finally:
|
|
||||||
# # 双重保险:确保容器关闭
|
|
||||||
# if container and not container.closed:
|
|
||||||
# await loop.run_in_executor(None, container.close)
|
|
||||||
# logger.info("RTMP容器在finally中关闭")
|
|
||||||
#
|
|
||||||
# # 最终统计信息
|
|
||||||
# if frame_count > 0:
|
|
||||||
# total_time = (time.time_ns() - time_start) / 1e9
|
|
||||||
# avg_fps = frame_count / total_time if total_time > 0 else 0
|
|
||||||
# print(f"RTMP流读取完成,总帧数: {frame_count}, 总时间: {total_time:.2f}秒, 平均FPS: {avg_fps:.2f}")
|
|
||||||
# else:
|
|
||||||
# print("RTMP流读取失败,未获取到任何帧")
|
|
||||||
#
|
|
||||||
# logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {pic_count}")
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# ------------------------------- 下述方法使用ffmpeg 拉流,可以解决cv2拉流的一些问题,主要是虚拟环境ffmpeg不匹配的问题。但是ffmpeg拉流慢3s左右
|
|
||||||
|
|
||||||
# import cv2
|
|
||||||
# import json
|
|
||||||
# import asyncio
|
|
||||||
# from typing import Optional
|
|
||||||
# from concurrent.futures import ThreadPoolExecutor
|
|
||||||
#
|
|
||||||
#
|
|
||||||
# async def read_rtmp_frames(
|
|
||||||
# loop,
|
|
||||||
# read_rtmp_frames_executor: ThreadPoolExecutor,
|
|
||||||
# video_url: str,
|
|
||||||
# device: Optional[MQTTDevice] = None,
|
|
||||||
# topic_camera_osd: Optional[str] = None,
|
|
||||||
# method_camera_osd: Optional[str] = None,
|
|
||||||
# topic_osd_info: Optional[str] = None,
|
|
||||||
# method_osd_info: Optional[str] = None,
|
|
||||||
# cancel_flag: Optional[asyncio.Event] = None,
|
|
||||||
# frame_queue: asyncio.Queue = None,
|
|
||||||
# timestamp_frame_queue=None
|
|
||||||
# ):
|
|
||||||
# """
|
|
||||||
# 基于 FFmpeg 读取 RTMP 流帧(优化版:高性能读取,处理损坏帧)
|
|
||||||
# """
|
|
||||||
# max_retries = 20
|
|
||||||
# retry_delay = 2
|
|
||||||
# pic_count = 0
|
|
||||||
# attempt = 0
|
|
||||||
# time_start = time.time_ns()
|
|
||||||
# frame_count = 0
|
|
||||||
#
|
|
||||||
# if cancel_flag is None:
|
|
||||||
# cancel_flag = asyncio.Event()
|
|
||||||
#
|
|
||||||
# print(f"开始读取RTMP流: {video_url}")
|
|
||||||
#
|
|
||||||
# while not cancel_flag.is_set() and attempt < max_retries:
|
|
||||||
# attempt += 1
|
|
||||||
# if cancel_flag.is_set():
|
|
||||||
# logger.info("收到停止信号,终止 RTMP 读取")
|
|
||||||
# break
|
|
||||||
#
|
|
||||||
# ffmpeg_process = None
|
|
||||||
# width, height = None, None
|
|
||||||
# frame_size = None
|
|
||||||
#
|
|
||||||
# try:
|
|
||||||
# logger.info(f"尝试连接 RTMP 流 (尝试 {attempt}/{max_retries}): {video_url}")
|
|
||||||
#
|
|
||||||
# # 1. 探测视频分辨率
|
|
||||||
# width, height = await detect_video_resolution(loop, read_rtmp_frames_executor, video_url)
|
|
||||||
#
|
|
||||||
# if width is None or height is None:
|
|
||||||
# logger.warning("使用默认分辨率 1920x1080")
|
|
||||||
# width, height = 1920, 1080
|
|
||||||
#
|
|
||||||
# frame_size = width * height * 3
|
|
||||||
# logger.info(f"视频分辨率: {width}x{height}, 帧大小: {frame_size} bytes")
|
|
||||||
#
|
|
||||||
# # 2. 启动 FFmpeg 进程(优化参数提高性能)
|
|
||||||
# ffmpeg_cmd = [
|
|
||||||
# 'ffmpeg',
|
|
||||||
# '-hide_banner',
|
|
||||||
# '-loglevel', 'warning', # 改为warning,可以看到更多错误信息
|
|
||||||
# '-fflags', '+nobuffer+genpts',
|
|
||||||
# '-err_detect', 'ignore_err',
|
|
||||||
# '-max_delay', '0',
|
|
||||||
# '-flags', 'low_delay',
|
|
||||||
# '-i', video_url,
|
|
||||||
# '-an', # 无音频
|
|
||||||
# '-c:v', 'rawvideo', # 关键:输出原始视频帧,而不是复制编码
|
|
||||||
# '-pix_fmt', 'bgr24', # OpenCV使用BGR格式
|
|
||||||
# '-f', 'rawvideo', # 关键:输出原始视频格式
|
|
||||||
# '-flush_packets', '1',
|
|
||||||
# '-'
|
|
||||||
# ]
|
|
||||||
# ffmpeg_process = await loop.run_in_executor(
|
|
||||||
# read_rtmp_frames_executor,
|
|
||||||
# lambda: subprocess.Popen(
|
|
||||||
# ffmpeg_cmd,
|
|
||||||
# stdout=subprocess.PIPE,
|
|
||||||
# stderr=subprocess.PIPE,
|
|
||||||
# bufsize=frame_size # 设置合适的缓冲区大小
|
|
||||||
# )
|
|
||||||
# )
|
|
||||||
#
|
|
||||||
# logger.info(f"成功启动 FFmpeg 进程连接 RTMP 流: {video_url}")
|
|
||||||
#
|
|
||||||
# # 3. 初始化帧读取状态
|
|
||||||
# frame_sequence = 0
|
|
||||||
# last_timestamp = time_start
|
|
||||||
# consecutive_corrupted_frames = 0 # 连续损坏帧计数
|
|
||||||
# max_consecutive_corrupted = 10 # 最大连续损坏帧数
|
|
||||||
#
|
|
||||||
# while not cancel_flag.is_set():
|
|
||||||
# try:
|
|
||||||
# # 直接读取完整帧(高性能方式)
|
|
||||||
# raw_frame = await loop.run_in_executor(
|
|
||||||
# read_rtmp_frames_executor,
|
|
||||||
# lambda: ffmpeg_process.stdout.read(frame_size)
|
|
||||||
# )
|
|
||||||
#
|
|
||||||
# if not raw_frame:
|
|
||||||
# logger.warning("读取到空帧数据,流可能已结束")
|
|
||||||
# break
|
|
||||||
#
|
|
||||||
# current_time_ns = time.time_ns()
|
|
||||||
# frame_sequence += 1
|
|
||||||
# frame_count += 1
|
|
||||||
#
|
|
||||||
# # 处理帧数据(无论是否完整)
|
|
||||||
# img = None
|
|
||||||
# is_corrupted = False
|
|
||||||
# print(f"读取 read_rtmp_frames 判断")
|
|
||||||
# try:
|
|
||||||
# if len(raw_frame) == frame_size:
|
|
||||||
# # 完整帧处理
|
|
||||||
# frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape((height, width, 3))
|
|
||||||
# img = frame.copy()
|
|
||||||
# consecutive_corrupted_frames = 0 # 重置连续损坏计数
|
|
||||||
# else:
|
|
||||||
# # 损坏帧处理
|
|
||||||
# logger.warning(f"帧数据损坏: {len(raw_frame)}/{frame_size} bytes, 序列: {frame_sequence}")
|
|
||||||
# is_corrupted = True
|
|
||||||
# consecutive_corrupted_frames += 1
|
|
||||||
#
|
|
||||||
# # 创建替代帧
|
|
||||||
# if consecutive_corrupted_frames <= max_consecutive_corrupted:
|
|
||||||
# # 尝试部分恢复
|
|
||||||
# img = np.zeros((height, width, 3), dtype=np.uint8)
|
|
||||||
# valid_data = min(len(raw_frame), frame_size)
|
|
||||||
# if valid_data > 0:
|
|
||||||
# # 尽可能填充有效数据
|
|
||||||
# temp_frame = np.frombuffer(raw_frame[:valid_data], dtype=np.uint8)
|
|
||||||
# img.flat[:len(temp_frame)] = temp_frame
|
|
||||||
# else:
|
|
||||||
# # 连续损坏过多,创建空白帧
|
|
||||||
# img = np.zeros((height, width, 3), dtype=np.uint8)
|
|
||||||
# logger.error(f"连续损坏帧过多 ({consecutive_corrupted_frames}),创建空白帧")
|
|
||||||
#
|
|
||||||
# except Exception as frame_error:
|
|
||||||
# logger.error(f"帧数据处理错误: {frame_error}")
|
|
||||||
# # 创建空白帧作为后备
|
|
||||||
# img = np.zeros((height, width, 3), dtype=np.uint8)
|
|
||||||
# is_corrupted = True
|
|
||||||
# consecutive_corrupted_frames += 1
|
|
||||||
# print(f"读取 read_rtmp_frames 判断1")
|
|
||||||
# # 获取OSD信息
|
|
||||||
# osd_info = None
|
|
||||||
# if device and topic_osd_info and method_osd_info:
|
|
||||||
# try:
|
|
||||||
# osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info)
|
|
||||||
# if osd_msg and hasattr(osd_msg, 'data'):
|
|
||||||
# osd_info = Air_Attitude(
|
|
||||||
# gimbal_pitch=osd_msg.data.gimbal_pitch,
|
|
||||||
# gimbal_roll=osd_msg.data.gimbal_roll,
|
|
||||||
# gimbal_yaw=osd_msg.data.gimbal_yaw,
|
|
||||||
# height=osd_msg.data.height,
|
|
||||||
# latitude=osd_msg.data.latitude,
|
|
||||||
# longitude=osd_msg.data.longitude
|
|
||||||
# )
|
|
||||||
# except Exception as osd_error:
|
|
||||||
# logger.warning(f"获取OSD信息失败: {osd_error}")
|
|
||||||
# print(f"读取 read_rtmp_frames 判断2")
|
|
||||||
# # 放入帧队列
|
|
||||||
# if img is not None and not frame_queue.full():
|
|
||||||
# # 确保时间戳递增
|
|
||||||
# if current_time_ns <= last_timestamp:
|
|
||||||
# current_time_ns = last_timestamp + 1
|
|
||||||
# last_timestamp = current_time_ns
|
|
||||||
#
|
|
||||||
# # 统计信息
|
|
||||||
# pic_count += 1
|
|
||||||
# if current_time_ns - time_start > 1000000000: # 1秒
|
|
||||||
# elapsed_seconds = (current_time_ns - time_start) / 1e9
|
|
||||||
# fps = pic_count / elapsed_seconds if elapsed_seconds > 0 else 0
|
|
||||||
# corrupted_rate = (consecutive_corrupted_frames / pic_count * 100) if pic_count > 0 else 0
|
|
||||||
# print(
|
|
||||||
# f"readFrames 序列:{frame_sequence} 帧数:{pic_count} FPS:{fps:.2f} 损坏率:{corrupted_rate:.1f}%")
|
|
||||||
# pic_count = 0
|
|
||||||
# time_start = current_time_ns
|
|
||||||
#
|
|
||||||
# print(f"读取 read_rtmp_frames 实时流")
|
|
||||||
#
|
|
||||||
# # 准备帧数据
|
|
||||||
# frame_data = {
|
|
||||||
# "sequence": frame_sequence,
|
|
||||||
# "frame": img,
|
|
||||||
# "osd_info": osd_info,
|
|
||||||
# "timestamp": current_time_ns,
|
|
||||||
# "is_corrupted": is_corrupted
|
|
||||||
# }
|
|
||||||
# time_ns = time.time_ns()
|
|
||||||
# if img is not None and osd_info is not None:
|
|
||||||
# await frame_queue.put((img, osd_info, time_ns))
|
|
||||||
# timestamp_frame_queue.append({
|
|
||||||
# "timestamp": time_ns,
|
|
||||||
# "frame": img
|
|
||||||
# })
|
|
||||||
#
|
|
||||||
# if frame_sequence % 100 == 0: # 每100帧输出一次日志
|
|
||||||
# logger.debug(f"已处理帧 序列:{frame_sequence} 累计:{frame_count}")
|
|
||||||
#
|
|
||||||
# elif frame_queue.full():
|
|
||||||
# logger.warning("帧队列已满,跳过此帧")
|
|
||||||
# await asyncio.sleep(0.001) # 短暂等待
|
|
||||||
#
|
|
||||||
# # 检查是否需要重新探测分辨率(仅在连续损坏时)
|
|
||||||
# if consecutive_corrupted_frames > 5:
|
|
||||||
# logger.warning("连续帧损坏,尝试重新探测分辨率")
|
|
||||||
# try:
|
|
||||||
# new_width, new_height = await detect_video_resolution(loop, read_rtmp_frames_executor,
|
|
||||||
# video_url)
|
|
||||||
# if new_width and new_height and (new_width != width or new_height != height):
|
|
||||||
# logger.info(f"分辨率变化: {width}x{height} -> {new_width}x{new_height}")
|
|
||||||
# width, height = new_width, new_height
|
|
||||||
# frame_size = width * height * 3
|
|
||||||
# consecutive_corrupted_frames = 0 # 重置计数
|
|
||||||
# except Exception as probe_error:
|
|
||||||
# logger.warning(f"重新探测分辨率失败: {probe_error}")
|
|
||||||
#
|
|
||||||
# except Exception as e:
|
|
||||||
# logger.error(f"读取帧数据时出错: {e}", exc_info=True)
|
|
||||||
# # 检查 FFmpeg 进程状态
|
|
||||||
# if ffmpeg_process and ffmpeg_process.poll() is not None:
|
|
||||||
# try:
|
|
||||||
# stderr_output = ffmpeg_process.stderr.read().decode('utf-8', errors='ignore')
|
|
||||||
# if stderr_output:
|
|
||||||
# logger.error(f"FFmpeg 进程错误: {stderr_output}")
|
|
||||||
# except:
|
|
||||||
# pass
|
|
||||||
# logger.error("FFmpeg 进程已退出")
|
|
||||||
# break
|
|
||||||
# # 短暂等待后继续
|
|
||||||
# await asyncio.sleep(0.01)
|
|
||||||
# continue
|
|
||||||
#
|
|
||||||
# except (subprocess.SubprocessError, IOError) as e:
|
|
||||||
# logger.error(f"RTMP 流错误 (尝试 {attempt}/{max_retries}): {e}")
|
|
||||||
# if attempt < max_retries:
|
|
||||||
# await asyncio.sleep(retry_delay)
|
|
||||||
# else:
|
|
||||||
# raise RuntimeError(f"无法连接 RTMP 流 (尝试 {max_retries} 次后失败): {video_url}")
|
|
||||||
# except asyncio.CancelledError:
|
|
||||||
# logger.info("read_rtmp_frames 收到取消信号")
|
|
||||||
# raise
|
|
||||||
# except Exception as e:
|
|
||||||
# logger.error(f"未知错误: {e}", exc_info=True)
|
|
||||||
# if attempt < max_retries:
|
|
||||||
# await asyncio.sleep(retry_delay)
|
|
||||||
# finally:
|
|
||||||
# if ffmpeg_process:
|
|
||||||
# try:
|
|
||||||
# ffmpeg_process.terminate()
|
|
||||||
# try:
|
|
||||||
# await asyncio.wait_for(
|
|
||||||
# loop.run_in_executor(read_rtmp_frames_executor, ffmpeg_process.wait),
|
|
||||||
# timeout=2.0
|
|
||||||
# )
|
|
||||||
# except asyncio.TimeoutError:
|
|
||||||
# ffmpeg_process.kill()
|
|
||||||
# await loop.run_in_executor(read_rtmp_frames_executor, ffmpeg_process.wait)
|
|
||||||
# except Exception as e:
|
|
||||||
# logger.warning(f"关闭FFmpeg进程时出错: {e}")
|
|
||||||
# logger.info("FFmpeg 进程已关闭")
|
|
||||||
#
|
|
||||||
# if frame_count > 0:
|
|
||||||
# total_time = (time.time_ns() - time_start) / 1e9
|
|
||||||
# avg_fps = frame_count / total_time if total_time > 0 else 0
|
|
||||||
# print(f"RTMP流读取完成,总帧数: {frame_count}, 总时间: {total_time:.2f}秒, 平均FPS: {avg_fps:.2f}")
|
|
||||||
# else:
|
|
||||||
# print("RTMP流读取失败,未获取到任何帧")
|
|
||||||
#
|
|
||||||
# logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {frame_count}")
|
|
||||||
#
|
|
||||||
# async def detect_video_resolution(loop, executor, video_url):
|
|
||||||
# """
|
|
||||||
# 探测视频流的分辨率(修复版)
|
|
||||||
# """
|
|
||||||
# try:
|
|
||||||
# logger.info(f"开始探测视频分辨率: {video_url}")
|
|
||||||
#
|
|
||||||
# # 方法1: 使用简单的ffprobe命令(更可靠)
|
|
||||||
# simple_cmd = [
|
|
||||||
# 'ffprobe',
|
|
||||||
# '-v', 'error',
|
|
||||||
# '-select_streams', 'v:0',
|
|
||||||
# '-show_entries', 'stream=width,height',
|
|
||||||
# '-of', 'csv=p=0:s=x',
|
|
||||||
# video_url
|
|
||||||
# ]
|
|
||||||
#
|
|
||||||
# def run_simple_probe():
|
|
||||||
# try:
|
|
||||||
# result = subprocess.run(simple_cmd, capture_output=True, text=True, timeout=15)
|
|
||||||
# logger.info(f"ffprobe返回码: {result.returncode}, 输出: {result.stdout.strip()}")
|
|
||||||
#
|
|
||||||
# if result.returncode == 0 and result.stdout.strip():
|
|
||||||
# dimensions = result.stdout.strip().split('x')
|
|
||||||
# if len(dimensions) == 2:
|
|
||||||
# width = int(dimensions[0])
|
|
||||||
# height = int(dimensions[1])
|
|
||||||
# if width > 0 and height > 0:
|
|
||||||
# return width, height
|
|
||||||
# except Exception as e:
|
|
||||||
# logger.warning(f"简单分辨率探测失败: {e}")
|
|
||||||
# return None
|
|
||||||
#
|
|
||||||
# dimensions = await loop.run_in_executor(executor, run_simple_probe)
|
|
||||||
# if dimensions:
|
|
||||||
# width, height = dimensions
|
|
||||||
# logger.info(f"探测到视频分辨率: {width}x{height}")
|
|
||||||
# return width, height
|
|
||||||
#
|
|
||||||
# # 方法2: 使用详细的ffprobe命令
|
|
||||||
# detailed_cmd = [
|
|
||||||
# 'ffprobe',
|
|
||||||
# '-v', 'quiet',
|
|
||||||
# '-print_format', 'json',
|
|
||||||
# '-show_streams',
|
|
||||||
# video_url
|
|
||||||
# ]
|
|
||||||
#
|
|
||||||
# def run_detailed_probe():
|
|
||||||
# try:
|
|
||||||
# result = subprocess.run(detailed_cmd, capture_output=True, text=True, timeout=15)
|
|
||||||
# if result.returncode == 0:
|
|
||||||
# data = json.loads(result.stdout)
|
|
||||||
# if 'streams' in data:
|
|
||||||
# for stream in data['streams']:
|
|
||||||
# if stream.get('codec_type') == 'video':
|
|
||||||
# width = stream.get('width')
|
|
||||||
# height = stream.get('height')
|
|
||||||
# if width and height:
|
|
||||||
# return int(width), int(height)
|
|
||||||
# except Exception as e:
|
|
||||||
# logger.warning(f"详细分辨率探测失败: {e}")
|
|
||||||
# return None
|
|
||||||
#
|
|
||||||
# dimensions = await loop.run_in_executor(executor, run_detailed_probe)
|
|
||||||
# if dimensions:
|
|
||||||
# width, height = dimensions
|
|
||||||
# logger.info(f"探测到视频分辨率: {width}x{height}")
|
|
||||||
# return width, height
|
|
||||||
#
|
|
||||||
# # 方法3: 尝试使用ffmpeg快速探测
|
|
||||||
# quick_cmd = [
|
|
||||||
# 'ffmpeg',
|
|
||||||
# '-i', video_url,
|
|
||||||
# '-t', '1', # 只读取1秒
|
|
||||||
# '-f', 'null',
|
|
||||||
# '-'
|
|
||||||
# ]
|
|
||||||
#
|
|
||||||
# def run_quick_probe():
|
|
||||||
# try:
|
|
||||||
# result = subprocess.run(quick_cmd, capture_output=True, text=True, timeout=10)
|
|
||||||
# # 从stderr中解析分辨率信息
|
|
||||||
# if result.stderr:
|
|
||||||
# import re
|
|
||||||
# # 尝试从输出中解析分辨率
|
|
||||||
# resolution_match = re.search(r'(\d+)x(\d+)', result.stderr)
|
|
||||||
# if resolution_match:
|
|
||||||
# width = int(resolution_match.group(1))
|
|
||||||
# height = int(resolution_match.group(2))
|
|
||||||
# if width > 0 and height > 0:
|
|
||||||
# return width, height
|
|
||||||
# except Exception as e:
|
|
||||||
# logger.warning(f"快速分辨率探测失败: {e}")
|
|
||||||
# return None
|
|
||||||
#
|
|
||||||
# dimensions = await loop.run_in_executor(executor, run_quick_probe)
|
|
||||||
# if dimensions:
|
|
||||||
# width, height = dimensions
|
|
||||||
# logger.info(f"探测到视频分辨率: {width}x{height}")
|
|
||||||
# return width, height
|
|
||||||
#
|
|
||||||
# logger.warning("所有分辨率探测方法都失败,使用默认值 1920x1080")
|
|
||||||
# return 1920, 1080
|
|
||||||
#
|
|
||||||
# except Exception as e:
|
|
||||||
# logger.error(f"分辨率探测异常: {e}")
|
|
||||||
# logger.warning("使用默认分辨率 1920x1080")
|
|
||||||
# return 1920, 1080
|
|
||||||
|
|
||||||
import cv2
|
|
||||||
import asyncio
|
|
||||||
from typing import Optional
|
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
|
||||||
# 使用cv2 拉流,避免了ffmpeg 拉流的rtmp延时3s的问题
|
|
||||||
async def read_rtmp_frames(
|
async def read_rtmp_frames(
|
||||||
loop,
|
loop,
|
||||||
read_rtmp_frames_executor: ThreadPoolExecutor,
|
read_rtmp_frames_executor: ThreadPoolExecutor,
|
||||||
@ -1403,25 +831,24 @@ async def read_rtmp_frames(
|
|||||||
method_osd_info: Optional[str] = None,
|
method_osd_info: Optional[str] = None,
|
||||||
cancel_flag: Optional[asyncio.Event] = None,
|
cancel_flag: Optional[asyncio.Event] = None,
|
||||||
frame_queue: asyncio.Queue = None,
|
frame_queue: asyncio.Queue = None,
|
||||||
timestamp_frame_queue=None
|
timestamp_frame_queue: TimestampedQueue = None
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
基于 OpenCV+FFmpeg 读取 RTMP 流帧(优化版:高性能读取,处理损坏帧)
|
异步读取 RTMP 流帧(优化版:移除帧率控制,优化线程池)
|
||||||
✅ 核心修改:替换原FFmpeg子进程读流为 cv2.VideoCapture 读流,保留所有原有业务逻辑
|
|
||||||
✅ 核心优化:自适应分辨率、低延迟无残影、断流自动重连、超时保护
|
|
||||||
"""
|
"""
|
||||||
max_retries = 20
|
max_retries = 20
|
||||||
retry_delay = 2
|
retry_delay = 2
|
||||||
pic_count = 0
|
pic_count = 0
|
||||||
attempt = 0
|
attempt = 0
|
||||||
time_start = time.time_ns()
|
time_start = time.time_ns() # 添加开始时间统计
|
||||||
frame_count = 0
|
frame_count = 0 # 统计总帧数
|
||||||
|
|
||||||
if cancel_flag is None:
|
if cancel_flag is None:
|
||||||
cancel_flag = asyncio.Event()
|
cancel_flag = asyncio.Event()
|
||||||
if timestamp_frame_queue is None:
|
|
||||||
timestamp_frame_queue = []
|
|
||||||
|
|
||||||
|
# loop = asyncio.get_running_loop()
|
||||||
|
|
||||||
|
# 打印初始统计信息
|
||||||
print(f"开始读取RTMP流: {video_url}")
|
print(f"开始读取RTMP流: {video_url}")
|
||||||
|
|
||||||
while not cancel_flag.is_set() and attempt < max_retries:
|
while not cancel_flag.is_set() and attempt < max_retries:
|
||||||
@ -1430,98 +857,81 @@ async def read_rtmp_frames(
|
|||||||
logger.info("收到停止信号,终止 RTMP 读取")
|
logger.info("收到停止信号,终止 RTMP 读取")
|
||||||
break
|
break
|
||||||
|
|
||||||
cap = None
|
container = None
|
||||||
width, height = None, None
|
|
||||||
stream_fps = None
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
logger.info(f"尝试连接 RTMP 流 (尝试 {attempt}/{max_retries}): {video_url}")
|
logger.info(f"尝试连接 RTMP 流 (尝试 {attempt}/{max_retries}): {video_url}")
|
||||||
|
# 1. 关键优化:将同步的 av.open 和流初始化放到线程池
|
||||||
|
container = await loop.run_in_executor(read_rtmp_frames_executor, av.open, video_url)
|
||||||
|
video_stream = await loop.run_in_executor(read_rtmp_frames_executor, next,
|
||||||
|
(s for s in container.streams if s.type == 'video'))
|
||||||
|
logger.info(f"成功连接到 RTMP 流: {video_url} ({video_stream.width}x{video_stream.height})")
|
||||||
|
|
||||||
# ✅ 核心替换:使用cv2.VideoCapture + CAP_FFMPEG 打开RTMP流,最优参数配置
|
# 2. 提前获取一次OSD消息(验证MQTT是否正常)
|
||||||
# 切换到线程池执行opencv操作,避免阻塞协程
|
if device and topic_osd_info and method_osd_info:
|
||||||
cap = await loop.run_in_executor(
|
osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info)
|
||||||
read_rtmp_frames_executor,
|
if osd_msg:
|
||||||
lambda: cv2.VideoCapture(video_url, cv2.CAP_FFMPEG)
|
logger.info(f"初始OSD消息获取成功: 高度={osd_msg.data.height}")
|
||||||
)
|
else:
|
||||||
# 设置核心参数 - 重中之重,缺一不可
|
logger.warning("初始OSD消息为空,可能MQTT尚未收到消息")
|
||||||
await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 60000))
|
|
||||||
await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 50000))
|
|
||||||
await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)) # 无缓存,低延迟无残影
|
|
||||||
await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_FPS, 25))
|
|
||||||
|
|
||||||
# 校验流是否成功打开
|
# 3. 关键优化:将同步的帧迭代放到线程池,通过生成器异步获取
|
||||||
is_opened = await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.isOpened())
|
async def async_frame_generator():
|
||||||
if not is_opened:
|
"""异步帧生成器:在后台线程迭代同步帧,通过yield返回给事件循环"""
|
||||||
logger.warning(f"尝试 {attempt} 次:打开RTMP流失败,准备重试")
|
|
||||||
await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.release() if cap else None)
|
|
||||||
await asyncio.sleep(retry_delay)
|
|
||||||
continue
|
|
||||||
|
|
||||||
# ✅ 自适应获取流的【真实分辨率】,无需手动探测,精准无误差
|
def sync_frame_iter():
|
||||||
width = await loop.run_in_executor(read_rtmp_frames_executor, lambda: int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)))
|
try:
|
||||||
height = await loop.run_in_executor(read_rtmp_frames_executor, lambda: int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
|
for frame in container.decode(video=0):
|
||||||
stream_fps = await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.get(cv2.CAP_PROP_FPS))
|
# 线程内检查取消标志(需定期检查,避免线程无法退出)
|
||||||
|
if cancel_flag.is_set():
|
||||||
|
logger.info("后台线程检测到取消信号,停止帧迭代")
|
||||||
|
break
|
||||||
|
|
||||||
# 兜底分辨率,防止异常
|
# 确保是3通道RGB
|
||||||
if width is None or height is None or width == 0 or height == 0:
|
if len(frame.planes) == 1: # 如果是灰度图
|
||||||
logger.warning("使用默认分辨率 1920x1080")
|
gray = frame.to_ndarray(format='gray')
|
||||||
width, height = 1920, 1080
|
# 转换为3通道BGR(不修改尺寸)
|
||||||
|
bgr = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)
|
||||||
logger.info(f"视频分辨率: {width}x{height}, 流帧率: {stream_fps:.1f} FPS")
|
yield bgr
|
||||||
logger.info(f"成功启动 OpenCV+FFmpeg 连接 RTMP 流: {video_url}")
|
else:
|
||||||
|
# 保持原始尺寸和色彩空间,只转换格式
|
||||||
# 初始化帧读取状态 (保留原逻辑不变)
|
bgr = frame.to_ndarray(format='bgr24')
|
||||||
frame_sequence = 0
|
yield bgr
|
||||||
last_timestamp = time_start
|
except Exception as e:
|
||||||
consecutive_corrupted_frames = 0 # 连续损坏帧计数
|
logger.error(f"同步帧迭代出错: {e}")
|
||||||
max_consecutive_corrupted = 10 # 最大连续损坏帧数
|
finally:
|
||||||
|
if container:
|
||||||
|
container.close()
|
||||||
|
logger.info("RTMP容器已关闭")
|
||||||
|
|
||||||
|
# 将同步迭代器包装为异步生成器
|
||||||
|
gen = sync_frame_iter()
|
||||||
while not cancel_flag.is_set():
|
while not cancel_flag.is_set():
|
||||||
try:
|
try:
|
||||||
# ✅ 核心替换:使用cv2.read()读取帧,线程池执行避免阻塞协程
|
# 每次获取一帧都通过线程池执行,避免长时间阻塞
|
||||||
ret, frame = await loop.run_in_executor(
|
frame = await loop.run_in_executor(read_rtmp_frames_executor, next, gen, None)
|
||||||
read_rtmp_frames_executor,
|
if frame is None: # 迭代结束
|
||||||
lambda: cap.read()
|
break
|
||||||
)
|
yield frame
|
||||||
|
except StopIteration:
|
||||||
|
logger.info("RTMP流帧迭代结束")
|
||||||
|
break
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"异步获取帧出错: {e}")
|
||||||
|
break
|
||||||
|
|
||||||
current_time_ns = time.time_ns()
|
# 4. 异步迭代帧(不阻塞事件循环)
|
||||||
frame_sequence += 1
|
async for frame in async_frame_generator():
|
||||||
frame_count += 1
|
if cancel_flag.is_set():
|
||||||
|
logger.info("检测到取消信号,停止读取帧")
|
||||||
|
break
|
||||||
|
|
||||||
# 处理帧数据(保留原逻辑完全不变,兼容原有的损坏帧处理)
|
|
||||||
img = None
|
|
||||||
is_corrupted = False
|
|
||||||
print(f"读取 read_rtmp_frames 判断")
|
|
||||||
try:
|
try:
|
||||||
if ret and frame is not None and frame.shape == (height, width, 3):
|
# 5. 帧转换也放到线程池(av.Frame.to_ndarray是CPU密集操作)
|
||||||
# 完整有效帧处理
|
img = frame.copy() # 确保不修改原始帧
|
||||||
img = frame.copy()
|
|
||||||
consecutive_corrupted_frames = 0 # 重置连续损坏计数
|
|
||||||
else:
|
|
||||||
# 损坏帧/空帧处理
|
|
||||||
logger.warning(f"帧数据损坏/空帧, 序列: {frame_sequence}")
|
|
||||||
is_corrupted = True
|
|
||||||
consecutive_corrupted_frames += 1
|
|
||||||
|
|
||||||
# 创建替代帧,保留原逻辑
|
|
||||||
if consecutive_corrupted_frames <= max_consecutive_corrupted:
|
|
||||||
img = np.zeros((height, width, 3), dtype=np.uint8)
|
|
||||||
else:
|
|
||||||
img = np.zeros((height, width, 3), dtype=np.uint8)
|
|
||||||
logger.error(f"连续损坏帧过多 ({consecutive_corrupted_frames}),创建空白帧")
|
|
||||||
|
|
||||||
except Exception as frame_error:
|
|
||||||
logger.error(f"帧数据处理错误: {frame_error}")
|
|
||||||
# 创建空白帧作为后备,保留原逻辑
|
|
||||||
img = np.zeros((height, width, 3), dtype=np.uint8)
|
|
||||||
is_corrupted = True
|
|
||||||
consecutive_corrupted_frames += 1
|
|
||||||
print(f"读取 read_rtmp_frames 判断1")
|
|
||||||
|
|
||||||
# 获取OSD信息 - 保留原逻辑完全不变
|
|
||||||
osd_info = None
|
osd_info = None
|
||||||
|
|
||||||
|
# 6. 此时事件循环未被阻塞,MQTT消息已缓存,get_latest_message可即时获取
|
||||||
if device and topic_osd_info and method_osd_info:
|
if device and topic_osd_info and method_osd_info:
|
||||||
try:
|
|
||||||
osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info)
|
osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info)
|
||||||
if osd_msg and hasattr(osd_msg, 'data'):
|
if osd_msg and hasattr(osd_msg, 'data'):
|
||||||
osd_info = Air_Attitude(
|
osd_info = Air_Attitude(
|
||||||
@ -1532,77 +942,54 @@ async def read_rtmp_frames(
|
|||||||
latitude=osd_msg.data.latitude,
|
latitude=osd_msg.data.latitude,
|
||||||
longitude=osd_msg.data.longitude
|
longitude=osd_msg.data.longitude
|
||||||
)
|
)
|
||||||
except Exception as osd_error:
|
|
||||||
logger.warning(f"获取OSD信息失败: {osd_error}")
|
|
||||||
print(f"读取 read_rtmp_frames 判断2")
|
|
||||||
|
|
||||||
# 放入帧队列 - 保留原逻辑完全不变
|
# 7. 异步放入帧队列(避免队列满时阻塞)
|
||||||
if img is not None and not frame_queue.full():
|
if not frame_queue.full():
|
||||||
# 确保时间戳递增
|
|
||||||
if current_time_ns <= last_timestamp:
|
|
||||||
current_time_ns = last_timestamp + 1
|
|
||||||
last_timestamp = current_time_ns
|
|
||||||
|
|
||||||
# 统计信息 - 保留原逻辑完全不变
|
|
||||||
pic_count += 1
|
pic_count += 1
|
||||||
if current_time_ns - time_start > 1000000000: # 1秒
|
frame_count += 1 # 增加总帧数统计
|
||||||
elapsed_seconds = (current_time_ns - time_start) / 1e9
|
|
||||||
fps = pic_count / elapsed_seconds if elapsed_seconds > 0 else 0
|
|
||||||
corrupted_rate = (consecutive_corrupted_frames / pic_count * 100) if pic_count > 0 else 0
|
|
||||||
print(
|
|
||||||
f"readFrames 序列:{frame_sequence} 帧数:{pic_count} FPS:{fps:.2f} 损坏率:{corrupted_rate:.1f}%")
|
|
||||||
pic_count = 0
|
|
||||||
time_start = current_time_ns
|
|
||||||
|
|
||||||
print(f"读取 read_rtmp_frames 实时流")
|
|
||||||
|
|
||||||
# 准备帧数据 - 保留原逻辑完全不变
|
|
||||||
frame_data = {
|
|
||||||
"sequence": frame_sequence,
|
|
||||||
"frame": img,
|
|
||||||
"osd_info": osd_info,
|
|
||||||
"timestamp": current_time_ns,
|
|
||||||
"is_corrupted": is_corrupted
|
|
||||||
}
|
|
||||||
time_ns = time.time_ns()
|
time_ns = time.time_ns()
|
||||||
|
|
||||||
|
# 定期输出统计信息(每1000帧)
|
||||||
|
if time_ns - time_start > 1000000000:
|
||||||
|
print(f"readFrames {pic_count}")
|
||||||
|
pic_count = 0
|
||||||
|
time_start = time_ns
|
||||||
if img is not None and osd_info is not None:
|
if img is not None and osd_info is not None:
|
||||||
await frame_queue.put((img, osd_info, time_ns))
|
await frame_queue.put((img, osd_info, time_ns))
|
||||||
timestamp_frame_queue.append({
|
timestamp_frame_queue.append({
|
||||||
"timestamp": time_ns,
|
"timestamp": time_ns,
|
||||||
"frame": img
|
"frame": img
|
||||||
})
|
})
|
||||||
|
logger.debug(
|
||||||
|
f"已放入帧队列,累计帧数: {pic_count},队列剩余空间: {frame_queue.maxsize - frame_queue.qsize()}")
|
||||||
|
else:
|
||||||
|
logger.warning("帧队列已满,等待1ms后重试")
|
||||||
|
await asyncio.sleep(0.001)
|
||||||
|
|
||||||
if frame_sequence % 100 == 0: # 每100帧输出一次日志
|
except Exception as frame_error:
|
||||||
logger.debug(f"已处理帧 序列:{frame_sequence} 累计:{frame_count}")
|
logger.error(f"处理单帧时出错: {frame_error}", exc_info=True)
|
||||||
|
continue
|
||||||
|
|
||||||
elif frame_queue.full():
|
except (av.AVError, IOError) as e:
|
||||||
logger.warning("帧队列已满,跳过此帧")
|
|
||||||
await asyncio.sleep(0.001) # 短暂等待
|
|
||||||
|
|
||||||
# 连续帧损坏触发重连前置判断 - 保留原逻辑
|
|
||||||
if consecutive_corrupted_frames > max_consecutive_corrupted:
|
|
||||||
logger.warning(f"连续{consecutive_corrupted_frames}帧损坏,触发流重连逻辑")
|
|
||||||
break
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"读取帧数据时出错: {e}", exc_info=True)
|
|
||||||
break
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"RTMP 流错误 (尝试 {attempt}/{max_retries}): {e}")
|
logger.error(f"RTMP 流错误 (尝试 {attempt}/{max_retries}): {e}")
|
||||||
if attempt < max_retries:
|
if attempt < max_retries:
|
||||||
await asyncio.sleep(retry_delay)
|
await asyncio.sleep(retry_delay)
|
||||||
else:
|
else:
|
||||||
raise RuntimeError(f"无法连接 RTMP 流 (尝试 {max_retries} 次后失败): {video_url}")
|
raise RuntimeError(f"无法连接 RTMP 流 (尝试 {max_retries} 次后失败): {video_url}")
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
logger.info("read_rtmp_frames 收到取消信号")
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"未知错误: {e}", exc_info=True)
|
||||||
|
if attempt < max_retries:
|
||||||
|
await asyncio.sleep(retry_delay)
|
||||||
finally:
|
finally:
|
||||||
# ✅ 释放opencv的VideoCapture资源,替代原FFmpeg进程的关闭逻辑
|
# 双重保险:确保容器关闭
|
||||||
if cap is not None:
|
if container and not container.closed:
|
||||||
await loop.run_in_executor(
|
await loop.run_in_executor(None, container.close)
|
||||||
read_rtmp_frames_executor,
|
logger.info("RTMP容器在finally中关闭")
|
||||||
lambda: cap.release()
|
|
||||||
)
|
|
||||||
logger.info("OpenCV VideoCapture 资源已释放")
|
|
||||||
|
|
||||||
|
# 最终统计信息
|
||||||
if frame_count > 0:
|
if frame_count > 0:
|
||||||
total_time = (time.time_ns() - time_start) / 1e9
|
total_time = (time.time_ns() - time_start) / 1e9
|
||||||
avg_fps = frame_count / total_time if total_time > 0 else 0
|
avg_fps = frame_count / total_time if total_time > 0 else 0
|
||||||
@ -1610,7 +997,7 @@ async def read_rtmp_frames(
|
|||||||
else:
|
else:
|
||||||
print("RTMP流读取失败,未获取到任何帧")
|
print("RTMP流读取失败,未获取到任何帧")
|
||||||
|
|
||||||
logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {frame_count}")
|
logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {pic_count}")
|
||||||
|
|
||||||
|
|
||||||
# async def process_frames(detector: MultiYOLODetector):
|
# async def process_frames(detector: MultiYOLODetector):
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user