AI_python_yoooger/event/A_lot_of.py
2025-07-24 14:12:53 +08:00

355 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import threading
import os
from ultralytics import YOLO
import cv2
import threading
import os
from ultralytics import YOLO
import datetime
#@yoooger
#-----------------------------------------------------------------------------------------------------------------------------------------------
# 定义颜色映射
colors = {
"0": (0, 255, 0), # 绿色
"1": (0, 0, 255), # 蓝色
"2": (255, 0, 0), # 红色
"3": (255, 255, 0), # 黄色
"4": (0, 255, 255), # 青色
"5": (255, 0, 255), # 紫色
"6": (128, 0, 0), # 紫色
"7": (0, 128, 0), # 绿色
"8": (0, 0, 128), # 蓝色
"9": (128, 128, 0), # 黄色
"10": (128, 0, 128), # 紫色
"11": (0, 128, 128), # 青色
}
#-----------------------------------------------------------------------------------------------------------------------------------------------
def draw_and_save(frame, boxes, save_folder, frame_number, model_id):
"""绘制检测框并保存当前帧"""
for box in boxes:
class_id, (x1, y1, x2, y2) = box
color = colors[class_id] # 根据类别选择颜色
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
# 保存当前帧到指定文件夹
cv2.imwrite(os.path.join(save_folder, f"model_{model_id}_frame_{frame_number}.jpg"), frame)
#-----------------------------------------------------------------------------------------------------------------------------------------------
def process_images(image_folder_path, model, save_folder, model_id, classes_str, event):
"""处理文件夹中的图片,检测物体并触发自定义事件"""
os.makedirs(save_folder, exist_ok=True) # 创建保存文件夹(如果不存在)
classes = list(map(int, classes_str.split(','))) # 将逗号分隔的字符串转换为整数列表
# 回调函数用于绘制并保存帧
def callback(frame, boxes, save_folder, frame_number):
draw_and_save(frame, boxes, save_folder, frame_number, model_id)
event.subscribe(callback) # 订阅绘制和保存作业
# 遍历文件夹中的每个图片文件
for image_name in enumerate(os.listdir(image_folder_path)):
image_path = os.path.join(image_folder_path, image_name)
frame = cv2.imread(image_path) # 读取图片
if frame is None:
continue # 跳过无法读取的图片
# 使用 YOLO 进行推理
results = model(frame) # 输入当前帧
boxes = [] # 定义一个空列表,用于存放检测框
# 处理 YOLO 的输出
for result in results:
for box in result.boxes:
class_id = int(box.cls) # 获取类别ID
if class_id in classes: # 检查类别是否在指定类别数组中
x1, y1, x2, y2 = map(int, box.xyxy[0]) # 获取框坐标
boxes.append((class_id, (x1, y1, x2, y2)))
#-------------------------------------------------------------------------------------------------------------------------------------
def process_video(video_path, model, save_folder, model_id, classes_str, event):
"""处理视频,检测物体并触发自定义事件"""
os.makedirs(save_folder, exist_ok=True) # 创建保存文件夹(如果不存在)
classes = list(map(int, classes_str.split(',')))# 将逗号分隔的字符串转换为整数列表
cap = cv2.VideoCapture(video_path)# 打开视频文件使用cv2.VideoCapture()函数,截取视频的帧
skip_frames = 0 # 初始化跳帧数
# 修改callback函数以接受3个参数
def callback(frame, boxes, save_folder):
draw_and_save(frame, boxes, save_folder, int(cap.get(cv2.CAP_PROP_POS_FRAMES)), model_id)
event.subscribe(callback) # 订阅绘制和保存作业
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
if skip_frames > 0: # 跳过指定帧数设置skip_frames为0可取消跳帧
skip_frames -= 1 # 倒数计时器
continue
# 使用 YOLO 进行推理
results = model(frame) # 输入当前帧
boxes = [] # 定义一个空列表,用于存放检测框
# 处理 YOLO 的输出
for result in results:
for box in result.boxes:
class_id = int(box.cls) # 获取类别ID
if class_id in classes: # 检查类别是否在指定类别数组中
x1, y1, x2, y2 = map(int, box.xyxy[0]) # 获取框坐标
boxes.append((class_id, (x1, y1, x2, y2)))
#如果检测到物体则跳过30帧
if boxes:
event.trigger(frame, boxes, save_folder)
skip_frames = 100 # 检测到物体后跳过30帧
cap.release()
def process_video_threshold(video_path, model, save_folder, model_id, classes_str, event, threshold):
"""
处理视频,并在指定区域绘制检测框,并在指定阈值触发事件
参数说明:
video_path: 视频路径(应当改为视频流??)
model: YOLO 模型
save_folder: 保存帧的文件夹
model_id: 模型ID
classes_str: 要检测的类别(字符串形式)
event: 自定义事件(CustomEvent0、CustomEvent1 等)
threshold: 触发事件的阈值
"""
os.makedirs(save_folder, exist_ok=True) # 创建保存文件夹(如果不存在)
classes = list(map(int, classes_str.split(','))) # 将逗号分隔的字符串转换为整数列表
cap = cv2.VideoCapture(video_path) # 打开视频文件
skip_frames = 0 # 初始化跳帧数
# 订阅绘制和保存作业
event.subscribe(lambda frame, boxes, save_folder: draw_and_save(frame, boxes, save_folder, int(cap.get(cv2.CAP_PROP_POS_FRAMES)), model_id))
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
if skip_frames > 0: # 跳过指定帧数
skip_frames -= 1
continue
# 使用 YOLO 进行推理
results = model(frame,
conf=0.4, # 置信度阈值
iou=0.5, # 交并比阈值
) # 输入当前帧
boxes = [] # 定义一个空列表,用于存放检测框
# 处理 YOLO 的输出
for result in results:
for box in result.boxes:
class_id = int(box.cls) # 获取类别ID
if class_id in classes: # 检查类别是否在指定类别数组中
x1, y1, x2, y2 = map(int, box.xyxy[0]) # 获取框坐标
boxes.append((class_id, (x1, y1, x2, y2)))
# 计算区域内目标数量
object_count = len(boxes)
# 如果目标数量超过阈值,则触发事件
if object_count > threshold:
event.trigger(frame, boxes, save_folder)
skip_frames = 100 # 跳过100帧
cap.release()
#------------------------------------------------------------------------------------------------------------------------------------------------------
# 自定义事件类(用于订阅和触发作业,检测到物体时触发作业)
class CustomEvent0:
def __init__(self):
self._listeners = []
def subscribe(self, listener):
"""订阅一个作业,当事件触发时会执行这个作业"""
self._listeners.append(listener)
def trigger(self, frame, boxes, save_folder):
"""触发事件,执行所有订阅的作业"""
for listener in self._listeners:
listener(frame, boxes, save_folder)
#自定义事件类(用于订阅和触发作业,检测到物体大于一定数量时触发作业)
class CustomEvent1:
def __init__(self, threshold=5):
"""初始化事件类,并设置触发阈值"""
self._listeners = []
self.threshold = threshold # 设置触发作业的物体数量阈值
def subscribe(self, listener):
"""订阅一个作业,当事件触发时会执行这个作业"""
self._listeners.append(listener)
def trigger(self, frame, boxes, save_folder):
"""当检测物体数量超过阈值时,触发事件"""
if len(boxes) >= self.threshold: # 检查是否超过阈值
for listener in self._listeners:
listener(frame, boxes, save_folder)
def in_area_trigger(self, frame, boxes, save_folder, class_id=None, x_min=100, y_min=100, x_max=500, y_max=500):
"""检测屏幕范围内特定类别的物体数量是否超过阈值"""
count_in_area = 0
for box in boxes:
obj_class_id, (x1, y1, x2, y2) = box
# 检查类别(如果指定)和坐标范围
if (class_id is None or obj_class_id == class_id) and x1 >= x_min and y1 >= y_min and x2 <= x_max and y2 <= y_max:
count_in_area += 1
# 如果数量超过阈值,触发事件
if count_in_area >= self.threshold:
for listener in self._listeners:
listener(frame, boxes, save_folder)
#自定义事件类(用于订阅和触发作业,检测到物体小于一定数量时触发作业)
class CustomEvent2:
def __init__(self, threshold=5):
"""初始化事件类,并设置触发阈值"""
self._listeners = []
self.threshold = threshold # 设置触发作业的物体数量阈值
def subscribe(self, listener):
"""订阅一个作业,当事件触发时会执行这个作业"""
self._listeners.append(listener)
def trigger(self, frame, boxes, save_folder):
"""当检测物体数量超过阈值时,触发事件"""
if len(boxes) >= self.threshold: # 检查是否超过阈值
for listener in self._listeners:
listener(frame, boxes, save_folder)
#-----------------------------------------------------------------------------------------------------------------------------------------------
# 获取当前日期和时间精确到分钟作为run_name
current_date = datetime.datetime.now().strftime("%Y%m%d_%H%M")
# 定义处理任务的函数
def process_task(video_path, model_path, save_folder, classes_str, event_type, threshold=None, model_id=None):
"""
统一处理任务的函数
"""
model = YOLO(model_path) # 加载 YOLO 模型
os.makedirs(save_folder, exist_ok=True) # 创建保存文件夹(如果不存在)
if threshold is not None:
# 使用阈值处理视频
threading.Thread(
target=process_video_threshold,
args=(video_path, model, save_folder, model_id, classes_str, event_type, threshold)
).start()
else:
# 不使用阈值处理视频
threading.Thread(
target=process_video,
args=(video_path, model, save_folder, model_id, classes_str, event_type)
).start()
def Start_program(video_path, Start_String):
"""
启动程序,根据 Start_String 启动特定的模型
:param video_path: 视频路径
:param Start_String: 启动模型的字符串
"""
task_configurations = {
'''
"模型名称": {
"model_path": "模型路径",
"save_folder": "保存文件夹路径",
"classes": "检测物体的类别",
"event": 自定义事件类实例, Customevent0:if have aim /1: aim number more than one number /2: aim number less than one number
"threshold": 触发阈值(可选),
"model_id": 模型ID可选
}
'''
# 出现了人
"have_peoples": {
"model_path": "yolo11n.pt",
"save_folder": f"output/{current_date}_People_in_the_area",
"classes": "0",
"event": CustomEvent0(),
"threshold": None,
"model_id": 1,
},
#发现了人员聚集
"many_peoples": {
"model_path": "yolo11n.pt",
"save_folder": f"output/{current_date}_Many_people",
"classes": "0",
"event": CustomEvent1(threshold=5),
"threshold": 5,
"model_id": 2,
},
#发现了无安全帽人员
"no_helmet": {
"model_path": "gdaq.pt",
"save_folder": f"output/{current_date}_Have_no_helmet",
"classes": "1",
"event": CustomEvent0(),
"threshold": None,
"model_id": 3,
},
#无安全绳
"no_safety_line": {
"model_path": "gdaq.pt",
"save_folder": f"output/{current_date}_Have_no_safety_line",
"classes": "3",
"event": CustomEvent0(),
"threshold": None,
"model_id": 4,
},
"smoke": {
"model_path": "smoke.pt",
"save_folder": f"output/{current_date}_Have_smoke",
"classes": "1",
"event": CustomEvent0(),
"threshold": None,
"model_id": 5,
},
#发现了烟雾
"fire": {
"model_path": "smoke.pt",
"save_folder": f"output/{current_date}_Have_fire",
"classes": "0",
"event": CustomEvent0(),
"threshold": None,
"model_id": 6,
},
#护栏破损
"HULAN_POSUN": {
"model_path": "gdaq.pt",
"save_folder": f"output/{current_date}_HULAN_POSUN",
"classes": "6",
"event": CustomEvent0(),
"threshold": None,
"model_id": 7,
},
}
# 将 Start_String 拆分为列表
start_flags = Start_String.split(",")
for flag in start_flags:
if flag in task_configurations:
config = task_configurations[flag]
process_task(
video_path,
config["model_path"],
config["save_folder"],
config["classes"],
config["event"],
config["threshold"],
config["model_id"], #we difine model_id to identify the model
)
if __name__ == '__main__':
# 使用线程处理视频
video_path = r"D:\work\DJI_20241122093213_0043_V.mp4" # 替换为你的视频路径
String = "have_peoples,many_peoples,no_helmet,no_safety_line" # 设置要启动的模型
Start_program(video_path, String) # 启动程序