AI_python_yoooger/event/A_lot_of.py

356 lines
14 KiB
Python
Raw Normal View History

2025-07-09 15:34:23 +08:00
import threading
import os
from ultralytics import YOLO
import cv2
import threading
import os
from ultralytics import YOLO
import datetime
#@yoooger
#-----------------------------------------------------------------------------------------------------------------------------------------------
# 定义颜色映射
colors = {
"0": (0, 255, 0), # 绿色
"1": (0, 0, 255), # 蓝色
"2": (255, 0, 0), # 红色
"3": (255, 255, 0), # 黄色
"4": (0, 255, 255), # 青色
"5": (255, 0, 255), # 紫色
"6": (128, 0, 0), # 紫色
"7": (0, 128, 0), # 绿色
"8": (0, 0, 128), # 蓝色
"9": (128, 128, 0), # 黄色
"10": (128, 0, 128), # 紫色
"11": (0, 128, 128), # 青色
}
#-----------------------------------------------------------------------------------------------------------------------------------------------
def draw_and_save(frame, boxes, save_folder, frame_number, model_id):
"""绘制检测框并保存当前帧"""
for box in boxes:
class_id, (x1, y1, x2, y2) = box
color = colors[class_id] # 根据类别选择颜色
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
# 保存当前帧到指定文件夹
cv2.imwrite(os.path.join(save_folder, f"model_{model_id}_frame_{frame_number}.jpg"), frame)
#-----------------------------------------------------------------------------------------------------------------------------------------------
def process_images(image_folder_path, model, save_folder, model_id, classes_str, event):
"""处理文件夹中的图片,检测物体并触发自定义事件"""
os.makedirs(save_folder, exist_ok=True) # 创建保存文件夹(如果不存在)
classes = list(map(int, classes_str.split(','))) # 将逗号分隔的字符串转换为整数列表
# 回调函数用于绘制并保存帧
def callback(frame, boxes, save_folder, frame_number):
draw_and_save(frame, boxes, save_folder, frame_number, model_id)
event.subscribe(callback) # 订阅绘制和保存作业
# 遍历文件夹中的每个图片文件
for frame_number, image_name in enumerate(os.listdir(image_folder_path)):
image_path = os.path.join(image_folder_path, image_name)
frame = cv2.imread(image_path) # 读取图片
if frame is None:
continue # 跳过无法读取的图片
# 使用 YOLO 进行推理
results = model(frame) # 输入当前帧
boxes = [] # 定义一个空列表,用于存放检测框
# 处理 YOLO 的输出
for result in results:
for box in result.boxes:
class_id = int(box.cls) # 获取类别ID
if class_id in classes: # 检查类别是否在指定类别数组中
x1, y1, x2, y2 = map(int, box.xyxy[0]) # 获取框坐标
boxes.append((class_id, (x1, y1, x2, y2)))
#-------------------------------------------------------------------------------------------------------------------------------------
def process_video(video_path, model, save_folder, model_id, classes_str, event):
"""处理视频,检测物体并触发自定义事件"""
os.makedirs(save_folder, exist_ok=True) # 创建保存文件夹(如果不存在)
classes = list(map(int, classes_str.split(',')))# 将逗号分隔的字符串转换为整数列表
cap = cv2.VideoCapture(video_path)# 打开视频文件使用cv2.VideoCapture()函数,截取视频的帧
skip_frames = 0 # 初始化跳帧数
# 修改callback函数以接受3个参数
def callback(frame, boxes, save_folder):
draw_and_save(frame, boxes, save_folder, int(cap.get(cv2.CAP_PROP_POS_FRAMES)), model_id)
event.subscribe(callback) # 订阅绘制和保存作业
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
if skip_frames > 0: # 跳过指定帧数设置skip_frames为0可取消跳帧
skip_frames -= 1 # 倒数计时器
continue
# 使用 YOLO 进行推理
results = model(frame) # 输入当前帧
boxes = [] # 定义一个空列表,用于存放检测框
# 处理 YOLO 的输出
for result in results:
for box in result.boxes:
class_id = int(box.cls) # 获取类别ID
if class_id in classes: # 检查类别是否在指定类别数组中
x1, y1, x2, y2 = map(int, box.xyxy[0]) # 获取框坐标
boxes.append((class_id, (x1, y1, x2, y2)))
#如果检测到物体则跳过30帧
if boxes:
event.trigger(frame, boxes, save_folder)
skip_frames = 100 # 检测到物体后跳过30帧
cap.release()
def process_video_threshold(video_path, model, save_folder, model_id, classes_str, event, threshold):
"""
处理视频并在指定区域绘制检测框并在指定阈值触发事件
参数说明
video_path: 视频路径应当改为视频流??)
model: YOLO 模型
save_folder: 保存帧的文件夹
model_id: 模型ID
classes_str: 要检测的类别字符串形式
event: 自定义事件(CustomEvent0CustomEvent1 )
threshold: 触发事件的阈值
"""
os.makedirs(save_folder, exist_ok=True) # 创建保存文件夹(如果不存在)
classes = list(map(int, classes_str.split(','))) # 将逗号分隔的字符串转换为整数列表
cap = cv2.VideoCapture(video_path) # 打开视频文件
skip_frames = 0 # 初始化跳帧数
# 订阅绘制和保存作业
event.subscribe(lambda frame, boxes, save_folder: draw_and_save(frame, boxes, save_folder, int(cap.get(cv2.CAP_PROP_POS_FRAMES)), model_id))
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
if skip_frames > 0: # 跳过指定帧数
skip_frames -= 1
continue
# 使用 YOLO 进行推理
results = model(frame,
conf=0.4, # 置信度阈值
iou=0.5, # 交并比阈值
) # 输入当前帧
boxes = [] # 定义一个空列表,用于存放检测框
# 处理 YOLO 的输出
for result in results:
for box in result.boxes:
class_id = int(box.cls) # 获取类别ID
if class_id in classes: # 检查类别是否在指定类别数组中
x1, y1, x2, y2 = map(int, box.xyxy[0]) # 获取框坐标
boxes.append((class_id, (x1, y1, x2, y2)))
# 计算区域内目标数量
object_count = len(boxes)
# 如果目标数量超过阈值,则触发事件
if object_count > threshold:
event.trigger(frame, boxes, save_folder)
skip_frames = 100 # 跳过100帧
cap.release()
#------------------------------------------------------------------------------------------------------------------------------------------------------
# 自定义事件类(用于订阅和触发作业,检测到物体时触发作业)
class CustomEvent0:
def __init__(self):
self._listeners = []
def subscribe(self, listener):
"""订阅一个作业,当事件触发时会执行这个作业"""
self._listeners.append(listener)
def trigger(self, frame, boxes, save_folder):
"""触发事件,执行所有订阅的作业"""
for listener in self._listeners:
listener(frame, boxes, save_folder)
#自定义事件类(用于订阅和触发作业,检测到物体大于一定数量时触发作业)
class CustomEvent1:
def __init__(self, threshold=5):
"""初始化事件类,并设置触发阈值"""
self._listeners = []
self.threshold = threshold # 设置触发作业的物体数量阈值
def subscribe(self, listener):
"""订阅一个作业,当事件触发时会执行这个作业"""
self._listeners.append(listener)
def trigger(self, frame, boxes, save_folder):
"""当检测物体数量超过阈值时,触发事件"""
if len(boxes) >= self.threshold: # 检查是否超过阈值
for listener in self._listeners:
listener(frame, boxes, save_folder)
def in_area_trigger(self, frame, boxes, save_folder, class_id=None, x_min=100, y_min=100, x_max=500, y_max=500):
"""检测屏幕范围内特定类别的物体数量是否超过阈值"""
count_in_area = 0
for box in boxes:
obj_class_id, (x1, y1, x2, y2) = box
# 检查类别(如果指定)和坐标范围
if (class_id is None or obj_class_id == class_id) and x1 >= x_min and y1 >= y_min and x2 <= x_max and y2 <= y_max:
count_in_area += 1
# 如果数量超过阈值,触发事件
if count_in_area >= self.threshold:
for listener in self._listeners:
listener(frame, boxes, save_folder)
#自定义事件类(用于订阅和触发作业,检测到物体小于一定数量时触发作业)
class CustomEvent2:
def __init__(self, threshold=5):
"""初始化事件类,并设置触发阈值"""
self._listeners = []
self.threshold = threshold # 设置触发作业的物体数量阈值
def subscribe(self, listener):
"""订阅一个作业,当事件触发时会执行这个作业"""
self._listeners.append(listener)
def trigger(self, frame, boxes, save_folder):
"""当检测物体数量超过阈值时,触发事件"""
if len(boxes) >= self.threshold: # 检查是否超过阈值
for listener in self._listeners:
listener(frame, boxes, save_folder)
#-----------------------------------------------------------------------------------------------------------------------------------------------
# 获取当前日期和时间精确到分钟作为run_name
current_date = datetime.datetime.now().strftime("%Y%m%d_%H%M")
# 定义处理任务的函数
def process_task(video_path, model_path, save_folder, classes_str, event_type, threshold=None, model_id=None):
"""
统一处理任务的函数
"""
model = YOLO(model_path) # 加载 YOLO 模型
os.makedirs(save_folder, exist_ok=True) # 创建保存文件夹(如果不存在)
if threshold is not None:
# 使用阈值处理视频
threading.Thread(
target=process_video_threshold,
args=(video_path, model, save_folder, model_id, classes_str, event_type, threshold)
).start()
else:
# 不使用阈值处理视频
threading.Thread(
target=process_video,
args=(video_path, model, save_folder, model_id, classes_str, event_type)
).start()
def Start_program(video_path, Start_String):
"""
启动程序根据 Start_String 启动特定的模型
:param video_path: 视频路径
:param Start_String: 启动模型的字符串
"""
task_configurations = {
'''
"模型名称": {
"model_path": "模型路径",
"save_folder": "保存文件夹路径",
"classes": "检测物体的类别",
"event": 自定义事件类实例, Customevent0:if have aim /1: aim number more than one number /2: aim number less than one number
"threshold": 触发阈值可选
"model_id": 模型ID可选
}
'''
# 出现了人
"have_peoples": {
"model_path": "yolo11n.pt",
"save_folder": f"output/{current_date}_People_in_the_area",
"classes": "0",
"event": CustomEvent0(),
"threshold": None,
"model_id": 1,
},
#发现了人员聚集
"many_peoples": {
"model_path": "yolo11n.pt",
"save_folder": f"output/{current_date}_Many_people",
"classes": "0",
"event": CustomEvent1(threshold=5),
"threshold": 5,
"model_id": 2,
},
#发现了无安全帽人员
"no_helmet": {
"model_path": "gdaq.pt",
"save_folder": f"output/{current_date}_Have_no_helmet",
"classes": "1",
"event": CustomEvent0(),
"threshold": None,
"model_id": 3,
},
#无安全绳
"no_safety_line": {
"model_path": "gdaq.pt",
"save_folder": f"output/{current_date}_Have_no_safety_line",
"classes": "3",
"event": CustomEvent0(),
"threshold": None,
"model_id": 4,
},
"smoke": {
"model_path": "smoke.pt",
"save_folder": f"output/{current_date}_Have_smoke",
"classes": "1",
"event": CustomEvent0(),
"threshold": None,
"model_id": 5,
},
#发现了烟雾
"fire": {
"model_path": "smoke.pt",
"save_folder": f"output/{current_date}_Have_fire",
"classes": "0",
"event": CustomEvent0(),
"threshold": None,
"model_id": 6,
},
#护栏破损
"HULAN_POSUN": {
"model_path": "gdaq.pt",
"save_folder": f"output/{current_date}_HULAN_POSUN",
"classes": "6",
"event": CustomEvent0(),
"threshold": None,
"model_id": 7,
},
}
# 将 Start_String 拆分为列表
start_flags = Start_String.split(",")
for flag in start_flags:
if flag in task_configurations:
config = task_configurations[flag]
process_task(
video_path,
config["model_path"],
config["save_folder"],
config["classes"],
config["event"],
config["threshold"],
config["model_id"], #we difine model_id to identify the model
)
if __name__ == '__main__':
# 使用线程处理视频
video_path = r"D:\work\DJI_20241122093213_0043_V.mp4" # 替换为你的视频路径
String = "have_peoples,many_peoples,no_helmet,no_safety_line" # 设置要启动的模型
Start_program(video_path, String) # 启动程序