"""Pull a video stream, run YOLO detection on it and push the annotated
result to an RTMP endpoint through an FFmpeg subprocess.

The pipeline consists of three worker threads connected by two bounded
queues: ``read_frames`` -> ``process_frames`` -> ``write_frames``.
``startAIVideo`` / ``stopAIVideo`` are the public controls; ``startAIVideo2``
is the blocking pipeline body.
"""
import datetime
import inspect
import os
import queue
import signal
import socket
import subprocess
import time
from collections import deque
from threading import Event, Lock, Thread

import cv2
import numpy as np
from ultralytics import YOLO  # Ultralytics YOLO model wrapper

# ----------------------------------------------------------------------------
# Module-level shared state
# ----------------------------------------------------------------------------
ifAI = {'status': False}  # True while a pipeline is supposed to be running
deskLock = Lock()  # guards writes to ifAI
frame_queue = queue.Queue(maxsize=60)  # raw frames: reader -> processor
processed_frame_queue = queue.Queue(maxsize=30)  # annotated frames: processor -> writer
stop_event = Event()  # set to ask every worker thread to exit

# Set whenever no pipeline is running. startAIVideo2 clears it on entry and
# sets it again once its cleanup has finished, which gives stopAIVideo
# something real to wait on.
_pipeline_done = Event()
_pipeline_done.set()


def setIfAI(pb1):
    """Store the running flag under the module lock."""
    with deskLock:
        ifAI['status'] = pb1


def getIfAI():
    """Return the current running flag."""
    return ifAI['status']


def _terminate_ffmpeg_children():
    """Send SIGTERM to every child process of this process named like ffmpeg.

    Best effort: does nothing when psutil is not installed or the process
    table cannot be read.
    """
    try:
        import psutil
    except ImportError:
        return
    try:
        children = psutil.Process(os.getpid()).children(recursive=True)
    except psutil.Error:
        return
    for child in children:
        try:
            child_name = child.name().lower()
            if 'ffmpeg' in child_name:
                print(f"正在终止子进程: {child.pid} ({child_name})")
                child.send_signal(signal.SIGTERM)
        except psutil.Error:
            # The child exited in the meantime or is not accessible.
            pass


def stopAIVideo():
    """Ask the running pipeline to stop and wait for its cleanup to finish.

    Waits up to 5 seconds for ``startAIVideo2`` to report completion. If it
    does not, any leftover FFmpeg child processes are terminated.
    """
    print("正在停止AI视频处理...")
    setIfAI(False)
    stop_event.set()

    max_wait = 5  # seconds
    if _pipeline_done.wait(timeout=max_wait):
        print("AI视频处理已停止")
        return

    print("警告: 停止AI视频处理超时,强制终止")
    _terminate_ffmpeg_children()


def startAIVideo(video_path, output_url, m1, cls, confidence):
    """Start the pipeline in a daemon thread, stopping a running one first.

    Args:
        video_path: source stream URL or file path given to OpenCV.
        output_url: RTMP URL FFmpeg publishes to.
        m1: model weights path passed to ``YOLO``.
        cls: list of class indices to detect.
        confidence: detection confidence threshold.
    """
    if ifAI['status']:
        stopAIVideo()
        time.sleep(1)
    stop_event.clear()
    # Mark the pipeline as running before the thread starts so that an
    # immediate stopAIVideo() waits for it instead of returning at once.
    _pipeline_done.clear()
    thread = Thread(target=startAIVideo2,
                    args=(video_path, output_url, m1, cls, confidence),
                    daemon=True)  # do not keep the interpreter alive
    thread.start()


def read_frames(cap, frame_queue):
    """Reader thread: pull frames from ``cap`` into ``frame_queue``.

    Each queue item is ``(frame, capture_timestamp)``. The loop reads at most
    about 60 frames per second, drops frames while the queue is more than 80%
    full, and sets ``stop_event`` after 10 consecutive failed reads or 30
    seconds without any successful read.
    """
    frame_count = 0
    last_time = time.time()
    last_fps_time = time.time()
    target_time_per_frame = 1.0 / 60.0

    connection_error_count = 0
    max_connection_errors = 10
    last_successful_read = time.time()
    max_read_wait = 30.0

    # Discard a few frames so processing starts from fresh ones.
    for _ in range(5):
        cap.grab()

    while not stop_event.is_set():
        current_time = time.time()
        elapsed_time = current_time - last_time

        if current_time - last_successful_read > max_read_wait:
            print(f"警告: {max_read_wait}秒内未能读取到有效帧,可能连接已断开")
            stop_event.set()
            break

        # Rate limiting.
        if elapsed_time < target_time_per_frame:
            time.sleep(target_time_per_frame - elapsed_time)
            continue

        # Back-pressure: consume the frame without decoding it so latency
        # does not accumulate in the queue.
        if frame_queue.qsize() > frame_queue.maxsize * 0.8:
            if cap.grab():
                # The source is still delivering; a slow consumer must not be
                # reported as a lost connection.
                last_successful_read = time.time()
            last_time = time.time()
            continue

        ret, frame = cap.read()
        if not ret:
            print("拉流错误:无法读取帧")
            connection_error_count += 1
            if connection_error_count >= max_connection_errors:
                print(f"连续{max_connection_errors}次无法读取帧,可能连接已断开,正在停止流程...")
                stop_event.set()
                break
            time.sleep(0.5)  # short pause before retrying
            continue

        connection_error_count = 0
        last_successful_read = time.time()

        frame_count += 1
        if frame_count % 60 == 0:
            current_fps_time = time.time()
            fps = 60 / (current_fps_time - last_fps_time)
            print(f"拉流FPS: {fps:.2f}")
            last_fps_time = current_fps_time

        last_time = time.time()
        try:
            # Never block here: a blocked put would ignore stop_event.
            frame_queue.put_nowait((frame, time.time()))
        except queue.Full:
            pass  # drop the frame


def _render_result(result, target_shape):
    """Plot ``result`` and return an image sized like ``target_shape``.

    Inference may have run on a down-scaled copy of the frame; every frame
    handed to the writer must have the source size because the writer blends
    consecutive frames with ``cv2.addWeighted``.
    """
    annotated = result.plot(conf=False, line_width=1, font_size=1.5)
    target_h, target_w = target_shape[:2]
    if annotated.shape[0] != target_h or annotated.shape[1] != target_w:
        annotated = cv2.resize(annotated, (target_w, target_h))
    return annotated


def process_frames(frame_queue, processed_frame_queue, ov_model, cls, confidence, use_fp16=False):
    """Processor thread: run detection on queued frames and queue the result.

    Args:
        frame_queue: source of ``(frame, timestamp)`` items.
        processed_frame_queue: sink for ``(annotated_frame, timestamp)``.
        ov_model: loaded YOLO model.
        cls: class indices to detect.
        confidence: confidence threshold applied to every inference call.
        use_fp16: convert the model to half precision when running on CUDA.

    Frames older than 0.3 s are dropped. When the input queue backs up the
    frame is down-scaled before inference and, above 50% fill, the next
    ``max_skip`` frames reuse the previous detection instead of running the
    model. Sets ``stop_event`` after 5 consecutive unexpected errors.
    """
    import torch  # local import: only this stage needs it directly

    error_count = 0
    max_errors = 5
    frame_count = 0
    process_times = deque(maxlen=30)  # rolling window for the average

    # Kept for model wrappers that read this attribute; the threshold is also
    # passed explicitly on every call below.
    ov_model.conf = confidence

    device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
    try:
        ov_model.to(device)
        if hasattr(ov_model, 'args') and hasattr(ov_model.args, 'batch'):
            ov_model.args.batch = 1
        if use_fp16 and device.startswith('cuda') and hasattr(ov_model, 'model'):
            try:
                ov_model.model = ov_model.model.half()
                print("✅ 启用半精度 FP16 模式")
            except Exception as half_err:
                print(f"⚠️ 半精度转换失败: {half_err}")
        else:
            print("ℹ️ 半精度模式未启用")
    except Exception as e:
        print(f"⚠️ 模型设备配置警告: {e}")

    last_results = None  # most recent detection, reused for skipped frames
    skip_counter = 0
    max_skip = 2

    while not stop_event.is_set():
        try:
            # Let the writer catch up before producing more.
            if processed_frame_queue.qsize() >= processed_frame_queue.maxsize * 0.8:
                time.sleep(0.01)
                continue

            frame, timestamp = frame_queue.get(timeout=0.2)
            if time.time() - timestamp > 0.3:
                continue  # too old to be worth processing

            frame_count += 1

            if skip_counter > 0 and last_results is not None:
                skip_counter -= 1
                processed_frame_queue.put(
                    (_render_result(last_results, frame.shape), timestamp))
                continue

            process_start = time.time()

            # Choose the inference scale from the input backlog.
            resize_scale = 1.0
            qsize = frame_queue.qsize()
            maxsize = frame_queue.maxsize
            if qsize > maxsize * 0.7:
                resize_scale = 0.4
            elif qsize > maxsize * 0.5:
                resize_scale = 0.6
            elif qsize > maxsize * 0.3:
                resize_scale = 0.8

            if resize_scale < 1.0:
                process_frame = cv2.resize(frame, (0, 0), fx=resize_scale, fy=resize_scale)
            else:
                process_frame = frame

            try:
                results = ov_model(process_frame, classes=cls, conf=confidence, show=False)
                last_results = results[0]
                annotated_frame = _render_result(results[0], frame.shape)
                if qsize > maxsize * 0.5:
                    skip_counter = max_skip
            except Exception as infer_err:
                print(f"推理错误: {infer_err}")
                if last_results is not None:
                    annotated_frame = _render_result(last_results, frame.shape)
                else:
                    annotated_frame = frame.copy()

            process_times.append(time.time() - process_start)

            if frame_count % 30 == 0:
                avg_process_time = sum(process_times) / len(process_times)
                fps = 1.0 / avg_process_time if avg_process_time > 0 else 0
                print(
                    f"模型处理FPS: {fps:.2f}, 平均处理时间: {avg_process_time * 1000:.2f}ms, 队列大小: {qsize}, 缩放比例: {resize_scale:.2f}")

            processed_frame_queue.put((annotated_frame, timestamp))
            error_count = 0
        except queue.Empty:
            continue
        except Exception as e:
            error_count += 1
            print(f"处理帧错误: {e}")
            if error_count >= max_errors:
                print(f"连续处理错误达到{max_errors}次,正在停止处理...")
                stop_event.set()
                break


def _write_raw_frame(pipe, frame, size):
    """Resize ``frame`` to ``size`` (width, height) and write it to FFmpeg."""
    resized = cv2.resize(frame, size, interpolation=cv2.INTER_LINEAR)
    pipe.stdin.write(resized.tobytes())
    pipe.stdin.flush()


def write_frames(processed_frame_queue, pipe, size):
    """Writer thread: send annotated frames to FFmpeg's stdin as raw BGR.

    Args:
        processed_frame_queue: source of ``(frame, timestamp)`` items.
        pipe: ``subprocess.Popen`` object whose stdin accepts raw frames.
        size: output ``(width, height)``.

    Pacing targets 30 fps: a frame arriving less than half a frame interval
    after the previous write is skipped; after a gap of more than two
    intervals up to three blended frames are inserted first; while the queue
    is empty a 50/50 blend of the last two frames is written as filler.
    Sets ``stop_event`` after 3 consecutive errors.
    """
    last_write_time = time.time()
    target_time_per_frame = 1.0 / 30.0
    pipe_error_count = 0
    max_pipe_errors = 3
    frame_count = 0
    last_fps_time = time.time()
    skipped_frames = 0

    recent_frames = deque(maxlen=5)  # latest frames, used for blending

    min_frame_interval = target_time_per_frame * 0.5
    max_frame_interval = target_time_per_frame * 2.0

    while not stop_event.is_set():
        try:
            try:
                frame, timestamp = processed_frame_queue.get(timeout=0.05)
            except queue.Empty:
                # Nothing new: keep the stream moving with a filler frame.
                if (len(recent_frames) >= 2
                        and time.time() - last_write_time > target_time_per_frame):
                    filler = cv2.addWeighted(recent_frames[-2], 0.5,
                                             recent_frames[-1], 0.5, 0)
                    _write_raw_frame(pipe, filler, size)
                    last_write_time = time.time()
                continue

            recent_frames.append(frame)
            elapsed = time.time() - last_write_time

            # Arrived too soon after the previous write: skip it.
            if elapsed < min_frame_interval and len(recent_frames) >= 2:
                skipped_frames += 1
                continue

            # Arrived late: bridge the gap with blended frames.
            if elapsed > max_frame_interval and len(recent_frames) >= 2:
                frames_to_insert = min(int(elapsed / target_time_per_frame), 3)
                for i in range(frames_to_insert):
                    weight = (i + 1) / (frames_to_insert + 1)
                    blended = cv2.addWeighted(recent_frames[-2], 1 - weight,
                                              recent_frames[-1], weight, 0)
                    _write_raw_frame(pipe, blended, size)

            _write_raw_frame(pipe, frame, size)

            frame_count += 1
            if frame_count % 30 == 0:
                current_fps_time = time.time()
                fps = 30 / (current_fps_time - last_fps_time)
                print(f"推流FPS: {fps:.2f}, 跳过的帧: {skipped_frames}, 队列大小: {processed_frame_queue.qsize()}")
                last_fps_time = current_fps_time
                skipped_frames = 0

            last_write_time = time.time()
            pipe_error_count = 0  # a successful write resets the counter
        except Exception as e:
            print(f"写入帧错误: {e}")
            pipe_error_count += 1
            if pipe_error_count >= max_pipe_errors:
                print("FFmpeg管道错误过多,正在终止进程...")
                stop_event.set()
                break


# Class indices whose detection triggers a snapshot, keyed by weights file.
_SNAPSHOT_CLASSES = {
    "gdaq.pt": [2, 4],
    "best.pt": [0],
}


def cls2_find(video_path, m1, cls, confidence):
    """Scan a stream and save a JPEG whenever a snapshot class is detected.

    Nothing in this file starts this function; it is a standalone helper.
    The first 10 seconds of frames (by the stream's reported FPS) are
    skipped, and another 10 seconds are skipped after every snapshot. Images
    are written to a directory named after today's date (``YYYYMMDD``).

    Args:
        video_path: source stream URL or file path.
        m1: model weights path; also selects the snapshot classes.
        cls: class indices passed to the detector.
        confidence: detection confidence threshold.
    """
    cap = None
    try:
        ov_model = YOLO(m1)

        save_dir = datetime.datetime.now().strftime("%Y%m%d")
        os.makedirs(save_dir, exist_ok=True)

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print("Error: Could not open video.")
            return

        fps = cap.get(cv2.CAP_PROP_FPS)
        cls2 = _SNAPSHOT_CLASSES.get(m1, [])  # other models save nothing

        skip_frames = int(fps * 10)
        while cap.isOpened() and not stop_event.is_set():
            if skip_frames > 0:
                skip_frames -= 1
                cap.grab()  # advance without decoding
                continue

            ret, frame = cap.read()
            if not ret:
                break

            results = ov_model(frame, conf=confidence, classes=cls, show=False)
            for result in results:
                for box in result.boxes:
                    cls_index = int(box.cls[0])

                    if cls_index in cls2:
                        skip_frames = int(fps * 10)
                        filename = f"{save_dir}/frame_{int(cap.get(cv2.CAP_PROP_POS_FRAMES))}_cls2.jpg"
                        cv2.imwrite(filename, frame)
                        print(f"保存图像: {filename}")

                    # Draw the box. This happens after the snapshot for the
                    # same box was written, so a saved image only shows boxes
                    # drawn for earlier detections in that frame.
                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    label = f"{result.names[cls_index]} {box.conf[0]:.2f}"
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)
                    cv2.putText(frame, label, (x1, y1 - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
    except Exception as e:
        print(f"cls2_find错误: {e}")
    finally:
        if cap is not None:
            cap.release()


def _load_model(m1, cls, confidence, max_retries=3):
    """Load the YOLO weights ``m1`` and warm the model up.

    Only keyword arguments that ``YOLO.__init__`` actually declares are
    passed to the constructor. Raises ``RuntimeError`` when loading or
    warm-up has failed ``max_retries`` times.
    """
    model_params = {}
    try:
        accepted = inspect.signature(YOLO.__init__).parameters
        if "task" in accepted:
            model_params["task"] = "detect"
        if "verbose" in accepted:
            model_params["verbose"] = False
        print(f"检测到支持的YOLO参数: {model_params}")
    except (TypeError, ValueError) as e:
        print(f"参数检测失败,将使用默认参数: {e}")
        model_params = {}

    for attempt in range(1, max_retries + 1):
        try:
            model = YOLO(m1, **model_params)
            # Warm-up: several passes so the first real frame is not slow.
            dummy_frame = np.zeros((1080, 1920, 3), dtype=np.uint8)
            for _ in range(3):
                model(dummy_frame, classes=cls, conf=confidence, show=False)
            print("YOLO模型加载成功并预热完成")
            return model
        except Exception as e:
            print(f"YOLO模型加载失败 (尝试 {attempt}/{max_retries}): {e}")
            message = str(e)
            if "got an unexpected keyword argument" in message and model_params:
                # Drop the keyword the constructor rejected.
                param_name = message.split("'")[-2] if "'" in message else None
                if param_name and param_name in model_params:
                    print(f"移除不支持的参数: {param_name}")
                    del model_params[param_name]
            if attempt < max_retries:
                time.sleep(2)

    raise RuntimeError("无法加载YOLO模型,达到最大重试次数")


def _open_capture(video_path):
    """Open ``video_path`` with OpenCV's FFmpeg backend and tune it.

    Raises ``Exception`` when the stream cannot be opened.
    """
    print(f"正在连接视频流: {video_path}")
    cap = cv2.VideoCapture(video_path, cv2.CAP_FFMPEG)
    # NOTE(review): these timeouts are set after the capture has already been
    # opened - confirm they take effect for the installed OpenCV version.
    cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 10000)
    cap.set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 5000)

    if not cap.isOpened():
        cap.release()
        raise Exception(f"无法打开视频流: {video_path}")

    cap.set(cv2.CAP_PROP_BUFFERSIZE, 5)
    cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
    try:
        cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0)
    except Exception as e:
        print(f"注意: 无法设置自动曝光参数: {e}")
    return cap


def _check_rtmp_reachable(rtmp):
    """Log whether a TCP connection to the RTMP host can be made.

    Purely informational: failure is logged as a warning and never raised.
    The port defaults to 1935 when the URL does not name one.
    """
    authority = rtmp.replace("rtmp://", "").split("/")[0]
    rtmp_host, _, port_text = authority.partition(":")
    try:
        rtmp_port = int(port_text) if port_text else 1935
        with socket.create_connection((rtmp_host, rtmp_port), timeout=3):
            pass
        print(f"RTMP服务器连接成功: {rtmp_host}:{rtmp_port}")
    except (OSError, ValueError) as e:
        print(f"警告: RTMP服务器可能不可达 {rtmp_host}:{port_text or 1935}: {e}")


def _build_ffmpeg_command(rtmp, size_str):
    """Return the FFmpeg argv that encodes raw BGR stdin to FLV on ``rtmp``."""
    # NOTE(review): the -reconnect* options are given as output options here;
    # confirm they have an effect for an RTMP output.
    return ['ffmpeg',
            '-y',
            '-f', 'rawvideo',
            '-vcodec', 'rawvideo',
            '-pix_fmt', 'bgr24',
            '-s', size_str,
            '-r', '30',
            '-i', '-',
            '-c:v', 'h264',
            '-pix_fmt', 'yuv420p',
            '-preset', 'ultrafast',
            '-tune', 'zerolatency',
            '-f', 'flv',
            '-g', '30',
            '-bufsize', '4000k',
            '-maxrate', '4000k',
            '-b:v', '2500k',
            '-vsync', '1',
            '-threads', '4',
            '-reconnect', '1',
            '-reconnect_at_eof', '1',
            '-reconnect_streamed', '1',
            '-reconnect_delay_max', '5',
            '-x264opts', 'no-scenecut:keyint=30:min-keyint=30',  # fixed GOP
            '-flvflags', 'no_duration_filesize',
            rtmp]


def _monitor_ffmpeg_output(pipe):
    """Drain FFmpeg's stderr until EOF, logging error lines.

    Sets ``stop_event`` when FFmpeg reports that it cannot open the
    connection. Reading continues until FFmpeg closes the stream so its
    stderr pipe can never fill up.
    """
    for raw_line in iter(pipe.stderr.readline, b''):
        line = raw_line.decode('utf-8', errors='ignore')
        if "Error" in line or "error" in line:
            print(f"FFmpeg错误: {line.strip()}")
        if "Cannot open connection" in line and not stop_event.is_set():
            print("无法连接到RTMP服务器,正在停止流程...")
            stop_event.set()


def _shutdown_ffmpeg(pipe):
    """Terminate the FFmpeg process, killing it if it does not exit in 2 s."""
    try:
        if pipe.poll() is None:
            pipe.terminate()
            print(f"已向FFmpeg进程(PID:{pipe.pid})发送终止信号")
        try:
            pipe.stdin.close()
        except (OSError, ValueError):
            # Flushing into a process that has already exited fails; that
            # must not prevent the wait/kill below.
            pass
        try:
            pipe.wait(timeout=2)
            print(f"FFmpeg进程已终止,退出码: {pipe.returncode}")
        except subprocess.TimeoutExpired:
            pipe.kill()
            pipe.wait()
            print("FFmpeg进程已强制终止")
    except Exception as e:
        print(f"关闭FFmpeg时出错: {e}")


def _drain_queue(q):
    """Remove every item currently in ``q`` without blocking."""
    try:
        while True:
            q.get_nowait()
    except queue.Empty:
        pass


def startAIVideo2(video_path, output_url, m1, cls, confidence):
    """Run the whole pipeline and block until it stops.

    Loads and warms up the model, opens the source, starts FFmpeg and the
    three worker threads, then supervises them until the running flag is
    cleared, ``stop_event`` is set, a worker dies or FFmpeg exits. All
    resources are released before returning; errors are logged, not raised.

    Args:
        video_path: source stream URL or file path.
        output_url: RTMP URL to publish to.
        m1: model weights path.
        cls: class indices to detect.
        confidence: detection confidence threshold.
    """
    global frame_queue, processed_frame_queue

    rtmp = output_url
    _pipeline_done.clear()
    setIfAI(True)

    cap = None
    pipe = None
    workers = []

    try:
        os.environ["OMP_NUM_THREADS"] = "4"
        os.environ["CUDA_VISIBLE_DEVICES"] = "0"

        try:
            import torch
            print(f"PyTorch 可用: {torch.__version__}, CUDA可用: {torch.cuda.is_available()}")
            if torch.cuda.is_available():
                print(f"GPU: {torch.cuda.get_device_name(0)}")
                torch.cuda.empty_cache()
        except ImportError:
            print("未检测到PyTorch,将使用CPU模式")

        print("预加载YOLO模型...")
        ov_model = _load_model(m1, cls, confidence)
        ov_model.conf = confidence

        cap = _open_capture(video_path)

        # Fresh, larger queues for this run.
        frame_queue = queue.Queue(maxsize=80)
        processed_frame_queue = queue.Queue(maxsize=40)

        _check_rtmp_reachable(rtmp)

        size = (1280, 720)  # output resolution (width, height)
        sizeStr = f"{size[0]}x{size[1]}"

        print(f"正在启动FFmpeg推流到: {rtmp}")
        pipe = subprocess.Popen(_build_ffmpeg_command(rtmp, sizeStr),
                                shell=False,
                                stdin=subprocess.PIPE,
                                # Nobody reads stdout, so it must not be a pipe.
                                stdout=subprocess.DEVNULL,
                                stderr=subprocess.PIPE,
                                bufsize=10 * 1024 * 1024)

        Thread(target=_monitor_ffmpeg_output, args=(pipe,), daemon=True).start()

        try:
            import psutil
            p = psutil.Process()
            p.nice(psutil.HIGH_PRIORITY_CLASS if os.name == 'nt' else -10)
            print("已设置程序为高优先级")
        except Exception:
            print("无法设置进程优先级")

        workers = [
            Thread(target=read_frames, name="ReadThread", daemon=True,
                   args=(cap, frame_queue)),
            Thread(target=process_frames, name="ProcessThread", daemon=True,
                   args=(frame_queue, processed_frame_queue, ov_model, cls, confidence)),
            Thread(target=write_frames, name="WriteThread", daemon=True,
                   args=(processed_frame_queue, pipe, size)),
        ]

        print("开始处理视频流...")
        for worker in workers:
            worker.start()

        performance_check_interval = 30  # seconds between queue reports
        last_check_time = time.time()

        while getIfAI() and not stop_event.is_set():
            if not all(worker.is_alive() for worker in workers):
                print("检测到某个线程已停止运行,正在终止所有线程...")
                stop_event.set()
                break

            if pipe.poll() is not None:
                print(f"FFmpeg进程已退出,状态码: {pipe.returncode}")
                stop_event.set()
                break

            current_time = time.time()
            if current_time - last_check_time > performance_check_interval:
                print(f"性能统计 - 输入队列: {frame_queue.qsize()}/{frame_queue.maxsize}, " +
                      f"输出队列: {processed_frame_queue.qsize()}/{processed_frame_queue.maxsize}")
                last_check_time = current_time

            time.sleep(0.1)

    except Exception as e:
        print(f"发生错误: {e}")
    finally:
        try:
            print("正在清理资源...")
            stop_event.set()
            setIfAI(False)

            # Give the workers up to 3 seconds in total to exit.
            deadline = time.time() + 3
            for worker in workers:
                if worker.is_alive():
                    worker.join(timeout=max(0.0, deadline - time.time()))
            if any(worker.is_alive() for worker in workers):
                print("警告: 部分线程未能正常结束,强制终止")

            _drain_queue(frame_queue)
            _drain_queue(processed_frame_queue)

            if cap is not None:
                try:
                    cap.release()
                    print("视频捕获资源已释放")
                except Exception:
                    pass

            if pipe is not None:
                _shutdown_ffmpeg(pipe)

            try:
                cv2.destroyAllWindows()
            except Exception:
                # Headless OpenCV builds have no window support.
                pass

            print("所有资源已清理完毕")
        finally:
            # Always release anyone blocked in stopAIVideo().
            _pipeline_done.set()


if __name__ == '__main__':
    sn = "1581F6QAD243C00BP71E"
    # Alternative source: f"rtmp://112.44.103.230:1935/live/{sn}"
    video_path = "rtmp://112.44.103.230:1935/live/123456"
    # RTMP address FFmpeg publishes to.
    rtmp = f"rtmp://112.44.103.230:1935/live/{sn}ai"

    try:
        startAIVideo2(video_path, rtmp, "best.pt", [0, 1, 2, 3, 4], 0.4)
    except KeyboardInterrupt:
        print("程序被用户中断")
        stopAIVideo()
    except Exception as e:
        print(f"程序异常: {e}")