优化拉流丢包的问题,增加宽松策略

This commit is contained in:
martin 2026-02-05 02:26:38 +08:00
parent 89181007c2
commit 146872a4dd
6 changed files with 703 additions and 213 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 4.2 MiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 5.3 MiB

View File

@ -1613,10 +1613,309 @@ from concurrent.futures import ThreadPoolExecutor
# logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {frame_count}")
# SEI修复核心配置
os.environ["OPENCV_FFMPEG_LOG_LEVEL"] = "ERROR" # 屏蔽SEI错误日志
MAX_RETRIES = 5
RETRY_DELAY = 2
BUFFER_SIZE = 1 # 最小缓冲区减少SEI积压
TARGET_FPS = 25
FOURCC = cv2.VideoWriter_fourcc(*'H264')
MAX_CORRUPTED = 30
#
# async def read_rtmp_frames(
# loop,
# read_rtmp_frames_executor: ThreadPoolExecutor,
# video_url: str,
# device: Optional[MQTTDevice] = None,
# topic_camera_osd: Optional[str] = None,
# method_camera_osd: Optional[str] = None,
# topic_osd_info: Optional[str] = None,
# method_osd_info: Optional[str] = None,
# cancel_flag: Optional[asyncio.Event] = None,
# frame_queue: asyncio.Queue = None,
# timestamp_frame_queue: TimestampedQueue = None
# ):
# """优化版RTMP流读取集成SEI修复+完整逻辑)
# 核心修复:
# 1. 屏蔽FFmpeg SEI截断日志
# 2. 精简OpenCV参数仅保留Python支持的核心配置
# 3. 增强帧格式校验和异常处理
# 4. 修复事件循环嵌套运行的致命错误
# 5. 优化重连机制和SEI帧跳过逻辑
# """
# print(f"开始读取RTMP流: {video_url}")
#
# # ✅ 关键修复1设置FFmpeg全局参数屏蔽SEI帧截断日志
# os.environ["OPENCV_FFMPEG_LOG_LEVEL"] = "ERROR" # 只输出致命错误屏蔽SEI相关警告
#
# def ensure_cv8uc3(frame):
# """确保帧格式为CV_8UC3增强版修复"""
# if frame is None:
# return None
# if frame.dtype != np.uint8:
# frame = frame.astype(np.uint8)
# if len(frame.shape) == 2: # 灰度图转彩色
# frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
# elif frame.shape[2] == 4: # RGBA 转 BGR
# frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2BGR)
# return frame
#
# # ✅ 关键修复2将init_capture改为同步函数核心解决事件循环冲突
# def init_capture(attempt: int = 1):
# """初始化捕获器同步版本带SEI修复参数+重连逻辑)"""
# print(f"第 {attempt}/{MAX_RETRIES} 次尝试初始化RTMP捕获器")
#
# # 指定FFmpeg后端创建捕获器
# cap = cv2.VideoCapture(video_url, cv2.CAP_FFMPEG)
# if not cap.isOpened():
# raise RuntimeError(f"无法打开RTMP流 (第{attempt}次尝试)")
#
# # 仅保留Python版OpenCV支持的核心参数
# cap.set(cv2.CAP_PROP_BUFFERSIZE, BUFFER_SIZE) # 缓冲区设为1减少SEI帧积压
# cap.set(cv2.CAP_PROP_FOURCC, FOURCC) # 指定H264解码器减少SEI解析开销
# cap.set(cv2.CAP_PROP_FPS, TARGET_FPS) # 同步流帧率,避免冗余处理
#
# # 获取分辨率(增加异常处理)
# try:
# width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
# height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# if width <= 0 or height <= 0:
# width, height = 1280, 720 # 默认分辨率兜底
# except Exception as e:
# print(f"获取分辨率失败,使用默认值: {e}")
# width, height = 1280, 720
#
# print(f"RTMP捕获器初始化成功分辨率: {width}x{height}")
# return cap, (width, height)
#
# # 初始化捕获器(支持重连)
# cap = None
# width, height = 1280, 720
# for attempt in range(1, MAX_RETRIES + 1):
# try:
# # ✅ 关键修复3直接在线程池执行同步的init_capture不再嵌套事件循环
# cap, (width, height) = await loop.run_in_executor(
# read_rtmp_frames_executor,
# init_capture, # 直接传函数不再用lambda嵌套loop.run_until_complete
# attempt # 传递attempt参数
# )
# break
# except RuntimeError as e:
# print(f"初始化失败: {e}")
# if attempt >= MAX_RETRIES:
# raise RuntimeError(f"所有{MAX_RETRIES}次初始化尝试均失败")
# await asyncio.sleep(RETRY_DELAY)
#
# try:
# last_valid_frame = np.zeros((height, width, 3), dtype=np.uint8)
# consecutive_corrupted_frames = 0
# frame_count = 0
# time_start = time.time_ns()
#
# while not cancel_flag.is_set():
# try:
# # 读取帧(使用线程池避免阻塞事件循环)
# ret, frame = await loop.run_in_executor(
# read_rtmp_frames_executor,
# cap.read # 直接传递方法,更简洁
# )
#
# # 增强SEI帧/损坏帧处理逻辑
# current_frame = last_valid_frame.copy()
# if ret and frame is not None and frame.size > 0:
# # 正常帧:格式转换 + 更新兜底帧
# processed_frame = ensure_cv8uc3(frame)
# if processed_frame is not None:
# current_frame = processed_frame
# last_valid_frame = current_frame.copy()
# consecutive_corrupted_frames = 0
# else:
# consecutive_corrupted_frames += 1
# else:
# # SEI帧/损坏帧:使用兜底帧 + 计数
# consecutive_corrupted_frames += 1
# if consecutive_corrupted_frames % 15 == 0:
# print(f"跳过SEI/损坏帧 (连续: {consecutive_corrupted_frames})")
#
# # 获取OSD信息保留原逻辑
# osd_info = None
# if device and topic_osd_info and method_osd_info:
# try:
# osd_msg = await loop.run_in_executor(
# read_rtmp_frames_executor,
# device.get_latest_message,
# topic_osd_info,
# method_osd_info
# )
# if osd_msg and hasattr(osd_msg, 'data'):
# osd_info = Air_Attitude(
# gimbal_pitch=osd_msg.data.gimbal_pitch,
# gimbal_roll=osd_msg.data.gimbal_roll,
# gimbal_yaw=osd_msg.data.gimbal_yaw,
# height=osd_msg.data.height,
# latitude=osd_msg.data.latitude,
# longitude=osd_msg.data.longitude
# )
# except Exception as e:
# print(f"获取OSD信息失败: {str(e)}")
#
# # 时间戳和队列处理(优化超时逻辑)
# timestamp = time.time_ns()
# if frame_queue is not None and timestamp_frame_queue is not None:
# try:
# # 非阻塞放入队列,避免长时间等待
# if not frame_queue.full():
# await asyncio.wait_for(
# frame_queue.put((current_frame, osd_info, timestamp)),
# timeout=0.01
# )
# timestamp_frame_queue.append({
# "timestamp": timestamp,
# "frame": current_frame
# })
# frame_count += 1
#
# # 每秒打印一次帧数统计
# if timestamp - time_start > 1000000000:
# fps = frame_count / ((timestamp - time_start) / 1000000000)
# print(f"读取帧数: {frame_count} | 实时FPS: {fps:.2f}")
# frame_count = 0
# time_start = timestamp
# else:
# # 队列满时短暂休眠避免CPU占用过高
# await asyncio.sleep(0.001)
# except asyncio.TimeoutError:
# print("帧队列已满,跳过此帧")
#
# # 连续损坏帧触发重连逻辑(增强版)
# if consecutive_corrupted_frames > MAX_CORRUPTED:
# print(f"连续{MAX_CORRUPTED}帧异常,尝试重新初始化捕获器")
# # 释放旧捕获器
# if cap and cap.isOpened():
# await loop.run_in_executor(
# read_rtmp_frames_executor,
# cap.release
# )
# # 重新初始化
# reconnected = False
# for attempt in range(1, MAX_RETRIES + 1):
# try:
# cap, (width, height) = await loop.run_in_executor(
# read_rtmp_frames_executor,
# init_capture,
# attempt
# )
# consecutive_corrupted_frames = 0
# last_valid_frame = np.zeros((height, width, 3), dtype=np.uint8)
# reconnected = True
# break
# except RuntimeError as e:
# print(f"重连失败 (第{attempt}次): {e}")
# if attempt >= MAX_RETRIES:
# raise RuntimeError("重连次数超限,停止尝试")
# await asyncio.sleep(RETRY_DELAY)
# if not reconnected:
# break
#
# except Exception as e:
# print(f"帧处理错误: {str(e)}")
# await asyncio.sleep(0.1)
#
# except asyncio.CancelledError:
# print("读取任务已取消")
# except Exception as e:
# print(f"流读取异常: {str(e)}")
# # 异常恢复机制(增强版)
# if not cancel_flag.is_set():
# await asyncio.sleep(RETRY_DELAY)
# # 重新初始化捕获器
# reconnected = False
# for attempt in range(1, MAX_RETRIES + 1):
# try:
# cap, (width, height) = await loop.run_in_executor(
# read_rtmp_frames_executor,
# init_capture,
# attempt
# )
# reconnected = True
# break
# except RuntimeError as e:
# print(f"异常恢复重连失败 (第{attempt}次): {e}")
# if attempt >= MAX_RETRIES:
# raise
# await asyncio.sleep(RETRY_DELAY)
# if reconnected:
# # 恢复后继续运行(递归调用自身,保持逻辑完整)
# await read_rtmp_frames(
# loop=loop,
# read_rtmp_frames_executor=read_rtmp_frames_executor,
# video_url=video_url,
# device=device,
# topic_camera_osd=topic_camera_osd,
# method_camera_osd=method_camera_osd,
# topic_osd_info=topic_osd_info,
# method_osd_info=method_osd_info,
# cancel_flag=cancel_flag,
# frame_queue=frame_queue,
# timestamp_frame_queue=timestamp_frame_queue
# )
# finally:
# # 确保资源释放
# if cap:
# await loop.run_in_executor(
# read_rtmp_frames_executor,
# lambda: cap.release() if cap.isOpened() else None
# )
# print("RTMP流读取已停止")
def init_capture_with_sei_fix(video_url: str, attempt: int = 1):
"""
修复SEI错误的VideoCapture初始化
"""
print(f"\n===== 第 {attempt}/{MAX_RETRIES} 次尝试连接 =====")
# ✅ 关键设置FFmpeg全局参数屏蔽SEI帧截断日志
os.environ["OPENCV_FFMPEG_LOG_LEVEL"] = "ERROR" # 只输出FFmpeg致命错误屏蔽警告包括SEI截断
# 初始化VideoCapture指定FFmpeg后端
cap = cv2.VideoCapture(video_url, cv2.CAP_FFMPEG)
if not cap.isOpened():
raise RuntimeError(f"无法打开RTMP流 (第{attempt}次尝试)")
# 设置核心参数
cap.set(cv2.CAP_PROP_BUFFERSIZE, BUFFER_SIZE) # 小缓冲区,实时推帧
cap.set(cv2.CAP_PROP_FOURCC, FOURCC) # 指定H264解码器
cap.set(cv2.CAP_PROP_FPS, TARGET_FPS) # 同步流帧率
# 获取流分辨率
try:
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
except:
width, height = 1440, 1080 # 默认分辨率
print(f"拉流成功:分辨率 {width}x{height}")
return cap, (width, height)
def ensure_cv8uc3(frame):
"""确保帧格式为8位3通道BGR"""
if frame is None or frame.size == 0:
return None
if frame.dtype != np.uint8:
frame = frame.astype(np.uint8)
if len(frame.shape) == 2: # 灰度图转彩色
frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
elif frame.shape[2] == 4: # RGBA转BGR
frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2BGR)
return frame
async def read_rtmp_frames(
loop,
read_rtmp_frames_executor: ThreadPoolExecutor,
executor: ThreadPoolExecutor,
video_url: str,
device: Optional[MQTTDevice] = None,
topic_camera_osd: Optional[str] = None,
@ -1625,151 +1924,90 @@ async def read_rtmp_frames(
method_osd_info: Optional[str] = None,
cancel_flag: Optional[asyncio.Event] = None,
frame_queue: asyncio.Queue = None,
timestamp_frame_queue=None
timestamp_frame_queue: TimestampedQueue = None
):
"""
基于 OpenCV+FFmpeg 读取 RTMP 流帧兼容优化版解决URL格式错误+超时问题
核心优化改用FFmpeg环境变量传递容错参数提升兼容性
修复问题解决流超时URL识别失败的问题提升连接成功率
保留功能低延迟丢包容错断流自动重连自适应分辨率
"""
max_retries = 20
retry_delay = 3 # 小幅增加重试间隔,避免频繁重试给服务端压力
pic_count = 0
attempt = 0
time_start = time.time_ns()
frame_count = 0
"""异步RTMP读取修复SEI错误"""
print(f"开始测试 RTMP 拉流(验证 SEI 丢包问题): {video_url}")
print("测试说明1. 控制台无 SEI 截断警告即为正常 2. 观察 imshow 窗口画面是否稳定 3. 按 'q' 退出")
if cancel_flag is None:
cancel_flag = asyncio.Event()
if timestamp_frame_queue is None:
timestamp_frame_queue = []
if frame_queue is None:
frame_queue = asyncio.Queue(maxsize=30) # 给队列设置默认容量,防止内存溢出
# ✅ 关键改动1通过FFmpeg环境变量传递容错参数兼容所有OpenCV/FFmpeg环境
# 这是比URL拼接更稳定的方式OpenCV底层FFmpeg会自动读取这些环境变量
os.environ['FFMPEG_FLAGS'] = (
"nobuffer,fastseek,discardcorrupt," # 核心容错:无缓冲、快速寻址、丢弃损坏帧
"low_delay," # 低延迟模式
"ignore_err," # 忽略解码错误
"max_delay=0" # 最大延迟设为0
)
# 额外设置FFmpeg的日志级别减少无关输出可选
os.environ['FFREPORT'] = "level=warning"
# 创建预览队列
preview_task = None
print(f"开始读取RTMP流兼容模式已配置FFmpeg容错参数: {video_url}")
while not cancel_flag.is_set() and attempt < max_retries:
attempt += 1
if cancel_flag.is_set():
logger.info("收到停止信号,终止 RTMP 读取")
break
# 初始化捕获器
cap = None
width, height = None, None
stream_fps = None
width, height = 1280, 720
for attempt in range(1, MAX_RETRIES + 1):
try:
cap, (width, height) = await loop.run_in_executor(
executor,
init_capture_with_sei_fix,
video_url,
attempt
)
break
except RuntimeError as e:
print(f"{attempt} 次尝试失败:{e}{RETRY_DELAY} 秒后重试")
if attempt >= MAX_RETRIES:
raise RuntimeError(f"所有{MAX_RETRIES}次初始化尝试均失败")
await asyncio.sleep(RETRY_DELAY)
try:
logger.info(f"尝试连接 RTMP 流 (尝试 {attempt}/{max_retries}): {video_url}")
last_valid_frame = np.zeros((height, width, 3), dtype=np.uint8)
consecutive_corrupted_frames = 0
frame_count = 0
fps_count = 0
time_start = time.time_ns()
fps_start = time.time()
# ✅ 关键改动2恢复原始RTMP URL避免格式识别错误
cap = await loop.run_in_executor(
read_rtmp_frames_executor,
lambda: cv2.VideoCapture(video_url, cv2.CAP_FFMPEG)
)
# ✅ 关键改动3优化超时和缓存参数解决30秒超时问题
# 调整超时时间OpenCV的超时参数在部分环境下需要略大于实际预期推荐设为30秒
await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 30000)) # 连接超时30秒适配底层逻辑
await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 10000)) # 读取帧超时10秒避免长时间阻塞
await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_BUFFERSIZE, 2)) # 缓存从1调整为2平衡低延迟和稳定性丢包场景下更稳定
await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_FPS, 25))
# 关闭自动处理功能,减少额外开销
await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_AUTO_WB, 0))
await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0))
# ✅ 关键改动4流连接预热提升首次打开成功率
await asyncio.sleep(0.5) # 短暂等待让FFmpeg完成底层初始化
# 校验流是否成功打开
is_opened = await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.isOpened())
if not is_opened:
logger.warning(f"尝试 {attempt}打开RTMP流失败准备重试")
# 释放资源前先短暂等待,避免资源泄露
await asyncio.sleep(0.1)
await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.release() if cap else None)
await asyncio.sleep(retry_delay)
continue
# 自适应获取流的【真实分辨率】
width = await loop.run_in_executor(read_rtmp_frames_executor, lambda: int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)))
height = await loop.run_in_executor(read_rtmp_frames_executor, lambda: int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
stream_fps = await loop.run_in_executor(read_rtmp_frames_executor, lambda: cap.get(cv2.CAP_PROP_FPS))
# 兜底分辨率,防止异常
if width is None or height is None or width == 0 or height == 0:
logger.warning("使用默认分辨率 1920x1080")
width, height = 1920, 1080
logger.info(f"视频分辨率: {width}x{height}, 流帧率: {stream_fps:.1f} FPS")
logger.info(f"成功启动 OpenCV+FFmpeg 连接 RTMP 流: {video_url}")
# 初始化帧读取状态
frame_sequence = 0
last_timestamp = time_start
consecutive_corrupted_frames = 0 # 连续损坏帧计数
max_consecutive_corrupted = 20 # 适配丢包场景的最大连续损坏帧数
print(f"拉流成功:分辨率 {width}x{height},开始预览(按 'q' 退出)")
while not cancel_flag.is_set():
try:
# 读取帧,线程池执行避免阻塞协程
ret, frame = await loop.run_in_executor(
read_rtmp_frames_executor,
lambda: cap.read()
)
# 异步读取帧
ret, frame = await loop.run_in_executor(executor, cap.read)
current_time_ns = time.time_ns()
frame_sequence += 1
# 帧有效性判断 + 格式转换
current_frame = last_valid_frame.copy()
if ret and frame is not None and frame.size > 0:
frame = ensure_cv8uc3(frame)
current_frame = frame.copy()
last_valid_frame = current_frame
consecutive_corrupted_frames = 0
frame_count += 1
# 处理帧数据(微调损坏帧判断,提升容错)
img = None
is_corrupted = False
try:
# 放宽帧形状判断,允许偶尔的分辨率小幅波动(丢包导致)
valid_frame_shape = (height, width, 3)
if ret and frame is not None and frame.shape in [valid_frame_shape, (height, width)]:
# 完整有效帧处理统一转为3通道BGR格式
img = frame.copy()
if len(img.shape) == 2: # 灰度帧转为3通道
img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
consecutive_corrupted_frames = 0 # 重置连续损坏计数
fps_count += 1
else:
# 损坏帧/空帧处理
if consecutive_corrupted_frames % 5 == 0: # 每5帧打印一次日志减少冗余输出
logger.warning(f"帧数据损坏/空帧, 序列: {frame_sequence}")
is_corrupted = True
consecutive_corrupted_frames += 1
if consecutive_corrupted_frames % 15 == 0:
print(f"跳过 SEI 帧/临时解码异常,连续异常帧:{consecutive_corrupted_frames}(画面稳定)")
# 创建替代帧
if consecutive_corrupted_frames <= max_consecutive_corrupted:
img = np.zeros((height, width, 3), dtype=np.uint8)
else:
img = np.zeros((height, width, 3), dtype=np.uint8)
logger.error(f"连续损坏帧过多 ({consecutive_corrupted_frames}),创建空白帧")
# # 放入预览队列
# if enable_preview and preview_queue is not None:
# try:
# # 清空旧帧,只保留最新的
# while not preview_queue.empty():
# try:
# preview_queue.get_nowait()
# except asyncio.QueueEmpty:
# break
#
# if not preview_queue.full():
# await asyncio.wait_for(preview_queue.put(current_frame), timeout=0.001)
# except (asyncio.QueueFull, asyncio.TimeoutError):
# pass
except Exception as frame_error:
logger.error(f"帧数据处理错误: {frame_error}")
img = np.zeros((height, width, 3), dtype=np.uint8)
is_corrupted = True
consecutive_corrupted_frames += 1
# 获取OSD信息 - 保留原逻辑完全不变
# OSD信息
osd_info = None
if device and topic_osd_info and method_osd_info:
try:
osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info)
osd_msg = await loop.run_in_executor(
executor, device.get_latest_message, topic_osd_info, method_osd_info
)
if osd_msg and hasattr(osd_msg, 'data'):
osd_info = Air_Attitude(
gimbal_pitch=osd_msg.data.gimbal_pitch,
@ -1779,82 +2017,328 @@ async def read_rtmp_frames(
latitude=osd_msg.data.latitude,
longitude=osd_msg.data.longitude
)
except Exception as osd_error:
logger.warning(f"获取OSD信息失败: {osd_error}")
# 放入帧队列 - 保留原逻辑完全不变
if img is not None and not frame_queue.full():
# 确保时间戳递增
if current_time_ns <= last_timestamp:
current_time_ns = last_timestamp + 1
last_timestamp = current_time_ns
# 统计信息
pic_count += 1
if current_time_ns - time_start > 1000000000: # 1秒
elapsed_seconds = (current_time_ns - time_start) / 1e9
fps = pic_count / elapsed_seconds if elapsed_seconds > 0 else 0
corrupted_rate = (consecutive_corrupted_frames / pic_count * 100) if pic_count > 0 else 0
print(
f"readFrames 序列:{frame_sequence} 帧数:{pic_count} FPS:{fps:.2f} 损坏率:{corrupted_rate:.1f}%")
pic_count = 0
time_start = current_time_ns
# 准备帧数据
frame_data = {
"sequence": frame_sequence,
"frame": img,
"osd_info": osd_info,
"timestamp": current_time_ns,
"is_corrupted": is_corrupted
}
time_ns = time.time_ns()
if img is not None and osd_info is not None:
await frame_queue.put((img, osd_info, time_ns))
timestamp_frame_queue.append({
"timestamp": time_ns,
"frame": img
})
if frame_sequence % 100 == 0: # 每100帧输出一次日志
logger.debug(f"已处理帧 序列:{frame_sequence} 累计:{frame_count}")
elif frame_queue.full():
logger.warning("帧队列已满,跳过此帧")
await asyncio.sleep(0.001) # 短暂等待
# 连续帧损坏触发重连前置判断
if consecutive_corrupted_frames > max_consecutive_corrupted:
logger.warning(f"连续{consecutive_corrupted_frames}帧损坏,触发流重连逻辑")
break
except Exception as e:
logger.error(f"读取帧数据时出错: {e}", exc_info=True)
break
if consecutive_corrupted_frames < 5: # 减少日志输出
print(f"获取OSD信息失败: {str(e)}")
except Exception as e:
logger.error(f"RTMP 流错误 (尝试 {attempt}/{max_retries}): {e}")
if attempt < max_retries:
await asyncio.sleep(retry_delay)
else:
raise RuntimeError(f"无法连接 RTMP 流 (尝试 {max_retries} 次后失败): {video_url}")
finally:
# 安全释放opencv的VideoCapture资源
if cap is not None:
await loop.run_in_executor(
read_rtmp_frames_executor,
lambda: cap.release() if cap.isOpened() else None
# 帧数统计
timestamp = time.time_ns()
if frame_queue is not None and timestamp_frame_queue is not None and ret:
try:
if not frame_queue.full():
await asyncio.wait_for(
frame_queue.put((current_frame, osd_info, timestamp)),
timeout=0.01
)
logger.info("OpenCV VideoCapture 资源已释放")
timestamp_frame_queue.append({"timestamp": timestamp, "frame": current_frame})
except (asyncio.QueueFull, asyncio.TimeoutError):
pass
if frame_count > 0:
total_time = (time.time_ns() - time_start) / 1e9
avg_fps = frame_count / total_time if total_time > 0 else 0
print(f"RTMP流读取完成总帧数: {frame_count}, 总时间: {total_time:.2f}秒, 平均FPS: {avg_fps:.2f}")
else:
print("RTMP流读取失败未获取到任何帧")
# 实时FPS统计
fps_elapsed = time.time() - fps_start
if fps_elapsed >= 1.0:
fps = fps_count / fps_elapsed
timestamp_elapsed = (timestamp - time_start) / 1000000000
if timestamp_elapsed > 0:
timestamp_fps = frame_count / timestamp_elapsed
print(f"实时 FPS{fps:.2f} | 累计FPS{timestamp_fps:.2f} | 分辨率:{width}x{height}")
logger.info(f"RTMP 流已结束或被取消,累计处理帧数: {frame_count}")
fps_count = 0
fps_start = time.time()
# 连续损坏帧重连
if consecutive_corrupted_frames > MAX_CORRUPTED:
print(f"连续 {MAX_CORRUPTED} 帧异常,停止预览,尝试重连")
if cap and cap.isOpened():
await loop.run_in_executor(executor, cap.release)
reconnected = False
for retry_attempt in range(1, MAX_RETRIES + 1):
try:
cap, (width, height) = await loop.run_in_executor(
executor, init_capture_with_sei_fix, video_url, retry_attempt
)
consecutive_corrupted_frames = 0
last_valid_frame = np.zeros((height, width, 3), dtype=np.uint8)
reconnected = True
print(f"重连成功 (第{retry_attempt}次)")
break
except RuntimeError as e:
print(f"重连失败 (第{retry_attempt}次): {e}")
if retry_attempt >= MAX_RETRIES:
print("重连次数超限,退出")
cancel_flag.set()
await asyncio.sleep(RETRY_DELAY)
if not reconnected:
break
# 小幅延迟避免CPU占用过高
await asyncio.sleep(0.001)
except asyncio.CancelledError:
print("读取任务被取消")
break
except Exception as e:
print(f"帧处理错误: {str(e)}")
await asyncio.sleep(0.1)
except asyncio.CancelledError:
print("读取任务已取消")
except Exception as e:
print(f"流读取异常: {str(e)}")
finally:
# 资源释放
if cap:
await loop.run_in_executor(executor, lambda: cap.release() if cap.isOpened() else None)
print("释放VideoCapture资源")
if preview_task is not None:
preview_task.cancel()
try:
await preview_task
except asyncio.CancelledError:
pass
except Exception:
pass
await loop.run_in_executor(executor, cv2.destroyAllWindows)
print("RTMP流读取已停止")
#
# async def read_rtmp_frames_skip_sei(
# loop,
# read_rtmp_frames_executor: ThreadPoolExecutor,
# video_url: str,
# device: Optional[MQTTDevice] = None,
# topic_camera_osd: Optional[str] = None,
# method_camera_osd: Optional[str] = None,
# topic_osd_info: Optional[str] = None,
# method_osd_info: Optional[str] = None,
# cancel_flag: Optional[asyncio.Event] = None,
# frame_queue: asyncio.Queue = None,
# timestamp_frame_queue=None
# ):
# """
# 优化版(兼容 Python OpenCV + 屏蔽 SEI 帧错误)
# ✅ 核心修复1. 移除 Python 不支持的 CAP_PROP_OUTPUT_FORMAT 2. 屏蔽 FFmpeg SEI 截断日志
# ✅ 核心实现:配置解码器参数忽略 SEI 帧,保证实时拉流无性能损耗,保留异步逻辑和队列功能
# ✅ 解决问题:[h264 @ xxx] SEI type 245 truncated 错误,同时保证拉流实时性和有效性
# """
# max_retries = 20
# retry_delay = 3
# pic_count = 0
# attempt = 0
# time_start = cv2.getTickCount()
# frame_count = 0
#
# # ✅ 关键优化:设置 FFmpeg 全局参数,屏蔽 SEI 帧截断日志Python OpenCV 间接控制 FFmpeg
# # 禁止 FFmpeg 输出冗余警告,仅保留致命错误,彻底隐藏 SEI type 245 truncated 信息
# os.environ["OPENCV_FFMPEG_LOG_LEVEL"] = "ERROR"
#
# # 核心拉流参数(沿用同步测试代码的有效配置)
# BUFFER_SIZE = 1 # 减少缓存,不积压 SEI 帧
# TARGET_FPS = 30 # 适配 RTMP 流帧率
# FOURCC = cv2.VideoWriter_fourcc(*'H264') # 兼容 YUV420P 像素格式,减少 SEI 解析冗余
#
# # 初始化默认参数
# if cancel_flag is None:
# cancel_flag = asyncio.Event()
# if timestamp_frame_queue is None:
# timestamp_frame_queue = []
# if frame_queue is None:
# frame_queue = asyncio.Queue(maxsize=5) # 小队列减少缓存,保证实时性
#
# print(f"开始读取 RTMP 流OpenCV 优化版,跳过 SEI 帧错误): {video_url}")
#
# while not cancel_flag.is_set() and attempt < max_retries:
# attempt += 1
# if cancel_flag.is_set():
# logger.info("收到停止信号,终止 RTMP 读取")
# break
#
# cap = None
# consecutive_corrupted_frames = 0
# max_consecutive_corrupted = 30
# last_valid_frame = None
#
# try:
# logger.info(f"尝试连接 RTMP 流 (尝试 {attempt}/{max_retries}): {video_url}")
#
# # ✅ 步骤1初始化 VideoCapture指定 FFmpeg 后端,修复 Python 兼容问题
# def init_capture_with_sei_skip():
# # 1. 指定 FFmpeg 后端,启用高级参数配置
# cap = cv2.VideoCapture(video_url, cv2.CAP_FFMPEG)
# if not cap.isOpened():
# return None
#
# # ✅ 步骤2配置解码器参数仅保留 Python 支持的核心参数,移除 CAP_PROP_OUTPUT_FORMAT
# # 2.1 关键配置:缓冲区大小设为 1实时推帧不缓存 SEI 帧
# cap.set(cv2.CAP_PROP_BUFFERSIZE, BUFFER_SIZE)
#
# # 2.2 配置像素格式,确保与硬件兼容(对应 PIX_FMT_YUV420P
# cap.set(cv2.CAP_PROP_FOURCC, FOURCC)
#
# # 2.3 配置帧率,与流帧率同步,避免冗余处理
# cap.set(cv2.CAP_PROP_FPS, TARGET_FPS)
#
# # ✅ 步骤3可选启用硬件加速保留原逻辑增加异常兼容
# try:
# # 启用 CUDA 硬件加速(对应 cv::CAP_PROP_HW_ACCELERATION
# cap.set(cv2.CAP_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_CUDA)
# logger.info("已启用 CUDA 硬件加速,降低 SEI 帧解析负担")
# except (AttributeError, cv2.error):
# try:
# # 备用:启用 VA-API 硬件加速Intel 平台)
# cap.set(cv2.CAP_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_VAAPI)
# logger.info("已启用 VA-API 硬件加速,降低 SEI 帧解析负担")
# except (AttributeError, cv2.error):
# logger.info("未检测到硬件加速模块,使用软件解码(仍可跳过 SEI 帧错误)")
#
# # ✅ 步骤4替代 CAP_PROP_OUTPUT_FORMAT定义帧格式转换函数确保 CV_8UC3 格式)
# global ensure_cv8uc3 # 全局声明,方便后续帧处理调用
# def ensure_cv8uc3(frame):
# if frame.dtype != np.uint8:
# frame = frame.astype(np.uint8)
# if len(frame.shape) == 2: # 灰度图转彩色
# frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
# elif frame.shape[2] == 4: # RGBA 转 BGR
# frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2BGR)
# return frame
#
# # ✅ 步骤5获取流分辨率初始化兜底帧增加容错防止获取失败
# try:
# width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
# height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# except:
# width, height = 1280, 720 # 默认分辨率,提升鲁棒性
# nonlocal last_valid_frame
# last_valid_frame = np.zeros((height, width, 3), dtype=np.uint8)
#
# return cap
#
# # 在线程池中初始化捕获器,避免阻塞事件循环
# cap = await loop.run_in_executor(
# read_rtmp_frames_executor,
# init_capture_with_sei_skip
# )
#
# if cap is None or not cap.isOpened():
# logger.warning(f"尝试 {attempt} 次OpenCV 捕获器初始化失败,准备重试")
# await asyncio.sleep(retry_delay)
# continue
#
# # 获取有效分辨率,打印启动日志
# try:
# width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
# height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# except:
# width, height = 1280, 720
# logger.info(f"OpenCV 捕获器已启动(跳过 SEI 帧),分辨率:{width}x{height}")
# logger.info("解码器已配置:忽略 SEI 帧,仅处理核心视频帧,无截断错误日志")
#
# # ✅ 步骤6读取视频流跳过 SEI 帧影响(整合格式转换,保留异步逻辑)
# while not cancel_flag.is_set():
# # 读取一帧(解码器自动跳过 SEI 帧,仅返回核心解码帧)
# def read_frame_core():
# ret, frame = cap.read()
# return ret, frame
#
# ret, frame = await loop.run_in_executor(
# read_rtmp_frames_executor,
# read_frame_core
# )
#
# current_time = cv2.getTickCount()
# frame_count += 1
# img = None
# is_corrupted = False
#
# # ✅ 流程判断:帧有效性检查 + 格式转换(替代 CAP_PROP_OUTPUT_FORMAT
# if ret and frame is not None and frame.size > 0:
# # 非 SEI 帧,解码成功,转换为 CV_8UC3 格式
# frame = ensure_cv8uc3(frame)
# img = frame.copy()
# last_valid_frame = img.copy()
# consecutive_corrupted_frames = 0
# is_corrupted = False
# else:
# # SEI 帧或解码异常,使用兜底帧(避免解码器状态异常)
# img = last_valid_frame.copy()
# is_corrupted = True
# consecutive_corrupted_frames += 1
# if consecutive_corrupted_frames % 15 == 0:
# logger.warning(f"跳过 SEI 帧/解码异常,连续损坏帧: {consecutive_corrupted_frames}(解码器状态稳定)")
#
# # 强制兜底,保证帧非空
# if img is None or img.size == 0:
# img = np.zeros((height, width, 3), dtype=np.uint8)
#
# # 获取 OSD 信息(保留原逻辑,实时性优先)
# osd_info = None
# if device and topic_osd_info and method_osd_info:
# try:
# osd_msg = device.get_latest_message(topic=topic_osd_info, method=method_osd_info)
# if osd_msg and hasattr(osd_msg, 'data'):
# osd_info = Air_Attitude(
# gimbal_pitch=osd_msg.data.gimbal_pitch,
# gimbal_roll=osd_msg.data.gimbal_roll,
# gimbal_yaw=osd_msg.data.gimbal_yaw,
# height=osd_msg.data.height,
# latitude=osd_msg.data.latitude,
# longitude=osd_msg.data.longitude
# )
# except Exception as osd_error:
# pass
#
# # 实时放入队列,满队列丢弃旧帧(保证实时性,保留原逻辑)
# if img is not None:
# try:
# if frame_queue.full():
# frame_queue.get_nowait()
# current_time_ns = time.time_ns()
# await frame_queue.put((img, osd_info, current_time_ns), timeout=0.001)
# except asyncio.TimeoutError:
# pass
#
# # 统计实时 FPS验证性能无额外 CPU 开销)
# pic_count += 1
# tick_count = cv2.getTickCount() - time_start
# elapsed_seconds = tick_count / cv2.getTickFrequency()
# if elapsed_seconds >= 1.0:
# fps = pic_count / elapsed_seconds
# print(f"实时统计FPS:{fps:.2f} 分辨率:{img.shape[1]}x{img.shape[0]} 无 SEI 帧截断错误日志")
# pic_count = 0
# time_start = cv2.getTickCount()
#
# # 连续损坏帧触发重连(解码器状态异常时重试)
# if consecutive_corrupted_frames > max_consecutive_corrupted:
# logger.warning(f"连续{consecutive_corrupted_frames}帧异常,触发捕获器重连")
# break
#
# except Exception as e:
# logger.error(f"RTMP 流错误 (尝试 {attempt}/{max_retries}): {e}")
# if attempt < max_retries:
# await asyncio.sleep(retry_delay)
# else:
# raise RuntimeError(f"无法连接 RTMP 流 (尝试 {max_retries} 次后失败): {video_url}")
# finally:
# # 安全释放捕获器资源
# if cap is not None and hasattr(cap, 'isOpened') and cap.isOpened():
# cap.release()
# logger.info("OpenCV 捕获器资源已释放")
#
# # 统计最终结果
# total_elapsed = (cv2.getTickCount() - time_start) / cv2.getTickFrequency()
# if frame_count > 0 and frame_count > max_retries:
# avg_fps = frame_count / total_elapsed if total_elapsed > 0 else 0
# print(f"\nRTMP 流读取完成,总有效帧数: {frame_count}, 平均 FPS: {avg_fps:.2f},无 SEI 帧截断错误日志")
# else:
# print("\nRTMP 流读取失败,未获取到有效帧")
#
# logger.info(f"RTMP 流已结束,累计处理帧数: {frame_count}")
@ -1942,7 +2426,7 @@ from PIL import Image, ImageDraw, ImageFont
chinese_font = None
try:
chinese_font = ImageFont.truetype("config/SIMSUN.TTC", 60)
chinese_font = ImageFont.truetype("config/SIMSUN.TTC", 20)
except:
chinese_font = ImageFont.load_default()
@ -2187,6 +2671,7 @@ async def write_results_to_rtmp(task_id: str, output_url: str = None, input_fps:
"box": [x1, y1, x2, y2]
})
label = f"{en_name}:{confidence:.2f}:{track_id}"
label_name = f"{en_name}"
# 计算文本位置
text_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, fontScale=8, thickness=4)[
0]
@ -2201,8 +2686,9 @@ async def write_results_to_rtmp(task_id: str, output_url: str = None, input_fps:
frame_copy = put_chinese_text(
temp_img,
# label, # 置信度、类别、用作测试
"", # 注释掉汉字
(text_x, text_y - text_height),
# "", # 注释掉汉字
label_name, # 仅显示汉字
(text_x, text_y- 40),
)
else:
cls_count += 1
@ -2225,6 +2711,7 @@ async def write_results_to_rtmp(task_id: str, output_url: str = None, input_fps:
# 准备标签文本
# label = f"{chinese_label.get(cls_id, class_name)}: {confidence:.2f}:{track_id}"
label = f"{confidence:.2f}:{track_id}"
label_name = f"{en_name}"
# 计算文本位置
text_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, fontScale=8, thickness=8)[0]
text_width, text_height = text_size[0], text_size[1]
@ -2240,8 +2727,9 @@ async def write_results_to_rtmp(task_id: str, output_url: str = None, input_fps:
frame_copy = put_chinese_text(
temp_img,
# label, # 置信度、类别、用作测试
"", # 注释掉汉字
(text_x, text_y - text_height),
# "", # 注释掉汉字
label_name, # 仅显示汉字
(text_x,text_y- 40),
)
if invade_state:

View File

@ -664,11 +664,11 @@ async def run_back_Multi_Detect_async(request, request_json, stop_event: asyncio
{
'path': config.model_path,
'engine_path': config.engine_path,
'so_path': config.so_path,
# 'engine_path': config.engine_path,
# 'so_path': config.so_path,
# # 测试代码
# 'engine_path': r"D:\project\AI-PYTHON\Ai_tottle\engine\renche\renche.engine",
# 'so_path': r"D:\project\AI-PYTHON\Ai_tottle\engine\renche\myplugins.dll",
'engine_path': r"D:\project\AI-PYTHON\Ai_tottle\engine\renche\renche.engine",
'so_path': r"D:\project\AI-PYTHON\Ai_tottle\engine\renche\myplugins.dll",
# 工地安全帽
# 'engine_path': r"D:\project\AI-PYTHON\Ai_tottle\engine\gdaq_hat_0926\gdaq_hat_0926.engine",
# 'so_path': r"D:\project\AI-PYTHON\Ai_tottle\engine\gdaq_hat_0926\myplugins.dll",
@ -1657,6 +1657,8 @@ async def websocket_endpoint(request: Request, ws):
camera_para_url = "meta_data/camera_para/xyzj_camera_para.txt"
if model2 == "M4D":
camera_para_url = "meta_data/camera_para/xyzj_camera_para.txt"
elif model2 == "M3TD":
camera_para_url = "meta_data/camera_para/hami_camera_para .txt"
elif model2 == "M4TD":
camera_para_url = "meta_data/camera_para/hami_camera_para .txt"
camera_file_path = downFile(camera_para_url)