import cv2 import subprocess from threading import Thread, Lock, Event import time import queue import numpy as np from ultralytics import YOLO import datetime import os import uuid # ---------- 会话管理器 ---------- stream_sessions = {} def start_video_session(video_path, output_url, model_path, cls, confidence,cls2=[],push = False): """ 启动一个视频流处理会话,返回 session_id。 """ session_id = str(uuid.uuid4()) stop_event = Event() frame_q = queue.Queue(maxsize=5) processed_q = queue.Queue(maxsize=5) model_container = { 'model': YOLO(model_path), 'lock': Lock() } # 启动后台线程 thread = Thread( target=stream_worker, args= (video_path, output_url, cls, confidence, stop_event, frame_q, processed_q, model_container,cls2,push), daemon=True ) thread.start() stream_sessions[session_id] = { 'thread': thread, 'stop_event': stop_event, 'model_container': model_container } print(f"会话 {session_id} 已启动,视频源: {video_path}") return session_id def stop_video_session(session_id): """ 停止指定会话。 """ sess = stream_sessions.get(session_id) if not sess: print(f"会话 {session_id} 不存在。") return sess['stop_event'].set() sess['thread'].join() print(f"会话 {session_id} 已停止。") del stream_sessions[session_id] def switch_model_session(session_id, new_model_path): """ 动态切换指定会话的模型权重。 """ sess = stream_sessions.get(session_id) if not sess: print(f"会话 {session_id} 不存在,无法切换模型。") return with sess['model_container']['lock']: sess['model_container']['model'] = YOLO(new_model_path) print(f"会话 {session_id} 模型已切换为: {new_model_path}") def _read_frames(cap, frame_q, stop_event): """ 读取帧,保持30fps,丢弃延迟帧,若连续2秒无有效帧则触发停止 """ last = time.time() target = 1.0 / 30.0 no_frame_start = None # 开始无法读取帧的时间点 while not stop_event.is_set(): now = time.time() if now - last < target: time.sleep(target - (now - last)) continue if frame_q.qsize() < frame_q.maxsize: ret, frm = cap.read() if not ret: if no_frame_start is None: no_frame_start = time.time() elif time.time() - no_frame_start >= 2: print("⚠️ 超过2秒未获取到帧,自动停止会话。") stop_event.set() break continue else: no_frame_start = None # 成功读取帧,重置计时 frame_q.put((frm, time.time())) last = time.time() else: time.sleep(0.001) def _process_frames(frame_q, processed_q, cls, confidence, model_container, stop_event): """ 检测并绘制 """ skip_counter = 0 while not stop_event.is_set(): try: frm, ts = frame_q.get(timeout=1) if time.time() - ts > 0.5: continue skip_counter = (skip_counter + 1) % 2 if skip_counter != 0: continue with model_container['lock']: results = model_container['model'].track(frm, persist=True, classes=cls, conf=confidence) counts = {} for r in results: if hasattr(r, 'boxes'): for b in r.boxes: cid = int(b.cls[0]) counts[cid] = counts.get(cid, 0) + 1 ann = results[0].plot(conf=False, line_width=1, font_size=1.5) y_off = 30 for cid, cnt in counts.items(): txt = f"Class {cid}: {cnt}" cv2.putText(ann, txt, (10, y_off), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,0), 2) y_off += 25 processed_q.put((ann, ts)) except queue.Empty: continue except Exception as e: print(f"_process_frames 错误: {e}") def _write_frames(processed_q, pipe, size, stop_event): """ 推流写入 """ last = time.time() target = 1.0 / 30.0 while not stop_event.is_set(): try: frm, ts = processed_q.get(timeout=1) now = time.time() if now - last < target: time.sleep(target - (now - last)) img = cv2.resize(frm, size, interpolation=cv2.INTER_LINEAR) pipe.stdin.write(img.tobytes()) last = time.time() except queue.Empty: continue except Exception as e: print(f"_write_frames 错误: {e}") break def _cls2_find(video_path, confidence, save_dir, stop_event): """ 指定类别检测并上传数据库,检测到每个跟踪目标只保存一次后续忽略。 """ model = YOLO("gdaq.pt") cls2 = [2, 4] cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) or 30 # 存储已经保存过的 track_id seen_tracks = set() while cap.isOpened() and not stop_event.is_set(): ret, frame = cap.read() if not ret: break # 使用 track 来获取每个目标的 track_id results = model.track(frame, persist=True, classes=cls2, conf=confidence, show=False) for res in results: for box in res.boxes: tid = int(box.id[0]) # 跟踪ID cls_id = int(box.cls[0]) if cls_id in cls2 and tid not in seen_tracks: seen_tracks.add(tid) fn = os.path.join( save_dir, f"track_{tid}_frame_{int(cap.get(cv2.CAP_PROP_POS_FRAMES))}.jpg" ) cv2.imwrite(fn, frame) print(f"保存: {fn}") # TODO: 在这里做上传和数据库写入 # 小睡一会,防止 CPU 占用过高 time.sleep(0.01) cap.release() # ---------- 核心处理逻辑 ---------- def stream_worker(video_path, output_url, cls, confidence, stop_event, frame_q, processed_q, model_container, cls2=[],push = False): """ 会话主函数:打开流、预热模型、启动子线程、等待停止。 """ # 创建保存目录(cls2 功能) date_str = datetime.datetime.now().strftime("%Y%m%d") save_dir = f"AIResults_{date_str}" os.makedirs(save_dir, exist_ok=True) # 打开视频流 cap = cv2.VideoCapture(video_path, cv2.CAP_FFMPEG) cap.set(cv2.CAP_PROP_BUFFERSIZE, 2) cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080) # 预加载并预热模型 print(f"[{video_path}] 预加载YOLO模型...") with model_container['lock']: dummy = np.zeros((1080, 1920, 3), dtype=np.uint8) model_container['model'].track(dummy, classes=cls, conf=confidence, show=False) # 构造 FFmpeg 推流命令 size = (1920, 1080) size_str = f"{size[0]}x{size[1]}" command = [ 'ffmpeg', '-y', '-f', 'rawvideo', '-vcodec', 'rawvideo', '-pix_fmt', 'bgr24', '-s', size_str, '-r', '30', '-probesize', '32', '-analyzeduration', '0', '-i', '-', '-c:v', 'h264', '-pix_fmt', 'yuv420p', '-preset', 'ultrafast', '-tune', 'zerolatency', '-f', 'flv', '-g', '30', '-bufsize', '1000k', '-maxrate', '2000k', '-x264opts', 'no-scenecut:keyint=30:min-keyint=30', '-flvflags', 'no_duration_filesize', output_url ] pipe = subprocess.Popen(command, stdin=subprocess.PIPE) # 启动子线程 threads = [] threads.append(Thread(target=_read_frames, args=(cap, frame_q, stop_event), daemon=True)) threads.append(Thread(target=_process_frames, args=(frame_q, processed_q, cls, confidence, model_container, stop_event), daemon=True)) if push: threads.append(Thread(target=_write_frames, args=(processed_q, pipe, size, stop_event), daemon=True)) if cls2: threads.append(Thread(target=_cls2_find, args=(video_path, confidence, save_dir, stop_event), daemon=True)) for t in threads: t.start() print(f"[{video_path}] 开始处理流...") # 等待停止信号 while not stop_event.is_set() and cap.isOpened(): time.sleep(0.1) # 清理 print(f"[{video_path}] 停止中,正在清理...") cap.release() pipe.stdin.close() pipe.wait() for q in (frame_q, processed_q): while not q.empty(): try: q.get_nowait() except queue.Empty: break print(f"[{video_path}] 会话结束。")