AI_python_yoooger/ai2/cv_video.py
from threading import Thread, Lock, Event
import time
import queue
import os
import subprocess
import cv2
import torch
import numpy as np
from ultralytics import YOLO  # Ultralytics YOLO model

# Global state
ifAI = {'status': False}
deskLock = Lock()
frame_queue = queue.Queue(maxsize=60)            # enlarged frame buffer queue
processed_frame_queue = queue.Queue(maxsize=30)  # queue of processed (annotated) frames
stop_event = Event()
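
# Pipeline overview (a summary of the code below, for orientation):
#   read_frames    -> pulls frames from the source stream into frame_queue
#   process_frames -> runs YOLO inference and annotation, feeding processed_frame_queue
#   write_frames   -> paces the annotated frames and pipes them to an FFmpeg RTMP push
# startAIVideo()/stopAIVideo() manage the worker threads via ifAI['status'] and stop_event.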


def setIfAI(pb1):
    deskLock.acquire()
    ifAI['status'] = pb1
    deskLock.release()


def getIfAI():
    return ifAI['status']


def stopAIVideo():
    print("Stopping AI video processing...")
    setIfAI(False)
    stop_event.set()
    # Wait for resources to be released
    wait_count = 0
    max_wait = 5  # give up after 5 polls (0.5 s each)
    while stop_event.is_set() and wait_count < max_wait:
        time.sleep(0.5)
        wait_count += 1
    if wait_count >= max_wait:
        print("Warning: timed out stopping AI video processing; forcing termination")
        # Avoid _thread._interrupt_main(); terminate child processes instead
        try:
            import signal
            import psutil
            # Find and terminate any FFmpeg child processes
            current_process = psutil.Process(os.getpid())
            for child in current_process.children(recursive=True):
                try:
                    child_name = child.name().lower()
                    if 'ffmpeg' in child_name:
                        print(f"Terminating child process: {child.pid} ({child_name})")
                        child.send_signal(signal.SIGTERM)
                except Exception:
                    pass
        except Exception:
            pass
    else:
        print("AI video processing stopped")


def startAIVideo(video_path, output_url, m1, cls, confidence):
    if ifAI['status']:
        stopAIVideo()
        time.sleep(1)
    stop_event.clear()
    thread = Thread(target=startAIVideo2,
                    args=(video_path, output_url, m1, cls, confidence))
    # cls2_thread = Thread(target=cls2_find, args=(video_path, m1, cls, confidence))
    # cls2_thread.daemon = True  # daemon thread: exits when the main program exits
    thread.daemon = True  # daemon thread: exits when the main program exits
    thread.start()
    # cls2_thread.start()
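
# A minimal usage sketch (illustrative only; the server URL, stream key and model
# path are placeholders, not values defined by this module):
#
#   startAIVideo(
#       video_path="rtmp://<server>/live/<stream>",    # source stream
#       output_url="rtmp://<server>/live/<stream>ai",  # annotated output stream
#       m1="best.pt",                                  # YOLO weights
#       cls=[0, 1, 2, 3, 4],                           # class IDs to detect
#       confidence=0.4,
#   )
#   ...
#   stopAIVideo()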


def read_frames(cap, frame_queue):
    """Optimized frame-reading thread."""
    frame_count = 0
    last_time = time.time()
    last_fps_time = time.time()
    # Shorter target frame interval to raise the read frame rate
    target_time_per_frame = 1.0 / 60.0  # target read rate raised to 60 fps
    # Connection-loss detection
    connection_error_count = 0
    max_connection_errors = 10  # allow at most 10 consecutive read errors
    last_successful_read = time.time()
    max_read_wait = 30.0  # treat 30 s without a frame as a dropped connection
    # Discard a few frames up front so processing starts from fresh frames
    for _ in range(5):
        cap.grab()
    while not stop_event.is_set():
        current_time = time.time()
        elapsed_time = current_time - last_time
        # Check whether no frame has been read for too long
        if current_time - last_successful_read > max_read_wait:
            print(f"Warning: no valid frame read within {max_read_wait} s; the connection may be down")
            stop_event.set()
            break
        # Frame-rate control, while still reading aggressively
        if elapsed_time < target_time_per_frame:
            time.sleep(target_time_per_frame - elapsed_time)
            continue
        # When the queue is nearly full, skip frames to avoid accumulating latency
        if frame_queue.qsize() > frame_queue.maxsize * 0.8:
            # Drop a frame
            cap.grab()
            last_time = time.time()
            continue
        ret, frame = cap.read()
        if not ret:
            print("Stream read error: failed to read a frame")
            connection_error_count += 1
            if connection_error_count >= max_connection_errors:
                print(f"Failed to read a frame {max_connection_errors} times in a row; the connection may be down, stopping...")
                stop_event.set()
                break
            time.sleep(0.5)  # brief pause before retrying
            continue
        # Frame read successfully; reset the error counters
        connection_error_count = 0
        last_successful_read = time.time()
        frame_count += 1
        if frame_count % 60 == 0:  # compute FPS every 60 frames
            current_fps_time = time.time()
            fps = 60 / (current_fps_time - last_fps_time)
            print(f"Ingest FPS: {fps:.2f}")
            last_fps_time = current_fps_time
        last_time = time.time()
        frame_queue.put((frame, time.time()))  # attach a timestamp


def process_frames(frame_queue, processed_frame_queue, ov_model, cls, confidence):
    """Frame-processing thread with frame-rate control."""
    error_count = 0    # consecutive-error counter
    max_errors = 5     # maximum tolerated consecutive errors
    frame_count = 0
    process_times = []  # rolling window for the average processing time
    # Configure the YOLO model for better throughput
    ov_model.conf = confidence  # confidence threshold
    # Inference optimizations
    try:
        # Move the model to GPU if one is available
        ov_model.to('cuda:0' if torch.cuda.is_available() else 'cpu')
        # Use a batch size of 1 to reduce memory usage
        if hasattr(ov_model, 'args') and hasattr(ov_model.args, 'batch'):
            ov_model.args.batch = 1
        # Use half precision for better performance
        if torch.cuda.is_available() and hasattr(ov_model, 'model'):
            try:
                ov_model.model = ov_model.model.half()
            except Exception as half_err:
                print(f"Half-precision conversion failed: {half_err}")
    except Exception as e:
        print(f"Model optimization config warning: {e}")
    # Cache the previous detection results for reuse when frames are skipped
    last_results = None
    skip_counter = 0
    max_skip = 2  # maximum number of frames to skip between full inferences
    while not stop_event.is_set():
        try:
            if processed_frame_queue.qsize() >= processed_frame_queue.maxsize * 0.8:
                # Output queue is nearly full; back off briefly
                time.sleep(0.01)
                continue
            frame, timestamp = frame_queue.get(timeout=0.2)
            # Drop frames that are already too old
            if time.time() - timestamp > 0.3:  # tightened latency threshold
                continue
            frame_count += 1
            # Sampled processing: reuse the previous results on skipped frames to cut compute
            if skip_counter > 0 and last_results is not None:
                skip_counter -= 1
                annotated_frame = last_results.plot(conf=False, line_width=1, font_size=1.5)
                processed_frame_queue.put((annotated_frame, timestamp))
                continue
            process_start = time.time()
            # Scale the working resolution dynamically based on queue backlog
            resize_scale = 1.0
            if frame_queue.qsize() > frame_queue.maxsize * 0.7:
                resize_scale = 0.4  # heavy backlog: shrink aggressively
            elif frame_queue.qsize() > frame_queue.maxsize * 0.5:
                resize_scale = 0.6  # moderate backlog: shrink moderately
            elif frame_queue.qsize() > frame_queue.maxsize * 0.3:
                resize_scale = 0.8  # light backlog: shrink slightly
            # Resize the image to speed up inference
            if resize_scale < 1.0:
                process_frame = cv2.resize(frame, (0, 0), fx=resize_scale, fy=resize_scale)
            else:
                process_frame = frame
            # Run inference
            try:
                results = ov_model(process_frame, classes=cls, show=False)
                last_results = results[0]  # keep the results for reuse on later frames
                # Total object count
                num_objs = len(last_results.boxes) if hasattr(last_results, "boxes") else 0
                # Per-class counts
                class_counts = {}
                if hasattr(last_results, "boxes") and hasattr(last_results.boxes, "cls"):
                    for c in last_results.boxes.cls.cpu().numpy():
                        class_counts[int(c)] = class_counts.get(int(c), 0) + 1
                # Draw the detection boxes
                if resize_scale < 1.0:
                    annotated_frame = cv2.resize(results[0].plot(conf=False, line_width=1, font_size=1.5),
                                                 (frame.shape[1], frame.shape[0]))
                else:
                    annotated_frame = results[0].plot(conf=False, line_width=1, font_size=1.5)
                # Write the statistics in the top-left corner of the frame
                text = f"Total: {num_objs} " + " ".join([f"cls{c}:{n}" for c, n in class_counts.items()])
                cv2.putText(annotated_frame, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                # Enable frame skipping when the backlog is high
                if frame_queue.qsize() > frame_queue.maxsize * 0.5:
                    skip_counter = max_skip
            except Exception as infer_err:
                print(f"Inference error: {infer_err}")
                if last_results is not None:
                    # Fall back to the previous results
                    annotated_frame = last_results.plot(conf=False, line_width=1, font_size=1.5)
                else:
                    # No previous results; pass the raw frame through
                    annotated_frame = frame.copy()
            process_end = time.time()
            process_times.append(process_end - process_start)
            if len(process_times) > 30:
                process_times.pop(0)
            if frame_count % 30 == 0:
                avg_process_time = sum(process_times) / len(process_times)
                fps = 1.0 / avg_process_time if avg_process_time > 0 else 0
                print(f"Model FPS: {fps:.2f}, avg processing time: {avg_process_time*1000:.2f} ms, queue size: {frame_queue.qsize()}, resize scale: {resize_scale:.2f}")
            processed_frame_queue.put((annotated_frame, timestamp))
            error_count = 0  # reset the error counter after a successful frame
        except queue.Empty:
            continue
        except Exception as e:
            error_count += 1
            print(f"Frame processing error: {e}")
            if error_count >= max_errors:
                print(f"{max_errors} consecutive processing errors; stopping...")
                stop_event.set()
                break


def write_frames(processed_frame_queue, pipe, size):
    """Frame-writing thread with pacing and interpolation for smoother output."""
    last_write_time = time.time()
    target_time_per_frame = 1.0 / 30.0  # 30 fps output
    pipe_error_count = 0   # pipe error counter
    max_pipe_errors = 3    # maximum tolerated pipe errors
    frame_count = 0
    last_fps_time = time.time()
    skipped_frames = 0
    # Keep the most recent frames so interpolation is possible when needed
    recent_frames = []
    max_recent_frames = 5  # larger cache for smoother interpolation
    # Double buffers intended to speed up writes (swapped below; frame bytes are written directly)
    buffer1 = bytearray(size[0] * size[1] * 3)
    buffer2 = bytearray(size[0] * size[1] * 3)
    current_buffer = buffer1
    # Pacing parameters
    min_frame_interval = target_time_per_frame * 0.5  # minimum allowed interval between frames
    max_frame_interval = target_time_per_frame * 2.0  # maximum allowed interval between frames
    while not stop_event.is_set():
        try:
            # Fetch a processed frame; a short timeout keeps the output smooth
            frame, timestamp = processed_frame_queue.get(timeout=0.05)
            # Remember recent frames for interpolation
            recent_frames.append(frame)
            if len(recent_frames) > max_recent_frames:
                recent_frames.pop(0)
            current_time = time.time()
            elapsed = current_time - last_write_time
            # If frames arrive too close together, skip this one
            if elapsed < min_frame_interval and len(recent_frames) >= 2:
                skipped_frames += 1
                continue
            # If the gap is too large, smooth it with interpolated frames
            if elapsed > max_frame_interval and len(recent_frames) >= 2:
                # Number of frames to insert
                frames_to_insert = min(int(elapsed / target_time_per_frame), 3)
                for i in range(frames_to_insert):
                    # Blend the two most recent frames
                    weight = (i + 1) / (frames_to_insert + 1)
                    interpolated_frame = cv2.addWeighted(recent_frames[-2], 1 - weight, recent_frames[-1], weight, 0)
                    # Swap the double buffer
                    current_buffer = buffer2 if current_buffer is buffer1 else buffer1
                    # Resize efficiently and write
                    interpolated_resized = cv2.resize(interpolated_frame, size, interpolation=cv2.INTER_LINEAR)
                    img_bytes = interpolated_resized.tobytes()
                    # Write to the FFmpeg pipe
                    pipe.stdin.write(img_bytes)
                    pipe.stdin.flush()
            # Swap the double buffer
            current_buffer = buffer2 if current_buffer is buffer1 else buffer1
            # Write the current frame normally, using an efficient resize
            resized_frame = cv2.resize(frame, size, interpolation=cv2.INTER_LINEAR)
            img_bytes = resized_frame.tobytes()
            pipe.stdin.write(img_bytes)
            pipe.stdin.flush()
            frame_count += 1
            if frame_count % 30 == 0:
                current_fps_time = time.time()
                fps = 30 / (current_fps_time - last_fps_time)
                print(f"Push FPS: {fps:.2f}, skipped frames: {skipped_frames}, queue size: {processed_frame_queue.qsize()}")
                last_fps_time = current_fps_time
                skipped_frames = 0
            last_write_time = time.time()
            pipe_error_count = 0  # reset after a successful write
        except queue.Empty:
            # Queue is empty: if enough recent frames exist, emit an interpolated frame to keep the stream smooth
            if len(recent_frames) >= 2 and time.time() - last_write_time > target_time_per_frame:
                try:
                    # Blend the two most recent frames
                    interpolated_frame = cv2.addWeighted(recent_frames[-2], 0.5, recent_frames[-1], 0.5, 0)
                    # Swap the double buffer
                    current_buffer = buffer2 if current_buffer is buffer1 else buffer1
                    resized_frame = cv2.resize(interpolated_frame, size, interpolation=cv2.INTER_LINEAR)
                    img_bytes = resized_frame.tobytes()
                    pipe.stdin.write(img_bytes)
                    pipe.stdin.flush()
                    last_write_time = time.time()
                except Exception:
                    pass
            continue
        except Exception as e:
            print(f"Frame write error: {e}")
            pipe_error_count += 1
            if pipe_error_count >= max_pipe_errors:
                print("Too many FFmpeg pipe errors; terminating...")
                stop_event.set()  # stop all worker threads
                break


def startAIVideo2(video_path, output_url, m1, cls, confidence):
    rtmp = output_url
    setIfAI(True)
    cap = None
    pipe = None
    read_thread = None
    process_thread = None
    write_thread = None
    ov_model = None
    try:
        global frame_queue, processed_frame_queue, stop_event
        stop_event = Event()
        os.environ["OMP_NUM_THREADS"] = "4"
        os.environ["CUDA_VISIBLE_DEVICES"] = "0"
        print(f"PyTorch available: {torch.__version__}, CUDA available: {torch.cuda.is_available()}")
        if torch.cuda.is_available():
            print(f"GPU: {torch.cuda.get_device_name(0)}")
            torch.cuda.empty_cache()
        print("Preloading the YOLO model...")
        model_params = {}
        try:
            test_model = YOLO(m1)
            if hasattr(test_model, "task"):
                model_params["task"] = "detect"
            if torch.cuda.is_available():
                model_params["half"] = True
            import inspect
            if "verbose" in inspect.signature(YOLO.__init__).parameters:
                model_params["verbose"] = False
        except Exception as e:
            print(f"Parameter detection failed; falling back to default parameters: {e}")
            model_params = {}
        retry_count = 0
        while retry_count < 3:
            try:
                ov_model = YOLO(m1, **model_params)
                dummy_frame = np.zeros((1080, 1920, 3), dtype=np.uint8)
                for _ in range(3):
                    ov_model(dummy_frame, classes=cls, conf=confidence, show=False)
                print("YOLO model loaded and warm-up complete")
                break
            except Exception as e:
                retry_count += 1
                print(f"Failed to load the YOLO model (attempt {retry_count}/3): {e}")
                if "unexpected keyword" in str(e):
                    param = str(e).split("'")[-2]
                    if param in model_params:
                        print(f"Removing unsupported parameter: {param}")
                        del model_params[param]
                time.sleep(2)
        if ov_model is None:
            raise Exception("Unable to load the YOLO model")
        ov_model.conf = confidence
        print(f"Connecting to video stream: {video_path}")
        cap = cv2.VideoCapture(video_path, cv2.CAP_FFMPEG)
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 5)
        cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
        if not cap.isOpened():
            raise Exception(f"Unable to open video stream: {video_path}")
        try:
            cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0)
        except Exception as e:
            print(f"Unable to set the auto-exposure parameter: {e}")
        frame_queue = queue.Queue(maxsize=80)
        processed_frame_queue = queue.Queue(maxsize=40)
        size = (1920, 1080)
        sizeStr = f"{size[0]}x{size[1]}"
        command = [
            'ffmpeg', '-y',
            '-f', 'rawvideo', '-vcodec', 'rawvideo',
            '-pix_fmt', 'bgr24',
            '-s', sizeStr,
            '-r', '30',
            '-i', '-',
            '-c:v', 'libx264',
            '-preset', 'ultrafast',
            '-tune', 'zerolatency',
            '-x264-params', 'sei=0',
            '-pix_fmt', 'yuv420p',
            '-f', 'flv',
            '-g', '30',
            '-keyint_min', '30',
            '-sc_threshold', '0',
            '-b:v', '2500k',
            '-maxrate', '3000k',
            '-bufsize', '3000k',
            '-threads', '4',
            '-vsync', '1',
            rtmp
        ]
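        # Notes on the key FFmpeg flags already used above (descriptive only, no new settings):
        #   -f rawvideo -pix_fmt bgr24 -s WxH -r 30 -i -   read raw BGR frames from stdin at 30 fps
        #   -preset ultrafast -tune zerolatency            trade compression efficiency for low latency
        #   -g 30 -keyint_min 30 -sc_threshold 0           one keyframe per second, scene-cut keyframes off
        #   -b:v/-maxrate/-bufsize                         cap the bitrate of the published stream
        #   -f flv <rtmp>                                  wrap in FLV and publish to the RTMP URL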
print(f"启动FFmpeg推流到: {rtmp}")
pipe = subprocess.Popen(command, shell=False, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
bufsize=10*1024*1024)
def monitor_ffmpeg_output(pipe):
while not stop_event.is_set():
line = pipe.stderr.readline().decode('utf-8', errors='ignore')
if line and ("error" in line.lower()):
print(f"FFmpeg错误: {line.strip()}")
if "Cannot open connection" in line:
stop_event.set()
break
Thread(target=monitor_ffmpeg_output, args=(pipe,), daemon=True).start()
read_thread = Thread(target=read_frames, args=(cap, frame_queue), daemon=True, name="ReadThread")
process_thread = Thread(target=process_frames, args=(frame_queue, processed_frame_queue, ov_model, cls, confidence), daemon=True, name="ProcessThread")
write_thread = Thread(target=write_frames, args=(processed_frame_queue, pipe, size), daemon=True, name="WriteThread")
print("开始推流处理...")
read_thread.start()
process_thread.start()
write_thread.start()
last_check = time.time()
while getIfAI() and not stop_event.is_set():
if not all([t.is_alive() for t in [read_thread, process_thread, write_thread]]):
print("检测到线程停止,退出")
stop_event.set()
break
if pipe.poll() is not None:
print("FFmpeg退出")
stop_event.set()
break
if time.time() - last_check > 30:
print(f"输入队列: {frame_queue.qsize()}/{frame_queue.maxsize} | 输出队列: {processed_frame_queue.qsize()}/{processed_frame_queue.maxsize}")
last_check = time.time()
time.sleep(0.1)
except Exception as e:
print(f"错误: {e}")
finally:
print("清理资源...")
stop_event.set()
setIfAI(False)
for t in [read_thread, process_thread, write_thread]:
if t and t.is_alive():
t.join(timeout=2)
try:
if cap: cap.release()
if pipe:
try:
import signal
os.kill(pipe.pid, signal.SIGTERM)
except: pass
pipe.stdin.close()
pipe.terminate()
try:
pipe.wait(timeout=2)
except:
pipe.kill()
except Exception as e:
print(f"释放资源时出错: {e}")
try:
cv2.destroyAllWindows()
except:
pass
print("资源释放完毕")


if __name__ == '__main__':
    sn = "1581F6QAD243C00BP71E"
    video_path = f"rtmp://222.212.85.86:1935/live/{sn}"
    # FFmpeg push (output) address
    rtmp = f"rtmp://222.212.85.86:1935/live/{sn}ai"
    try:
        startAIVideo2(video_path, rtmp, "best.pt", [0, 1, 2, 3, 4], 0.4)
    except KeyboardInterrupt:
        print("Program interrupted by user")
        stopAIVideo()
    except Exception as e:
        print(f"Program error: {e}")