import threading import os from ultralytics import YOLO import cv2 import threading import os from ultralytics import YOLO import datetime #@yoooger #----------------------------------------------------------------------------------------------------------------------------------------------- # 定义颜色映射 colors = { "0": (0, 255, 0), # 绿色 "1": (0, 0, 255), # 蓝色 "2": (255, 0, 0), # 红色 "3": (255, 255, 0), # 黄色 "4": (0, 255, 255), # 青色 "5": (255, 0, 255), # 紫色 "6": (128, 0, 0), # 紫色 "7": (0, 128, 0), # 绿色 "8": (0, 0, 128), # 蓝色 "9": (128, 128, 0), # 黄色 "10": (128, 0, 128), # 紫色 "11": (0, 128, 128), # 青色 } #----------------------------------------------------------------------------------------------------------------------------------------------- def draw_and_save(frame, boxes, save_folder, frame_number, model_id): """绘制检测框并保存当前帧""" for box in boxes: class_id, (x1, y1, x2, y2) = box color = colors[class_id] # 根据类别选择颜色 cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) # 保存当前帧到指定文件夹 cv2.imwrite(os.path.join(save_folder, f"model_{model_id}_frame_{frame_number}.jpg"), frame) #----------------------------------------------------------------------------------------------------------------------------------------------- def process_images(image_folder_path, model, save_folder, model_id, classes_str, event): """处理文件夹中的图片,检测物体并触发自定义事件""" os.makedirs(save_folder, exist_ok=True) # 创建保存文件夹(如果不存在) classes = list(map(int, classes_str.split(','))) # 将逗号分隔的字符串转换为整数列表 # 回调函数用于绘制并保存帧 def callback(frame, boxes, save_folder, frame_number): draw_and_save(frame, boxes, save_folder, frame_number, model_id) event.subscribe(callback) # 订阅绘制和保存作业 # 遍历文件夹中的每个图片文件 for frame_number, image_name in enumerate(os.listdir(image_folder_path)): image_path = os.path.join(image_folder_path, image_name) frame = cv2.imread(image_path) # 读取图片 if frame is None: continue # 跳过无法读取的图片 # 使用 YOLO 进行推理 results = model(frame) # 输入当前帧 boxes = [] # 定义一个空列表,用于存放检测框 # 处理 YOLO 的输出 for result in results: for box in result.boxes: class_id = int(box.cls) # 获取类别ID if class_id in classes: # 检查类别是否在指定类别数组中 x1, y1, x2, y2 = map(int, box.xyxy[0]) # 获取框坐标 boxes.append((class_id, (x1, y1, x2, y2))) #------------------------------------------------------------------------------------------------------------------------------------- def process_video(video_path, model, save_folder, model_id, classes_str, event): """处理视频,检测物体并触发自定义事件""" os.makedirs(save_folder, exist_ok=True) # 创建保存文件夹(如果不存在) classes = list(map(int, classes_str.split(',')))# 将逗号分隔的字符串转换为整数列表 cap = cv2.VideoCapture(video_path)# 打开视频文件,使用cv2.VideoCapture()函数,截取视频的帧 skip_frames = 0 # 初始化跳帧数 # 修改callback函数以接受3个参数 def callback(frame, boxes, save_folder): draw_and_save(frame, boxes, save_folder, int(cap.get(cv2.CAP_PROP_POS_FRAMES)), model_id) event.subscribe(callback) # 订阅绘制和保存作业 while cap.isOpened(): ret, frame = cap.read() if not ret: break if skip_frames > 0: # 跳过指定帧数,设置skip_frames为0可取消跳帧 skip_frames -= 1 # 倒数计时器 continue # 使用 YOLO 进行推理 results = model(frame) # 输入当前帧 boxes = [] # 定义一个空列表,用于存放检测框 # 处理 YOLO 的输出 for result in results: for box in result.boxes: class_id = int(box.cls) # 获取类别ID if class_id in classes: # 检查类别是否在指定类别数组中 x1, y1, x2, y2 = map(int, box.xyxy[0]) # 获取框坐标 boxes.append((class_id, (x1, y1, x2, y2))) #如果检测到物体,则跳过30帧 if boxes: event.trigger(frame, boxes, save_folder) skip_frames = 100 # 检测到物体后跳过30帧 cap.release() def process_video_threshold(video_path, model, save_folder, model_id, classes_str, event, threshold): """ 处理视频,并在指定区域绘制检测框,并在指定阈值触发事件 参数说明: video_path: 视频路径(应当改为视频流??) model: YOLO 模型 save_folder: 保存帧的文件夹 model_id: 模型ID classes_str: 要检测的类别(字符串形式) event: 自定义事件(CustomEvent0、CustomEvent1 等) threshold: 触发事件的阈值 """ os.makedirs(save_folder, exist_ok=True) # 创建保存文件夹(如果不存在) classes = list(map(int, classes_str.split(','))) # 将逗号分隔的字符串转换为整数列表 cap = cv2.VideoCapture(video_path) # 打开视频文件 skip_frames = 0 # 初始化跳帧数 # 订阅绘制和保存作业 event.subscribe(lambda frame, boxes, save_folder: draw_and_save(frame, boxes, save_folder, int(cap.get(cv2.CAP_PROP_POS_FRAMES)), model_id)) while cap.isOpened(): ret, frame = cap.read() if not ret: break if skip_frames > 0: # 跳过指定帧数 skip_frames -= 1 continue # 使用 YOLO 进行推理 results = model(frame, conf=0.4, # 置信度阈值 iou=0.5, # 交并比阈值 ) # 输入当前帧 boxes = [] # 定义一个空列表,用于存放检测框 # 处理 YOLO 的输出 for result in results: for box in result.boxes: class_id = int(box.cls) # 获取类别ID if class_id in classes: # 检查类别是否在指定类别数组中 x1, y1, x2, y2 = map(int, box.xyxy[0]) # 获取框坐标 boxes.append((class_id, (x1, y1, x2, y2))) # 计算区域内目标数量 object_count = len(boxes) # 如果目标数量超过阈值,则触发事件 if object_count > threshold: event.trigger(frame, boxes, save_folder) skip_frames = 100 # 跳过100帧 cap.release() #------------------------------------------------------------------------------------------------------------------------------------------------------ # 自定义事件类(用于订阅和触发作业,检测到物体时触发作业) class CustomEvent0: def __init__(self): self._listeners = [] def subscribe(self, listener): """订阅一个作业,当事件触发时会执行这个作业""" self._listeners.append(listener) def trigger(self, frame, boxes, save_folder): """触发事件,执行所有订阅的作业""" for listener in self._listeners: listener(frame, boxes, save_folder) #自定义事件类(用于订阅和触发作业,检测到物体大于一定数量时触发作业) class CustomEvent1: def __init__(self, threshold=5): """初始化事件类,并设置触发阈值""" self._listeners = [] self.threshold = threshold # 设置触发作业的物体数量阈值 def subscribe(self, listener): """订阅一个作业,当事件触发时会执行这个作业""" self._listeners.append(listener) def trigger(self, frame, boxes, save_folder): """当检测物体数量超过阈值时,触发事件""" if len(boxes) >= self.threshold: # 检查是否超过阈值 for listener in self._listeners: listener(frame, boxes, save_folder) def in_area_trigger(self, frame, boxes, save_folder, class_id=None, x_min=100, y_min=100, x_max=500, y_max=500): """检测屏幕范围内特定类别的物体数量是否超过阈值""" count_in_area = 0 for box in boxes: obj_class_id, (x1, y1, x2, y2) = box # 检查类别(如果指定)和坐标范围 if (class_id is None or obj_class_id == class_id) and x1 >= x_min and y1 >= y_min and x2 <= x_max and y2 <= y_max: count_in_area += 1 # 如果数量超过阈值,触发事件 if count_in_area >= self.threshold: for listener in self._listeners: listener(frame, boxes, save_folder) #自定义事件类(用于订阅和触发作业,检测到物体小于一定数量时触发作业) class CustomEvent2: def __init__(self, threshold=5): """初始化事件类,并设置触发阈值""" self._listeners = [] self.threshold = threshold # 设置触发作业的物体数量阈值 def subscribe(self, listener): """订阅一个作业,当事件触发时会执行这个作业""" self._listeners.append(listener) def trigger(self, frame, boxes, save_folder): """当检测物体数量超过阈值时,触发事件""" if len(boxes) >= self.threshold: # 检查是否超过阈值 for listener in self._listeners: listener(frame, boxes, save_folder) #----------------------------------------------------------------------------------------------------------------------------------------------- # 获取当前日期和时间精确到分钟作为run_name current_date = datetime.datetime.now().strftime("%Y%m%d_%H%M") # 定义处理任务的函数 def process_task(video_path, model_path, save_folder, classes_str, event_type, threshold=None, model_id=None): """ 统一处理任务的函数 """ model = YOLO(model_path) # 加载 YOLO 模型 os.makedirs(save_folder, exist_ok=True) # 创建保存文件夹(如果不存在) if threshold is not None: # 使用阈值处理视频 threading.Thread( target=process_video_threshold, args=(video_path, model, save_folder, model_id, classes_str, event_type, threshold) ).start() else: # 不使用阈值处理视频 threading.Thread( target=process_video, args=(video_path, model, save_folder, model_id, classes_str, event_type) ).start() def Start_program(video_path, Start_String): """ 启动程序,根据 Start_String 启动特定的模型 :param video_path: 视频路径 :param Start_String: 启动模型的字符串 """ task_configurations = { ''' "模型名称": { "model_path": "模型路径", "save_folder": "保存文件夹路径", "classes": "检测物体的类别", "event": 自定义事件类实例, Customevent0:if have aim /1: aim number more than one number /2: aim number less than one number "threshold": 触发阈值(可选), "model_id": 模型ID(可选) } ''' # 出现了人 "have_peoples": { "model_path": "yolo11n.pt", "save_folder": f"output/{current_date}_People_in_the_area", "classes": "0", "event": CustomEvent0(), "threshold": None, "model_id": 1, }, #发现了人员聚集 "many_peoples": { "model_path": "yolo11n.pt", "save_folder": f"output/{current_date}_Many_people", "classes": "0", "event": CustomEvent1(threshold=5), "threshold": 5, "model_id": 2, }, #发现了无安全帽人员 "no_helmet": { "model_path": "gdaq.pt", "save_folder": f"output/{current_date}_Have_no_helmet", "classes": "1", "event": CustomEvent0(), "threshold": None, "model_id": 3, }, #无安全绳 "no_safety_line": { "model_path": "gdaq.pt", "save_folder": f"output/{current_date}_Have_no_safety_line", "classes": "3", "event": CustomEvent0(), "threshold": None, "model_id": 4, }, "smoke": { "model_path": "smoke.pt", "save_folder": f"output/{current_date}_Have_smoke", "classes": "1", "event": CustomEvent0(), "threshold": None, "model_id": 5, }, #发现了烟雾 "fire": { "model_path": "smoke.pt", "save_folder": f"output/{current_date}_Have_fire", "classes": "0", "event": CustomEvent0(), "threshold": None, "model_id": 6, }, #护栏破损 "HULAN_POSUN": { "model_path": "gdaq.pt", "save_folder": f"output/{current_date}_HULAN_POSUN", "classes": "6", "event": CustomEvent0(), "threshold": None, "model_id": 7, }, } # 将 Start_String 拆分为列表 start_flags = Start_String.split(",") for flag in start_flags: if flag in task_configurations: config = task_configurations[flag] process_task( video_path, config["model_path"], config["save_folder"], config["classes"], config["event"], config["threshold"], config["model_id"], #we difine model_id to identify the model ) if __name__ == '__main__': # 使用线程处理视频 video_path = r"D:\work\DJI_20241122093213_0043_V.mp4" # 替换为你的视频路径 String = "have_peoples,many_peoples,no_helmet,no_safety_line" # 设置要启动的模型 Start_program(video_path, String) # 启动程序