784 lines
29 KiB
Python
784 lines
29 KiB
Python
|
|
import cv2
|
|||
|
|
import subprocess
|
|||
|
|
from threading import Thread, Lock, Event
|
|||
|
|
import time
|
|||
|
|
import queue
|
|||
|
|
import numpy as np
|
|||
|
|
|
|||
|
|
import datetime
|
|||
|
|
import os
|
|||
|
|
from ultralytics import YOLO # 导入 Ultralytics YOLO 模型
|
|||
|
|
|
|||
|
|
# 全局变量
|
|||
|
|
ifAI = {'status': False}
|
|||
|
|
deskLock = Lock()
|
|||
|
|
frame_queue = queue.Queue(maxsize=60) # 增加帧缓冲队列大小
|
|||
|
|
processed_frame_queue = queue.Queue(maxsize=30) # 处理后的帧队列
|
|||
|
|
stop_event = Event()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def setIfAI(pb1):
|
|||
|
|
deskLock.acquire()
|
|||
|
|
ifAI['status'] = pb1
|
|||
|
|
deskLock.release()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def getIfAI():
|
|||
|
|
return ifAI['status']
|
|||
|
|
|
|||
|
|
|
|||
|
|
def stopAIVideo():
|
|||
|
|
print("正在停止AI视频处理...")
|
|||
|
|
setIfAI(False)
|
|||
|
|
stop_event.set()
|
|||
|
|
|
|||
|
|
# 等待足够长的时间确保资源释放
|
|||
|
|
wait_count = 0
|
|||
|
|
max_wait = 5 # 减少最大等待时间到5秒
|
|||
|
|
|
|||
|
|
while stop_event.is_set() and wait_count < max_wait:
|
|||
|
|
time.sleep(0.5)
|
|||
|
|
wait_count += 1
|
|||
|
|
|
|||
|
|
if wait_count >= max_wait:
|
|||
|
|
print("警告: 停止AI视频处理超时,强制终止")
|
|||
|
|
# 不使用_thread._interrupt_main(),改用其他方式强制终止
|
|||
|
|
try:
|
|||
|
|
# 尝试终止可能运行的进程
|
|||
|
|
import os
|
|||
|
|
import signal
|
|||
|
|
import psutil
|
|||
|
|
|
|||
|
|
# 查找并终止可能的FFmpeg进程
|
|||
|
|
current_process = psutil.Process(os.getpid())
|
|||
|
|
for child in current_process.children(recursive=True):
|
|||
|
|
try:
|
|||
|
|
child_name = child.name().lower()
|
|||
|
|
if 'ffmpeg' in child_name:
|
|||
|
|
print(f"正在终止子进程: {child.pid} ({child_name})")
|
|||
|
|
child.send_signal(signal.SIGTERM)
|
|||
|
|
except:
|
|||
|
|
pass
|
|||
|
|
except:
|
|||
|
|
pass
|
|||
|
|
else:
|
|||
|
|
print("AI视频处理已停止")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def startAIVideo(video_path, output_url, m1, cls, confidence):
|
|||
|
|
if ifAI['status']:
|
|||
|
|
stopAIVideo()
|
|||
|
|
time.sleep(1)
|
|||
|
|
stop_event.clear()
|
|||
|
|
thread = Thread(target=startAIVideo2,
|
|||
|
|
args=(video_path, output_url, m1, cls, confidence))
|
|||
|
|
# cls2_thread = Thread(target=cls2_find, args=(video_path,m1, cls, confidence))
|
|||
|
|
# cls2_thread.daemon = True # 守护线程,主程序退出时线程也会退出
|
|||
|
|
thread.daemon = True # 守护线程,主程序退出时线程也会退出
|
|||
|
|
|
|||
|
|
thread.start()
|
|||
|
|
# cls2_thread.start()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def read_frames(cap, frame_queue):
|
|||
|
|
"""优化的帧读取线程"""
|
|||
|
|
frame_count = 0
|
|||
|
|
last_time = time.time()
|
|||
|
|
last_fps_time = time.time()
|
|||
|
|
# 减小目标帧间隔时间,提高读取帧率
|
|||
|
|
target_time_per_frame = 1.0 / 60.0 # 目标帧间隔时间(提高到60fps)
|
|||
|
|
|
|||
|
|
# 添加连接断开检测
|
|||
|
|
connection_error_count = 0
|
|||
|
|
max_connection_errors = 10 # 最多允许连续10次连接错误
|
|||
|
|
last_successful_read = time.time()
|
|||
|
|
max_read_wait = 30.0 # 30秒无法读取则认为连接断开
|
|||
|
|
|
|||
|
|
# 预先丢弃几帧,确保从新帧开始处理
|
|||
|
|
for _ in range(5):
|
|||
|
|
cap.grab()
|
|||
|
|
|
|||
|
|
while not stop_event.is_set():
|
|||
|
|
current_time = time.time()
|
|||
|
|
elapsed_time = current_time - last_time
|
|||
|
|
|
|||
|
|
# 检查是否长时间无法读取帧
|
|||
|
|
if current_time - last_successful_read > max_read_wait:
|
|||
|
|
print(f"警告: {max_read_wait}秒内未能读取到有效帧,可能连接已断开")
|
|||
|
|
stop_event.set()
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
# 帧率控制,但更积极地读取
|
|||
|
|
if elapsed_time < target_time_per_frame:
|
|||
|
|
time.sleep(target_time_per_frame - elapsed_time)
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 当队列快满时,跳过一些帧以避免延迟累积
|
|||
|
|
if frame_queue.qsize() > frame_queue.maxsize * 0.8:
|
|||
|
|
# 跳过一些帧
|
|||
|
|
cap.grab()
|
|||
|
|
last_time = time.time()
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
ret, frame = cap.read()
|
|||
|
|
if not ret:
|
|||
|
|
print("拉流错误:无法读取帧")
|
|||
|
|
connection_error_count += 1
|
|||
|
|
if connection_error_count >= max_connection_errors:
|
|||
|
|
print(f"连续{max_connection_errors}次无法读取帧,可能连接已断开,正在停止流程...")
|
|||
|
|
stop_event.set()
|
|||
|
|
break
|
|||
|
|
time.sleep(0.5) # 短暂等待后重试
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 成功读取了帧,重置错误计数
|
|||
|
|
connection_error_count = 0
|
|||
|
|
last_successful_read = time.time()
|
|||
|
|
|
|||
|
|
frame_count += 1
|
|||
|
|
if frame_count % 60 == 0: # 每60帧计算一次FPS
|
|||
|
|
current_fps_time = time.time()
|
|||
|
|
fps = 60 / (current_fps_time - last_fps_time)
|
|||
|
|
print(f"拉流FPS: {fps:.2f}")
|
|||
|
|
last_fps_time = current_fps_time
|
|||
|
|
|
|||
|
|
last_time = time.time()
|
|||
|
|
frame_queue.put((frame, time.time())) # 添加时间戳
|
|||
|
|
|
|||
|
|
|
|||
|
|
def process_frames(frame_queue, processed_frame_queue, ov_model, cls, confidence, use_fp16=False):
|
|||
|
|
"""处理帧的线程,添加帧率控制,支持半精度FP16"""
|
|||
|
|
|
|||
|
|
import torch
|
|||
|
|
import time
|
|||
|
|
import queue
|
|||
|
|
import cv2
|
|||
|
|
|
|||
|
|
error_count = 0 # 添加错误计数器
|
|||
|
|
max_errors = 5 # 最大容许错误次数
|
|||
|
|
frame_count = 0
|
|||
|
|
process_times = [] # 用于计算平均处理时间
|
|||
|
|
|
|||
|
|
# 设置YOLO模型配置,提高性能
|
|||
|
|
ov_model.conf = confidence # 设置置信度阈值
|
|||
|
|
|
|||
|
|
# 将模型移到设备(GPU或CPU)
|
|||
|
|
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
|
|||
|
|
try:
|
|||
|
|
ov_model.to(device)
|
|||
|
|
# 调整批处理大小为1,减少内存占用
|
|||
|
|
if hasattr(ov_model, 'args') and hasattr(ov_model.args, 'batch'):
|
|||
|
|
ov_model.args.batch = 1
|
|||
|
|
# 启用半精度
|
|||
|
|
if use_fp16 and device.startswith('cuda') and hasattr(ov_model, 'model'):
|
|||
|
|
try:
|
|||
|
|
ov_model.model = ov_model.model.half()
|
|||
|
|
print("✅ 启用半精度 FP16 模式")
|
|||
|
|
except Exception as half_err:
|
|||
|
|
print(f"⚠️ 半精度转换失败: {half_err}")
|
|||
|
|
else:
|
|||
|
|
print("ℹ️ 半精度模式未启用")
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"⚠️ 模型设备配置警告: {e}")
|
|||
|
|
|
|||
|
|
# 缓存先前的检测结果,用于提高稳定性
|
|||
|
|
last_results = None
|
|||
|
|
skip_counter = 0
|
|||
|
|
max_skip = 2 # 最多跳过几帧不处理
|
|||
|
|
|
|||
|
|
while not stop_event.is_set():
|
|||
|
|
try:
|
|||
|
|
if processed_frame_queue.qsize() >= processed_frame_queue.maxsize * 0.8:
|
|||
|
|
time.sleep(0.01)
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
frame, timestamp = frame_queue.get(timeout=0.2)
|
|||
|
|
|
|||
|
|
if time.time() - timestamp > 0.3:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
frame_count += 1
|
|||
|
|
|
|||
|
|
if skip_counter > 0 and last_results is not None:
|
|||
|
|
skip_counter -= 1
|
|||
|
|
annotated_frame = last_results.plot(conf=False, line_width=1, font_size=1.5)
|
|||
|
|
processed_frame_queue.put((annotated_frame, timestamp))
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
process_start = time.time()
|
|||
|
|
|
|||
|
|
resize_scale = 1.0
|
|||
|
|
qsize = frame_queue.qsize()
|
|||
|
|
maxsize = frame_queue.maxsize
|
|||
|
|
|
|||
|
|
if qsize > maxsize * 0.7:
|
|||
|
|
resize_scale = 0.4
|
|||
|
|
elif qsize > maxsize * 0.5:
|
|||
|
|
resize_scale = 0.6
|
|||
|
|
elif qsize > maxsize * 0.3:
|
|||
|
|
resize_scale = 0.8
|
|||
|
|
|
|||
|
|
if resize_scale < 1.0:
|
|||
|
|
process_frame = cv2.resize(frame, (0, 0), fx=resize_scale, fy=resize_scale)
|
|||
|
|
else:
|
|||
|
|
process_frame = frame
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
results = ov_model(process_frame, classes=cls, show=False)
|
|||
|
|
last_results = results[0]
|
|||
|
|
|
|||
|
|
if resize_scale < 1.0:
|
|||
|
|
annotated_frame = cv2.resize(results[0].plot(conf=False, line_width=1, font_size=1.5),
|
|||
|
|
(frame.shape[1], frame.shape[0]))
|
|||
|
|
else:
|
|||
|
|
annotated_frame = results[0].plot(conf=False, line_width=1, font_size=1.5)
|
|||
|
|
|
|||
|
|
if qsize > maxsize * 0.5:
|
|||
|
|
skip_counter = max_skip
|
|||
|
|
except Exception as infer_err:
|
|||
|
|
print(f"推理错误: {infer_err}")
|
|||
|
|
if last_results is not None:
|
|||
|
|
annotated_frame = last_results.plot(conf=False, line_width=1, font_size=1.5)
|
|||
|
|
else:
|
|||
|
|
annotated_frame = frame.copy()
|
|||
|
|
|
|||
|
|
process_end = time.time()
|
|||
|
|
process_times.append(process_end - process_start)
|
|||
|
|
if len(process_times) > 30:
|
|||
|
|
process_times.pop(0)
|
|||
|
|
|
|||
|
|
if frame_count % 30 == 0:
|
|||
|
|
avg_process_time = sum(process_times) / len(process_times)
|
|||
|
|
fps = 1.0 / avg_process_time if avg_process_time > 0 else 0
|
|||
|
|
print(
|
|||
|
|
f"模型处理FPS: {fps:.2f}, 平均处理时间: {avg_process_time * 1000:.2f}ms, 队列大小: {qsize}, 缩放比例: {resize_scale:.2f}")
|
|||
|
|
|
|||
|
|
processed_frame_queue.put((annotated_frame, timestamp))
|
|||
|
|
error_count = 0
|
|||
|
|
except queue.Empty:
|
|||
|
|
continue
|
|||
|
|
except Exception as e:
|
|||
|
|
error_count += 1
|
|||
|
|
print(f"处理帧错误: {e}")
|
|||
|
|
if error_count >= max_errors:
|
|||
|
|
print(f"连续处理错误达到{max_errors}次,正在停止处理...")
|
|||
|
|
stop_event.set()
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
|
|||
|
|
def write_frames(processed_frame_queue, pipe, size):
|
|||
|
|
"""写入帧的线程,添加平滑处理"""
|
|||
|
|
last_write_time = time.time()
|
|||
|
|
target_time_per_frame = 1.0 / 30.0 # 30fps
|
|||
|
|
pipe_error_count = 0 # 添加错误计数
|
|||
|
|
max_pipe_errors = 3 # 最大容忍错误数
|
|||
|
|
frame_count = 0
|
|||
|
|
last_fps_time = time.time()
|
|||
|
|
skipped_frames = 0
|
|||
|
|
|
|||
|
|
# 使用队列存储最近几帧,用于在需要时进行插值
|
|||
|
|
recent_frames = []
|
|||
|
|
max_recent_frames = 5 # 增加缓存帧数量,提高平滑性
|
|||
|
|
|
|||
|
|
# 使用双缓冲机制提高写入速度
|
|||
|
|
buffer1 = bytearray(size[0] * size[1] * 3)
|
|||
|
|
buffer2 = bytearray(size[0] * size[1] * 3)
|
|||
|
|
current_buffer = buffer1
|
|||
|
|
|
|||
|
|
# 帧率控制参数
|
|||
|
|
min_frame_interval = target_time_per_frame * 0.5 # 允许的最小帧间隔
|
|||
|
|
max_frame_interval = target_time_per_frame * 2.0 # 允许的最大帧间隔
|
|||
|
|
|
|||
|
|
while not stop_event.is_set():
|
|||
|
|
try:
|
|||
|
|
# 获取处理后的帧,超时时间较短以便更平滑地处理
|
|||
|
|
frame, timestamp = processed_frame_queue.get(timeout=0.05)
|
|||
|
|
|
|||
|
|
# 存储最近的帧用于插值
|
|||
|
|
recent_frames.append(frame)
|
|||
|
|
if len(recent_frames) > max_recent_frames:
|
|||
|
|
recent_frames.pop(0)
|
|||
|
|
|
|||
|
|
current_time = time.time()
|
|||
|
|
elapsed = current_time - last_write_time
|
|||
|
|
|
|||
|
|
# 如果两帧间隔太短,考虑合并或跳过
|
|||
|
|
if elapsed < min_frame_interval and len(recent_frames) >= 2:
|
|||
|
|
skipped_frames += 1
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# 如果两帧间隔太长,进行插值平滑
|
|||
|
|
if elapsed > max_frame_interval and len(recent_frames) >= 2:
|
|||
|
|
# 计算需要插入的帧数
|
|||
|
|
frames_to_insert = min(int(elapsed / target_time_per_frame), 3)
|
|||
|
|
|
|||
|
|
for i in range(frames_to_insert):
|
|||
|
|
# 创建插值帧
|
|||
|
|
weight = (i + 1) / (frames_to_insert + 1)
|
|||
|
|
interpolated_frame = cv2.addWeighted(recent_frames[-2], 1 - weight, recent_frames[-1], weight, 0)
|
|||
|
|
|
|||
|
|
# 切换双缓冲
|
|||
|
|
current_buffer = buffer2 if current_buffer is buffer1 else buffer1
|
|||
|
|
|
|||
|
|
# 高效调整大小并写入
|
|||
|
|
interpolated_resized = cv2.resize(interpolated_frame, size, interpolation=cv2.INTER_LINEAR)
|
|||
|
|
img_bytes = interpolated_resized.tobytes()
|
|||
|
|
|
|||
|
|
# 写入管道
|
|||
|
|
pipe.stdin.write(img_bytes)
|
|||
|
|
pipe.stdin.flush()
|
|||
|
|
|
|||
|
|
# 切换双缓冲
|
|||
|
|
current_buffer = buffer2 if current_buffer is buffer1 else buffer1
|
|||
|
|
|
|||
|
|
# 正常写入当前帧 - 使用高效的调整大小方法
|
|||
|
|
resized_frame = cv2.resize(frame, size, interpolation=cv2.INTER_LINEAR)
|
|||
|
|
img_bytes = resized_frame.tobytes()
|
|||
|
|
pipe.stdin.write(img_bytes)
|
|||
|
|
pipe.stdin.flush()
|
|||
|
|
|
|||
|
|
frame_count += 1
|
|||
|
|
if frame_count % 30 == 0:
|
|||
|
|
current_fps_time = time.time()
|
|||
|
|
fps = 30 / (current_fps_time - last_fps_time)
|
|||
|
|
print(f"推流FPS: {fps:.2f}, 跳过的帧: {skipped_frames}, 队列大小: {processed_frame_queue.qsize()}")
|
|||
|
|
last_fps_time = current_fps_time
|
|||
|
|
skipped_frames = 0
|
|||
|
|
|
|||
|
|
last_write_time = time.time()
|
|||
|
|
pipe_error_count = 0 # 成功写入后重置错误计数
|
|||
|
|
|
|||
|
|
except queue.Empty:
|
|||
|
|
# 队列为空且有足够的最近帧时,考虑生成插值帧以保持流畅
|
|||
|
|
if len(recent_frames) >= 2 and time.time() - last_write_time > target_time_per_frame:
|
|||
|
|
try:
|
|||
|
|
# 创建插值帧
|
|||
|
|
interpolated_frame = cv2.addWeighted(recent_frames[-2], 0.5, recent_frames[-1], 0.5, 0)
|
|||
|
|
|
|||
|
|
# 切换双缓冲
|
|||
|
|
current_buffer = buffer2 if current_buffer is buffer1 else buffer1
|
|||
|
|
|
|||
|
|
resized_frame = cv2.resize(interpolated_frame, size, interpolation=cv2.INTER_LINEAR)
|
|||
|
|
img_bytes = resized_frame.tobytes()
|
|||
|
|
pipe.stdin.write(img_bytes)
|
|||
|
|
pipe.stdin.flush()
|
|||
|
|
last_write_time = time.time()
|
|||
|
|
except Exception:
|
|||
|
|
pass
|
|||
|
|
continue
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"写入帧错误: {e}")
|
|||
|
|
pipe_error_count += 1
|
|||
|
|
if pipe_error_count >= max_pipe_errors:
|
|||
|
|
print("FFmpeg管道错误过多,正在终止进程...")
|
|||
|
|
stop_event.set() # 主动结束所有线程
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
|
|||
|
|
def cls2_find(video_path, m1, cls, confidence):
|
|||
|
|
try:
|
|||
|
|
ov_model = YOLO(m1)
|
|||
|
|
|
|||
|
|
# ---------------------------------MinIO 存储路径(用于后续上传)------------------------------------
|
|||
|
|
minio_path = "AIResults"
|
|||
|
|
|
|||
|
|
# -------------------------------获取当前日期,用于存储图像目录--------------------------------------
|
|||
|
|
date_str = datetime.datetime.now().strftime("%Y%m%d")
|
|||
|
|
save_dir = f"{date_str}"
|
|||
|
|
if not os.path.exists(save_dir):
|
|||
|
|
os.makedirs(save_dir)
|
|||
|
|
|
|||
|
|
# 打开视频流
|
|||
|
|
cap = cv2.VideoCapture(video_path)
|
|||
|
|
if not cap.isOpened():
|
|||
|
|
print("Error: Could not open video.")
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
# 获取视频的帧率 (fps)
|
|||
|
|
fps = cap.get(cv2.CAP_PROP_FPS)
|
|||
|
|
|
|||
|
|
# -----------------------------根据模型设置类别-------------------------------------------------
|
|||
|
|
if m1 == "gdaq.pt": # 仅当使用 gdaq.pt 时,保存类别 2 和 4
|
|||
|
|
cls2 = [2, 4]
|
|||
|
|
elif m1 == "best.pt": # 仅当使用 best.pt 时,保存类别 0
|
|||
|
|
cls2 = [0]
|
|||
|
|
else: # 其它模型不保存
|
|||
|
|
cls2 = []
|
|||
|
|
|
|||
|
|
# ------------------------------------------cls2检测--------------------------------------------
|
|||
|
|
skip_frames = int(fps * 10) # 设置跳过帧数为 10 秒
|
|||
|
|
|
|||
|
|
while cap.isOpened() and not stop_event.is_set():
|
|||
|
|
if skip_frames > 0:
|
|||
|
|
skip_frames -= 1 # 逐帧减少
|
|||
|
|
cap.grab() # 仅抓取帧,不进行解码
|
|||
|
|
continue # 跳过处理
|
|||
|
|
|
|||
|
|
ret, frame = cap.read()
|
|||
|
|
if not ret:
|
|||
|
|
break # 无法读取帧时退出
|
|||
|
|
|
|||
|
|
# 目标检测
|
|||
|
|
results = ov_model(frame, conf=confidence, classes=cls, show=False)
|
|||
|
|
|
|||
|
|
for result in results:
|
|||
|
|
for box in result.boxes:
|
|||
|
|
cls_index = int(box.cls[0]) # 获取类别索引
|
|||
|
|
|
|||
|
|
# 如果检测到的类别在 cls2 里,跳过 10 秒
|
|||
|
|
if cls_index in cls2:
|
|||
|
|
skip_frames = int(fps * 10) # 设置跳过帧数为 10 秒
|
|||
|
|
# upload_and_insert_to_db(frame, ov_model, cls_index, save_dir, minio_path)
|
|||
|
|
filename = f"{save_dir}/frame_{int(cap.get(cv2.CAP_PROP_POS_FRAMES))}_cls2.jpg"
|
|||
|
|
cv2.imwrite(filename, frame)
|
|||
|
|
print(f"保存图像: {filename}")
|
|||
|
|
|
|||
|
|
# 绘制检测框
|
|||
|
|
x1, y1, x2, y2 = map(int, box.xyxy[0])
|
|||
|
|
label = f"{result.names[cls_index]} {box.conf[0]:.2f}"
|
|||
|
|
cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)
|
|||
|
|
cv2.putText(frame, label, (x1, y1 - 10),
|
|||
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"cls2_find错误: {e}")
|
|||
|
|
finally:
|
|||
|
|
if 'cap' in locals() and cap is not None:
|
|||
|
|
cap.release()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def startAIVideo2(video_path, output_url, m1, cls, confidence):
|
|||
|
|
rtmp = output_url
|
|||
|
|
setIfAI(True)
|
|||
|
|
|
|||
|
|
# OpenCV拉流优化参数
|
|||
|
|
cap = None
|
|||
|
|
pipe = None
|
|||
|
|
read_thread = None
|
|||
|
|
process_thread = None
|
|||
|
|
write_thread = None
|
|||
|
|
ov_model = None
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 设置环境变量,提高YOLO性能
|
|||
|
|
os.environ["OMP_NUM_THREADS"] = "4" # 限制OpenMP线程数
|
|||
|
|
os.environ["CUDA_VISIBLE_DEVICES"] = "0" # 使用GPU 0
|
|||
|
|
|
|||
|
|
# 导入必要的库
|
|||
|
|
try:
|
|||
|
|
import torch
|
|||
|
|
print(f"PyTorch 可用: {torch.__version__}, CUDA可用: {torch.cuda.is_available()}")
|
|||
|
|
if torch.cuda.is_available():
|
|||
|
|
print(f"GPU: {torch.cuda.get_device_name(0)}")
|
|||
|
|
# 预先分配GPU内存,避免动态分配造成的卡顿
|
|||
|
|
torch.cuda.empty_cache()
|
|||
|
|
except ImportError:
|
|||
|
|
print("未检测到PyTorch,将使用CPU模式")
|
|||
|
|
|
|||
|
|
# 预加载YOLO模型并增加重试机制
|
|||
|
|
print("预加载YOLO模型...")
|
|||
|
|
max_retries = 3
|
|||
|
|
retry_count = 0
|
|||
|
|
|
|||
|
|
# 模型加载参数优化 - 移除不支持的参数
|
|||
|
|
model_params = {}
|
|||
|
|
# 尝试检测YOLO版本以适配不同版本的参数
|
|||
|
|
try:
|
|||
|
|
# 先简单尝试加载模型,不带任何参数
|
|||
|
|
test_model = YOLO(m1)
|
|||
|
|
# 如果成功,检查可用的参数
|
|||
|
|
if hasattr(test_model, "task"):
|
|||
|
|
model_params["task"] = "detect" # 指定任务类型
|
|||
|
|
|
|||
|
|
# 检查是否支持half精度
|
|||
|
|
if torch.cuda.is_available():
|
|||
|
|
model_params["half"] = True
|
|||
|
|
|
|||
|
|
# 检查是否支持verbose参数
|
|||
|
|
import inspect
|
|||
|
|
if "verbose" in inspect.signature(YOLO.__init__).parameters:
|
|||
|
|
model_params["verbose"] = False
|
|||
|
|
|
|||
|
|
print(f"检测到支持的YOLO参数: {model_params}")
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"参数检测失败,将使用默认参数: {e}")
|
|||
|
|
model_params = {}
|
|||
|
|
|
|||
|
|
while retry_count < max_retries:
|
|||
|
|
try:
|
|||
|
|
# 使用优化的模型加载参数
|
|||
|
|
ov_model = YOLO(m1, **model_params)
|
|||
|
|
|
|||
|
|
# 预热模型 - 多次运行,确保模型完全加载
|
|||
|
|
dummy_frame = np.zeros((1080, 1920, 3), dtype=np.uint8)
|
|||
|
|
for _ in range(3): # 多次预热
|
|||
|
|
ov_model(dummy_frame, classes=cls, conf=confidence, show=False)
|
|||
|
|
print("YOLO模型加载成功并预热完成")
|
|||
|
|
break
|
|||
|
|
except Exception as e:
|
|||
|
|
retry_count += 1
|
|||
|
|
print(f"YOLO模型加载失败 (尝试 {retry_count}/{max_retries}): {e}")
|
|||
|
|
# 如果是参数问题,尝试减少参数
|
|||
|
|
if "got an unexpected keyword argument" in str(e) and model_params:
|
|||
|
|
# 提取不支持的参数
|
|||
|
|
param_name = str(e).split("'")[-2] if "'" in str(e) else None
|
|||
|
|
if param_name and param_name in model_params:
|
|||
|
|
print(f"移除不支持的参数: {param_name}")
|
|||
|
|
del model_params[param_name]
|
|||
|
|
time.sleep(2) # 等待一段时间后重试
|
|||
|
|
|
|||
|
|
if ov_model is None:
|
|||
|
|
raise Exception("无法加载YOLO模型,达到最大重试次数")
|
|||
|
|
|
|||
|
|
# 优化模型推理参数
|
|||
|
|
ov_model.conf = confidence # 置信度阈值
|
|||
|
|
|
|||
|
|
# 打开视频流
|
|||
|
|
print(f"正在连接视频流: {video_path}")
|
|||
|
|
# 添加超时参数,防止长时间等待
|
|||
|
|
cap = cv2.VideoCapture(video_path, cv2.CAP_FFMPEG)
|
|||
|
|
|
|||
|
|
# 设置FFMPEG的读取超时参数
|
|||
|
|
cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 10000) # 10秒连接超时
|
|||
|
|
cap.set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 5000) # 5秒读取超时
|
|||
|
|
|
|||
|
|
if not cap.isOpened():
|
|||
|
|
raise Exception(f"无法打开视频流: {video_path}")
|
|||
|
|
|
|||
|
|
# 优化OpenCV的拉流参数
|
|||
|
|
cap.set(cv2.CAP_PROP_BUFFERSIZE, 5) # 增加缓冲区
|
|||
|
|
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
|
|||
|
|
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
|
|||
|
|
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
|
|||
|
|
|
|||
|
|
# 尝试禁用自动曝光和自动白平衡,减少闪烁
|
|||
|
|
try:
|
|||
|
|
cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0) # 关闭自动曝光
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"注意: 无法设置自动曝光参数: {e}")
|
|||
|
|
|
|||
|
|
# 设置全局帧队列大小
|
|||
|
|
global frame_queue, processed_frame_queue
|
|||
|
|
frame_queue = queue.Queue(maxsize=80) # 增加队列大小以处理波动
|
|||
|
|
processed_frame_queue = queue.Queue(maxsize=40)
|
|||
|
|
|
|||
|
|
# 检查RTMP服务器是否可达
|
|||
|
|
import socket
|
|||
|
|
rtmp_parts = rtmp.replace("rtmp://", "").split("/")[0].split(":")
|
|||
|
|
rtmp_host = rtmp_parts[0]
|
|||
|
|
rtmp_port = int(rtmp_parts[1]) if len(rtmp_parts) > 1 else 1935
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|||
|
|
s.settimeout(3)
|
|||
|
|
s.connect((rtmp_host, rtmp_port))
|
|||
|
|
s.close()
|
|||
|
|
print(f"RTMP服务器连接成功: {rtmp_host}:{rtmp_port}")
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"警告: RTMP服务器可能不可达 {rtmp_host}:{rtmp_port}: {e}")
|
|||
|
|
# 继续执行,有些服务器只接受RTMP协议而不接受TCP连接测试
|
|||
|
|
|
|||
|
|
size = (int(1280), int(720)) # 降低输出分辨率,减轻负担
|
|||
|
|
sizeStr = f"{size[0]}x{size[1]}"
|
|||
|
|
|
|||
|
|
# 优化FFMPEG推流参数,提高平滑度
|
|||
|
|
command = ['ffmpeg',
|
|||
|
|
'-y',
|
|||
|
|
'-f', 'rawvideo',
|
|||
|
|
'-vcodec', 'rawvideo',
|
|||
|
|
'-pix_fmt', 'bgr24',
|
|||
|
|
'-s', sizeStr,
|
|||
|
|
'-r', '30',
|
|||
|
|
'-i', '-',
|
|||
|
|
'-c:v', 'h264',
|
|||
|
|
'-pix_fmt', 'yuv420p',
|
|||
|
|
'-preset', 'ultrafast',
|
|||
|
|
'-tune', 'zerolatency',
|
|||
|
|
'-f', 'flv',
|
|||
|
|
'-g', '30',
|
|||
|
|
'-bufsize', '4000k', # 增加缓冲区
|
|||
|
|
'-maxrate', '4000k', # 增加最大比特率
|
|||
|
|
'-b:v', '2500k', # 设置视频比特率
|
|||
|
|
'-vsync', '1', # 帧率同步模式
|
|||
|
|
'-threads', '4', # 限制线程数
|
|||
|
|
'-reconnect', '1', # 断开时尝试重新连接
|
|||
|
|
'-reconnect_at_eof', '1',
|
|||
|
|
'-reconnect_streamed', '1',
|
|||
|
|
'-reconnect_delay_max', '5', # 最大重连延迟5秒
|
|||
|
|
'-x264opts', 'no-scenecut:keyint=30:min-keyint=30', # 固定关键帧间隔
|
|||
|
|
'-flvflags', 'no_duration_filesize',
|
|||
|
|
rtmp]
|
|||
|
|
|
|||
|
|
# 创建推流进程
|
|||
|
|
print(f"正在启动FFmpeg推流到: {rtmp}")
|
|||
|
|
pipe = subprocess.Popen(command, shell=False, stdin=subprocess.PIPE,
|
|||
|
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
|||
|
|
bufsize=10 * 1024 * 1024) # 增加缓冲区大小
|
|||
|
|
|
|||
|
|
# 创建线程监控FFmpeg输出
|
|||
|
|
def monitor_ffmpeg_output(pipe):
|
|||
|
|
while not stop_event.is_set():
|
|||
|
|
line = pipe.stderr.readline().decode('utf-8', errors='ignore')
|
|||
|
|
if not line:
|
|||
|
|
continue
|
|||
|
|
if "Error" in line or "error" in line:
|
|||
|
|
print(f"FFmpeg错误: {line.strip()}")
|
|||
|
|
if "Cannot open connection" in line:
|
|||
|
|
print("无法连接到RTMP服务器,正在停止流程...")
|
|||
|
|
stop_event.set()
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
ffmpeg_monitor = Thread(target=monitor_ffmpeg_output, args=(pipe,))
|
|||
|
|
ffmpeg_monitor.daemon = True
|
|||
|
|
ffmpeg_monitor.start()
|
|||
|
|
|
|||
|
|
# 尝试设置线程优先级
|
|||
|
|
try:
|
|||
|
|
import psutil
|
|||
|
|
p = psutil.Process()
|
|||
|
|
p.nice(psutil.HIGH_PRIORITY_CLASS if os.name == 'nt' else -10)
|
|||
|
|
print("已设置程序为高优先级")
|
|||
|
|
except:
|
|||
|
|
print("无法设置进程优先级")
|
|||
|
|
|
|||
|
|
# 创建并启动线程
|
|||
|
|
read_thread = Thread(target=read_frames, args=(cap, frame_queue))
|
|||
|
|
process_thread = Thread(target=process_frames,
|
|||
|
|
args=(frame_queue, processed_frame_queue, ov_model, cls, confidence))
|
|||
|
|
write_thread = Thread(target=write_frames, args=(processed_frame_queue, pipe, size))
|
|||
|
|
|
|||
|
|
# 设置线程为守护线程
|
|||
|
|
read_thread.daemon = True
|
|||
|
|
process_thread.daemon = True
|
|||
|
|
write_thread.daemon = True
|
|||
|
|
|
|||
|
|
# 设置线程优先级
|
|||
|
|
read_thread.name = "ReadThread"
|
|||
|
|
process_thread.name = "ProcessThread"
|
|||
|
|
write_thread.name = "WriteThread"
|
|||
|
|
|
|||
|
|
# 启动线程
|
|||
|
|
print("开始处理视频流...")
|
|||
|
|
read_thread.start()
|
|||
|
|
process_thread.start()
|
|||
|
|
write_thread.start()
|
|||
|
|
|
|||
|
|
# 定期检查性能并输出日志
|
|||
|
|
performance_check_interval = 30 # 每30秒检查一次
|
|||
|
|
last_check_time = time.time()
|
|||
|
|
|
|||
|
|
# 等待线程结束
|
|||
|
|
while getIfAI() and not stop_event.is_set():
|
|||
|
|
# 检查线程是否都在运行
|
|||
|
|
if not (read_thread.is_alive() and process_thread.is_alive() and write_thread.is_alive()):
|
|||
|
|
print("检测到某个线程已停止运行,正在终止所有线程...")
|
|||
|
|
stop_event.set()
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
# 检查FFmpeg进程是否存活
|
|||
|
|
if pipe.poll() is not None:
|
|||
|
|
print(f"FFmpeg进程已退出,状态码: {pipe.returncode}")
|
|||
|
|
stop_event.set()
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
# 定期输出性能统计
|
|||
|
|
current_time = time.time()
|
|||
|
|
if current_time - last_check_time > performance_check_interval:
|
|||
|
|
print(f"性能统计 - 输入队列: {frame_queue.qsize()}/{frame_queue.maxsize}, " +
|
|||
|
|
f"输出队列: {processed_frame_queue.qsize()}/{processed_frame_queue.maxsize}")
|
|||
|
|
last_check_time = current_time
|
|||
|
|
|
|||
|
|
time.sleep(0.1)
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"发生错误: {e}")
|
|||
|
|
finally:
|
|||
|
|
# 清理资源
|
|||
|
|
print("正在清理资源...")
|
|||
|
|
stop_event.set()
|
|||
|
|
setIfAI(False)
|
|||
|
|
|
|||
|
|
# 等待线程结束,增加超时机制
|
|||
|
|
timeout = 3 # 减少超时等待时间到3秒
|
|||
|
|
start_time = time.time()
|
|||
|
|
|
|||
|
|
# 等待线程正常结束
|
|||
|
|
if read_thread and process_thread and write_thread:
|
|||
|
|
while (
|
|||
|
|
read_thread.is_alive() or process_thread.is_alive() or write_thread.is_alive()) and time.time() - start_time < timeout:
|
|||
|
|
time.sleep(0.1)
|
|||
|
|
|
|||
|
|
# 强制结束持续运行的线程
|
|||
|
|
if read_thread.is_alive() or process_thread.is_alive() or write_thread.is_alive():
|
|||
|
|
print("警告: 部分线程未能正常结束,强制终止")
|
|||
|
|
|
|||
|
|
# 清空队列
|
|||
|
|
try:
|
|||
|
|
while not frame_queue.empty():
|
|||
|
|
frame_queue.get_nowait()
|
|||
|
|
except:
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
while not processed_frame_queue.empty():
|
|||
|
|
processed_frame_queue.get_nowait()
|
|||
|
|
except:
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
# 确保资源完全释放
|
|||
|
|
if cap is not None:
|
|||
|
|
try:
|
|||
|
|
cap.release()
|
|||
|
|
print("视频捕获资源已释放")
|
|||
|
|
except:
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
if pipe is not None:
|
|||
|
|
try:
|
|||
|
|
# 先向ffmpeg发送信号,让它正常结束
|
|||
|
|
try:
|
|||
|
|
import signal
|
|||
|
|
os.kill(pipe.pid, signal.SIGTERM)
|
|||
|
|
print(f"已向FFmpeg进程(PID:{pipe.pid})发送终止信号")
|
|||
|
|
except:
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
# 关闭管道
|
|||
|
|
pipe.stdin.close()
|
|||
|
|
|
|||
|
|
# 尝试正常终止
|
|||
|
|
pipe.terminate()
|
|||
|
|
|
|||
|
|
# 等待一段时间让进程自行结束
|
|||
|
|
try:
|
|||
|
|
pipe.wait(timeout=2) # 等待进程结束
|
|||
|
|
print(f"FFmpeg进程已终止,退出码: {pipe.returncode}")
|
|||
|
|
except:
|
|||
|
|
# 如果等待超时,强制杀死进程
|
|||
|
|
pipe.kill()
|
|||
|
|
print("FFmpeg进程已强制终止")
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"关闭FFmpeg时出错: {e}")
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
cv2.destroyAllWindows()
|
|||
|
|
except:
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
print("所有资源已清理完毕")
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == '__main__':
|
|||
|
|
sn = "1581F6QAD243C00BP71E"
|
|||
|
|
# video_path = f"rtmp://112.44.103.230:1935/live/{sn}"
|
|||
|
|
video_path = f"rtmp://112.44.103.230:1935/live/123456"
|
|||
|
|
# FFmpeg 推流地址
|
|||
|
|
rtmp = f"rtmp://112.44.103.230:1935/live/{sn}ai"
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
startAIVideo2(video_path, rtmp, "best.pt", [0, 1, 2, 3, 4], 0.4)
|
|||
|
|
except KeyboardInterrupt:
|
|||
|
|
print("程序被用户中断")
|
|||
|
|
stopAIVideo()
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"程序异常: {e}")
|