"""AI video stream processor: pull an RTMP stream, run YOLO detection on each
frame, and re-publish the annotated frames to an RTMP output via FFmpeg.

Pipeline: read_frames -> process_frames -> write_frames, connected by queues.
"""
from threading import Thread, Lock, Event
import os
import queue
import subprocess
import time

import cv2
import numpy as np
import torch
from ultralytics import YOLO  # Ultralytics YOLO model

# Global state
ifAI = {'status': False}
deskLock = Lock()
frame_queue = queue.Queue(maxsize=60)            # buffered raw frames
processed_frame_queue = queue.Queue(maxsize=30)  # annotated frames awaiting output
stop_event = Event()
# Set once startAIVideo2 has finished cleanup; starts set because no pipeline
# is running yet.
pipeline_stopped = Event()
pipeline_stopped.set()


def setIfAI(pb1):
    with deskLock:
        ifAI['status'] = pb1


def getIfAI():
    return ifAI['status']


def stopAIVideo():
    print("Stopping AI video processing...")
    setIfAI(False)
    stop_event.set()
    # Wait up to 5 seconds (10 * 0.5 s) for the pipeline to release its resources.
    wait_count = 0
    max_wait = 10
    while not pipeline_stopped.is_set() and wait_count < max_wait:
        time.sleep(0.5)
        wait_count += 1
    if wait_count >= max_wait:
        print("Warning: timed out stopping AI video processing, forcing termination")
        # Avoid _thread.interrupt_main(); instead terminate lingering child processes.
        try:
            import signal
            import psutil
            # Find and terminate any FFmpeg child processes.
            current_process = psutil.Process(os.getpid())
            for child in current_process.children(recursive=True):
                try:
                    child_name = child.name().lower()
                    if 'ffmpeg' in child_name:
                        print(f"Terminating child process: {child.pid} ({child_name})")
                        child.send_signal(signal.SIGTERM)
                except Exception:
                    pass
        except Exception:
            pass
    else:
        print("AI video processing stopped")


def startAIVideo(video_path, output_url, m1, cls, confidence):
    if ifAI['status']:
        stopAIVideo()
        time.sleep(1)
    stop_event.clear()
    thread = Thread(target=startAIVideo2,
                    args=(video_path, output_url, m1, cls, confidence))
    thread.daemon = True  # daemon thread: exits with the main program
    thread.start()


def read_frames(cap, frame_queue):
    """Frame-reading thread with rate control and connection monitoring."""
    frame_count = 0
    last_time = time.time()
    last_fps_time = time.time()
    # Target inter-frame interval; read aggressively at up to 60 fps.
    target_time_per_frame = 1.0 / 60.0
    # Connection-loss detection
    connection_error_count = 0
    max_connection_errors = 10   # tolerate at most 10 consecutive read errors
    last_successful_read = time.time()
    max_read_wait = 30.0         # assume the connection is dead after 30 s without a frame
    # Discard a few frames up front so processing starts from fresh data.
    for _ in range(5):
        cap.grab()
    while not stop_event.is_set():
        current_time = time.time()
        elapsed_time = current_time - last_time
        # Detect a prolonged failure to read any frame.
        if current_time - last_successful_read > max_read_wait:
            print(f"Warning: no valid frame within {max_read_wait}s; connection may be lost")
            stop_event.set()
            break
        # Rate control, while still reading eagerly.
        if elapsed_time < target_time_per_frame:
            time.sleep(target_time_per_frame - elapsed_time)
            continue
        # When the queue is nearly full, drop frames to avoid accumulating latency.
        if frame_queue.qsize() > frame_queue.maxsize * 0.8:
            cap.grab()
            last_time = time.time()
            continue
        ret, frame = cap.read()
        if not ret:
            print("Stream error: failed to read frame")
            connection_error_count += 1
            if connection_error_count >= max_connection_errors:
                print(f"{max_connection_errors} consecutive read failures; "
                      "connection may be lost, stopping...")
                stop_event.set()
                break
            time.sleep(0.5)  # brief pause before retrying
            continue
        # Successful read: reset the error counter.
        connection_error_count = 0
        last_successful_read = time.time()
        frame_count += 1
        if frame_count % 60 == 0:  # report FPS every 60 frames
            current_fps_time = time.time()
            fps = 60 / (current_fps_time - last_fps_time)
            print(f"Ingest FPS: {fps:.2f}")
            last_fps_time = current_fps_time
        last_time = time.time()
        frame_queue.put((frame, time.time()))  # attach a capture timestamp
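
# The reader above targets a hard-coded 60 fps. As a minimal illustrative
# sketch (not part of the original pipeline), the source's native properties
# can be probed with standard OpenCV getters and used to derive
# target_time_per_frame. CAP_PROP_FPS may report 0 or a bogus value for some
# RTMP sources, hence the fallback; the function name and default are
# assumptions for illustration.
def probe_stream_props(cap, default_fps=30.0):
    """Best-effort probe of (fps, width, height) for an open VideoCapture."""
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    if fps <= 0 or fps > 240:
        fps = default_fps  # unreliable reading; fall back to a sane default
    return fps, width, height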

def process_frames(frame_queue, processed_frame_queue, ov_model, cls, confidence):
    """Inference thread with adaptive frame-rate control."""
    error_count = 0        # consecutive-error counter
    max_errors = 5         # maximum tolerated consecutive errors
    frame_count = 0
    process_times = []     # rolling window for average processing time
    # Configure the YOLO model for better throughput.
    ov_model.conf = confidence  # confidence threshold
    try:
        # Move the model to GPU when available.
        ov_model.to('cuda:0' if torch.cuda.is_available() else 'cpu')
        # Batch size 1 keeps memory usage low.
        if hasattr(ov_model, 'args') and hasattr(ov_model.args, 'batch'):
            ov_model.args.batch = 1
        # Half precision improves throughput on CUDA.
        if torch.cuda.is_available() and hasattr(ov_model, 'model'):
            try:
                ov_model.model = ov_model.model.half()
            except Exception as half_err:
                print(f"Half-precision conversion failed: {half_err}")
    except Exception as e:
        print(f"Model optimization warning: {e}")
    # Cache the previous detection result so skipped frames can reuse it.
    last_results = None
    skip_counter = 0
    max_skip = 2  # skip at most this many frames between full inferences
    while not stop_event.is_set():
        try:
            if processed_frame_queue.qsize() >= processed_frame_queue.maxsize * 0.8:
                # Output queue nearly full; back off briefly.
                time.sleep(0.01)
                continue
            frame, timestamp = frame_queue.get(timeout=0.2)
            # Drop frames that are already too stale.
            if time.time() - timestamp > 0.3:
                continue
            frame_count += 1
            # Under load, reuse the last result for a few frames to save compute.
            if skip_counter > 0 and last_results is not None:
                skip_counter -= 1
                annotated_frame = last_results.plot(conf=False, line_width=1, font_size=1.5)
                processed_frame_queue.put((annotated_frame, timestamp))
                continue
            process_start = time.time()
            # Scale the inference resolution down as the input queue backs up.
            resize_scale = 1.0
            if frame_queue.qsize() > frame_queue.maxsize * 0.7:
                resize_scale = 0.4   # heavy load: large reduction
            elif frame_queue.qsize() > frame_queue.maxsize * 0.5:
                resize_scale = 0.6   # moderate load
            elif frame_queue.qsize() > frame_queue.maxsize * 0.3:
                resize_scale = 0.8   # light load
            if resize_scale < 1.0:
                process_frame = cv2.resize(frame, (0, 0), fx=resize_scale, fy=resize_scale)
            else:
                process_frame = frame
            # Run inference.
            try:
                results = ov_model(process_frame, classes=cls, conf=confidence, show=False)
                last_results = results[0]  # cache for skipped frames
                # Count detections overall and per class.
                num_objs = len(last_results.boxes) if hasattr(last_results, "boxes") else 0
                class_counts = {}
                if hasattr(last_results, "boxes") and hasattr(last_results.boxes, "cls"):
                    for c in last_results.boxes.cls.cpu().numpy():
                        class_counts[int(c)] = class_counts.get(int(c), 0) + 1
                # Draw detection boxes, upscaling back to the original size if needed.
                if resize_scale < 1.0:
                    annotated_frame = cv2.resize(
                        results[0].plot(conf=False, line_width=1, font_size=1.5),
                        (frame.shape[1], frame.shape[0]))
                else:
                    annotated_frame = results[0].plot(conf=False, line_width=1, font_size=1.5)
                # Overlay the counts in the top-left corner.
                text = f"Total: {num_objs} " + " ".join(f"cls{c}:{n}" for c, n in class_counts.items())
                cv2.putText(annotated_frame, text, (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                # Enable frame skipping while under load.
                if frame_queue.qsize() > frame_queue.maxsize * 0.5:
                    skip_counter = max_skip
            except Exception as infer_err:
                print(f"Inference error: {infer_err}")
                if last_results is not None:
                    # Fall back to the previous result.
                    annotated_frame = last_results.plot(conf=False, line_width=1, font_size=1.5)
                else:
                    # No previous result: pass the raw frame through.
                    annotated_frame = frame.copy()
            process_end = time.time()
            process_times.append(process_end - process_start)
            if len(process_times) > 30:
                process_times.pop(0)
            if frame_count % 30 == 0:
                avg_process_time = sum(process_times) / len(process_times)
                fps = 1.0 / avg_process_time if avg_process_time > 0 else 0
                print(f"Model FPS: {fps:.2f}, avg time: {avg_process_time*1000:.2f}ms, "
                      f"queue size: {frame_queue.qsize()}, scale: {resize_scale:.2f}")
            processed_frame_queue.put((annotated_frame, timestamp))
            error_count = 0  # reset after a successful frame
        except queue.Empty:
            continue
        except Exception as e:
            error_count += 1
            print(f"Frame processing error: {e}")
            if error_count >= max_errors:
                print(f"{max_errors} consecutive processing errors; stopping...")
                stop_event.set()
                break
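
# Standalone sketch of the detect-and-count pattern used in process_frames,
# handy for verifying a weights file outside the streaming pipeline. The
# function name is illustrative, not part of the pipeline above; it assumes an
# Ultralytics-compatible weights file and any BGR image (numpy array).
def count_detections(weights_path, image_bgr, cls, confidence):
    """Run one inference and return (total_detections, per-class counts)."""
    model = YOLO(weights_path)
    result = model(image_bgr, classes=cls, conf=confidence, show=False)[0]
    counts = {}
    for c in result.boxes.cls.cpu().numpy():
        counts[int(c)] = counts.get(int(c), 0) + 1
    return len(result.boxes), counts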

def write_frames(processed_frame_queue, pipe, size):
    """Output thread: paces writes to FFmpeg and smooths gaps by interpolation."""
    last_write_time = time.time()
    target_time_per_frame = 1.0 / 30.0  # 30 fps output
    pipe_error_count = 0   # consecutive pipe-error counter
    max_pipe_errors = 3    # maximum tolerated pipe errors
    frame_count = 0
    last_fps_time = time.time()
    skipped_frames = 0
    # Keep the most recent frames so gaps can be filled by interpolation.
    recent_frames = []
    max_recent_frames = 5
    # Pacing parameters
    min_frame_interval = target_time_per_frame * 0.5  # shortest allowed gap between writes
    max_frame_interval = target_time_per_frame * 2.0  # longest gap before interpolating
    while not stop_event.is_set():
        try:
            # Short timeout so pacing stays responsive.
            frame, timestamp = processed_frame_queue.get(timeout=0.05)
            # Remember recent frames for interpolation.
            recent_frames.append(frame)
            if len(recent_frames) > max_recent_frames:
                recent_frames.pop(0)
            current_time = time.time()
            elapsed = current_time - last_write_time
            # Frames arriving too quickly are skipped.
            if elapsed < min_frame_interval and len(recent_frames) >= 2:
                skipped_frames += 1
                continue
            # Frames arriving too slowly get interpolated filler frames.
            if elapsed > max_frame_interval and len(recent_frames) >= 2:
                frames_to_insert = min(int(elapsed / target_time_per_frame), 3)
                for i in range(frames_to_insert):
                    # Blend the last two frames to synthesize an intermediate one.
                    weight = (i + 1) / (frames_to_insert + 1)
                    interpolated_frame = cv2.addWeighted(
                        recent_frames[-2], 1 - weight, recent_frames[-1], weight, 0)
                    interpolated_resized = cv2.resize(interpolated_frame, size,
                                                      interpolation=cv2.INTER_LINEAR)
                    pipe.stdin.write(interpolated_resized.tobytes())
                    pipe.stdin.flush()
            # Write the current frame.
            resized_frame = cv2.resize(frame, size, interpolation=cv2.INTER_LINEAR)
            pipe.stdin.write(resized_frame.tobytes())
            pipe.stdin.flush()
            frame_count += 1
            if frame_count % 30 == 0:
                current_fps_time = time.time()
                fps = 30 / (current_fps_time - last_fps_time)
                print(f"Output FPS: {fps:.2f}, skipped frames: {skipped_frames}, "
                      f"queue size: {processed_frame_queue.qsize()}")
                last_fps_time = current_fps_time
                skipped_frames = 0
            last_write_time = time.time()
            pipe_error_count = 0  # reset after a successful write
        except queue.Empty:
            # Queue empty: synthesize an interpolated frame to keep the stream smooth.
            if len(recent_frames) >= 2 and time.time() - last_write_time > target_time_per_frame:
                try:
                    interpolated_frame = cv2.addWeighted(recent_frames[-2], 0.5,
                                                         recent_frames[-1], 0.5, 0)
                    resized_frame = cv2.resize(interpolated_frame, size,
                                               interpolation=cv2.INTER_LINEAR)
                    pipe.stdin.write(resized_frame.tobytes())
                    pipe.stdin.flush()
                    last_write_time = time.time()
                except Exception:
                    pass
            continue
        except Exception as e:
            print(f"Frame write error: {e}")
            pipe_error_count += 1
            if pipe_error_count >= max_pipe_errors:
                print("Too many FFmpeg pipe errors; terminating...")
                stop_event.set()  # signal all threads to stop
                break
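
# The FFmpeg pipe above is fed raw bgr24 frames, so every write must be exactly
# width * height * 3 bytes; a short or oversized write desynchronizes the
# stream. A small sanity helper, as an illustrative sketch (the function name
# is an assumption, not used by the pipeline above):
def validate_raw_frame(frame, size):
    """Ensure a BGR frame matches the (width, height) FFmpeg was told to expect."""
    expected = size[0] * size[1] * 3  # bgr24: 3 bytes per pixel
    data = frame.tobytes()
    if len(data) != expected:
        raise ValueError(f"raw frame is {len(data)} bytes, expected {expected}")
    return data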

def startAIVideo2(video_path, output_url, m1, cls, confidence):
    rtmp = output_url
    setIfAI(True)
    cap = None
    pipe = None
    read_thread = None
    process_thread = None
    write_thread = None
    ov_model = None
    try:
        global frame_queue, processed_frame_queue, stop_event
        stop_event = Event()
        pipeline_stopped.clear()
        os.environ["OMP_NUM_THREADS"] = "4"
        os.environ["CUDA_VISIBLE_DEVICES"] = "0"
        print(f"PyTorch: {torch.__version__}, CUDA available: {torch.cuda.is_available()}")
        if torch.cuda.is_available():
            print(f"GPU: {torch.cuda.get_device_name(0)}")
            torch.cuda.empty_cache()
        print("Preloading YOLO model...")
        # Probe which constructor parameters this Ultralytics version supports.
        model_params = {}
        try:
            test_model = YOLO(m1)
            if hasattr(test_model, "task"):
                model_params["task"] = "detect"
            if torch.cuda.is_available():
                model_params["half"] = True
            import inspect
            if "verbose" in inspect.signature(YOLO.__init__).parameters:
                model_params["verbose"] = False
        except Exception as e:
            print(f"Parameter detection failed; using defaults: {e}")
            model_params = {}
        # Load the model, dropping unsupported keyword arguments between retries.
        retry_count = 0
        while retry_count < 3:
            try:
                ov_model = YOLO(m1, **model_params)
                # Warm up with a few dummy inferences at full resolution.
                dummy_frame = np.zeros((1080, 1920, 3), dtype=np.uint8)
                for _ in range(3):
                    ov_model(dummy_frame, classes=cls, conf=confidence, show=False)
                print("YOLO model loaded and warmed up")
                break
            except Exception as e:
                retry_count += 1
                print(f"YOLO model load failed (attempt {retry_count}/3): {e}")
                if "unexpected keyword" in str(e):
                    param = str(e).split("'")[-2]
                    if param in model_params:
                        print(f"Removing unsupported parameter: {param}")
                        del model_params[param]
                time.sleep(2)
        if ov_model is None:
            raise Exception("Failed to load YOLO model")
        ov_model.conf = confidence
        print(f"Connecting to video stream: {video_path}")
        cap = cv2.VideoCapture(video_path, cv2.CAP_FFMPEG)
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 5)
        cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
        if not cap.isOpened():
            raise Exception(f"Failed to open video stream: {video_path}")
        try:
            cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0)
        except Exception as e:
            print(f"Failed to set auto-exposure: {e}")
        # Fresh queues for this run.
        frame_queue = queue.Queue(maxsize=80)
        processed_frame_queue = queue.Queue(maxsize=40)
        size = (1920, 1080)
        sizeStr = f"{size[0]}x{size[1]}"
        # FFmpeg: read raw bgr24 frames from stdin, encode H.264, publish via RTMP.
        command = [
            'ffmpeg', '-y',
            '-f', 'rawvideo',
            '-vcodec', 'rawvideo',
            '-pix_fmt', 'bgr24',
            '-s', sizeStr,
            '-r', '30',
            '-i', '-',
            '-c:v', 'libx264',
            '-preset', 'ultrafast',
            '-tune', 'zerolatency',
            '-x264-params', 'sei=0',
            '-pix_fmt', 'yuv420p',
            '-f', 'flv',
            '-g', '30',
            '-keyint_min', '30',
            '-sc_threshold', '0',
            '-b:v', '2500k',
            '-maxrate', '3000k',
            '-bufsize', '3000k',
            '-threads', '4',
            '-vsync', '1',
            rtmp
        ]
        print(f"Starting FFmpeg push to: {rtmp}")
        pipe = subprocess.Popen(command, shell=False,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                bufsize=10 * 1024 * 1024)

        def monitor_ffmpeg_output(pipe):
            # Surface FFmpeg errors; abort on connection failures.
            while not stop_event.is_set():
                line = pipe.stderr.readline().decode('utf-8', errors='ignore')
                if line and "error" in line.lower():
                    print(f"FFmpeg error: {line.strip()}")
                    if "Cannot open connection" in line:
                        stop_event.set()
                        break

        Thread(target=monitor_ffmpeg_output, args=(pipe,), daemon=True).start()
        read_thread = Thread(target=read_frames, args=(cap, frame_queue),
                             daemon=True, name="ReadThread")
        process_thread = Thread(target=process_frames,
                                args=(frame_queue, processed_frame_queue, ov_model, cls, confidence),
                                daemon=True, name="ProcessThread")
        write_thread = Thread(target=write_frames, args=(processed_frame_queue, pipe, size),
                              daemon=True, name="WriteThread")
        print("Starting stream processing...")
        read_thread.start()
        process_thread.start()
        write_thread.start()
        # Supervise the worker threads and the FFmpeg process.
        last_check = time.time()
        while getIfAI() and not stop_event.is_set():
            if not all(t.is_alive() for t in [read_thread, process_thread, write_thread]):
                print("A worker thread stopped; exiting")
                stop_event.set()
                break
            if pipe.poll() is not None:
                print("FFmpeg exited")
                stop_event.set()
                break
            if time.time() - last_check > 30:
                print(f"Input queue: {frame_queue.qsize()}/{frame_queue.maxsize} | "
                      f"Output queue: {processed_frame_queue.qsize()}/{processed_frame_queue.maxsize}")
                last_check = time.time()
            time.sleep(0.1)
    except Exception as e:
        print(f"Error: {e}")
    finally:
        print("Cleaning up resources...")
        stop_event.set()
        setIfAI(False)
        for t in [read_thread, process_thread, write_thread]:
            if t and t.is_alive():
                t.join(timeout=2)
        try:
            if cap:
                cap.release()
            if pipe:
                # Close stdin first so FFmpeg can flush, then terminate it.
                try:
                    pipe.stdin.close()
                except Exception:
                    pass
                try:
                    import signal
                    os.kill(pipe.pid, signal.SIGTERM)
                except Exception:
                    pass
                pipe.terminate()
                try:
                    pipe.wait(timeout=2)
                except Exception:
                    pipe.kill()
        except Exception as e:
            print(f"Error releasing resources: {e}")
        try:
            cv2.destroyAllWindows()
        except Exception:
            pass
        pipeline_stopped.set()  # let stopAIVideo() know cleanup has finished
        print("Resources released")
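
# In service code the non-blocking wrapper startAIVideo() is the intended entry
# point: it stops any running pipeline, then launches startAIVideo2 on a daemon
# thread. An illustrative call (URLs and weights below are placeholders):
#
#   startAIVideo("rtmp://host/live/cam1", "rtmp://host/live/cam1ai",
#                "best.pt", [0, 1, 2], 0.4)
#   ...
#   stopAIVideo()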

if __name__ == '__main__':
    sn = "1581F6QAD243C00BP71E"
    video_path = f"rtmp://222.212.85.86:1935/live/{sn}"  # source RTMP stream (pull)
    rtmp = f"rtmp://222.212.85.86:1935/live/{sn}ai"      # annotated RTMP stream (push)
    try:
        startAIVideo2(video_path, rtmp, "best.pt", [0, 1, 2, 3, 4], 0.4)
    except KeyboardInterrupt:
        print("Interrupted by user")
        stopAIVideo()
    except Exception as e:
        print(f"Program error: {e}")