355 lines
14 KiB
Python
355 lines
14 KiB
Python
import threading
|
||
import os
|
||
from ultralytics import YOLO
|
||
import cv2
|
||
import threading
|
||
import os
|
||
from ultralytics import YOLO
|
||
import datetime
|
||
#@yoooger
|
||
#-----------------------------------------------------------------------------------------------------------------------------------------------
|
||
# 定义颜色映射
|
||
colors = {
|
||
"0": (0, 255, 0), # 绿色
|
||
"1": (0, 0, 255), # 蓝色
|
||
"2": (255, 0, 0), # 红色
|
||
"3": (255, 255, 0), # 黄色
|
||
"4": (0, 255, 255), # 青色
|
||
"5": (255, 0, 255), # 紫色
|
||
"6": (128, 0, 0), # 紫色
|
||
"7": (0, 128, 0), # 绿色
|
||
"8": (0, 0, 128), # 蓝色
|
||
"9": (128, 128, 0), # 黄色
|
||
"10": (128, 0, 128), # 紫色
|
||
"11": (0, 128, 128), # 青色
|
||
}
|
||
#-----------------------------------------------------------------------------------------------------------------------------------------------
|
||
def draw_and_save(frame, boxes, save_folder, frame_number, model_id):
|
||
"""绘制检测框并保存当前帧"""
|
||
|
||
for box in boxes:
|
||
class_id, (x1, y1, x2, y2) = box
|
||
color = colors[class_id] # 根据类别选择颜色
|
||
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
|
||
|
||
# 保存当前帧到指定文件夹
|
||
cv2.imwrite(os.path.join(save_folder, f"model_{model_id}_frame_{frame_number}.jpg"), frame)
|
||
#-----------------------------------------------------------------------------------------------------------------------------------------------
|
||
def process_images(image_folder_path, model, save_folder, model_id, classes_str, event):
|
||
"""处理文件夹中的图片,检测物体并触发自定义事件"""
|
||
os.makedirs(save_folder, exist_ok=True) # 创建保存文件夹(如果不存在)
|
||
classes = list(map(int, classes_str.split(','))) # 将逗号分隔的字符串转换为整数列表
|
||
|
||
# 回调函数用于绘制并保存帧
|
||
def callback(frame, boxes, save_folder, frame_number):
|
||
draw_and_save(frame, boxes, save_folder, frame_number, model_id)
|
||
|
||
event.subscribe(callback) # 订阅绘制和保存作业
|
||
|
||
# 遍历文件夹中的每个图片文件
|
||
for image_name in enumerate(os.listdir(image_folder_path)):
|
||
|
||
image_path = os.path.join(image_folder_path, image_name)
|
||
frame = cv2.imread(image_path) # 读取图片
|
||
if frame is None:
|
||
continue # 跳过无法读取的图片
|
||
|
||
# 使用 YOLO 进行推理
|
||
results = model(frame) # 输入当前帧
|
||
boxes = [] # 定义一个空列表,用于存放检测框
|
||
|
||
# 处理 YOLO 的输出
|
||
for result in results:
|
||
for box in result.boxes:
|
||
class_id = int(box.cls) # 获取类别ID
|
||
if class_id in classes: # 检查类别是否在指定类别数组中
|
||
x1, y1, x2, y2 = map(int, box.xyxy[0]) # 获取框坐标
|
||
boxes.append((class_id, (x1, y1, x2, y2)))
|
||
|
||
#-------------------------------------------------------------------------------------------------------------------------------------
|
||
def process_video(video_path, model, save_folder, model_id, classes_str, event):
|
||
"""处理视频,检测物体并触发自定义事件"""
|
||
os.makedirs(save_folder, exist_ok=True) # 创建保存文件夹(如果不存在)
|
||
classes = list(map(int, classes_str.split(',')))# 将逗号分隔的字符串转换为整数列表
|
||
cap = cv2.VideoCapture(video_path)# 打开视频文件,使用cv2.VideoCapture()函数,截取视频的帧
|
||
skip_frames = 0 # 初始化跳帧数
|
||
|
||
# 修改callback函数以接受3个参数
|
||
def callback(frame, boxes, save_folder):
|
||
draw_and_save(frame, boxes, save_folder, int(cap.get(cv2.CAP_PROP_POS_FRAMES)), model_id)
|
||
|
||
event.subscribe(callback) # 订阅绘制和保存作业
|
||
|
||
while cap.isOpened():
|
||
ret, frame = cap.read()
|
||
if not ret:
|
||
break
|
||
|
||
if skip_frames > 0: # 跳过指定帧数,设置skip_frames为0可取消跳帧
|
||
skip_frames -= 1 # 倒数计时器
|
||
continue
|
||
|
||
# 使用 YOLO 进行推理
|
||
results = model(frame) # 输入当前帧
|
||
boxes = [] # 定义一个空列表,用于存放检测框
|
||
|
||
# 处理 YOLO 的输出
|
||
for result in results:
|
||
for box in result.boxes:
|
||
class_id = int(box.cls) # 获取类别ID
|
||
if class_id in classes: # 检查类别是否在指定类别数组中
|
||
x1, y1, x2, y2 = map(int, box.xyxy[0]) # 获取框坐标
|
||
boxes.append((class_id, (x1, y1, x2, y2)))
|
||
|
||
#如果检测到物体,则跳过30帧
|
||
if boxes:
|
||
event.trigger(frame, boxes, save_folder)
|
||
skip_frames = 100 # 检测到物体后跳过30帧
|
||
|
||
cap.release()
|
||
|
||
def process_video_threshold(video_path, model, save_folder, model_id, classes_str, event, threshold):
|
||
"""
|
||
处理视频,并在指定区域绘制检测框,并在指定阈值触发事件
|
||
参数说明:
|
||
video_path: 视频路径(应当改为视频流??)
|
||
model: YOLO 模型
|
||
save_folder: 保存帧的文件夹
|
||
model_id: 模型ID
|
||
classes_str: 要检测的类别(字符串形式)
|
||
event: 自定义事件(CustomEvent0、CustomEvent1 等)
|
||
threshold: 触发事件的阈值
|
||
"""
|
||
os.makedirs(save_folder, exist_ok=True) # 创建保存文件夹(如果不存在)
|
||
classes = list(map(int, classes_str.split(','))) # 将逗号分隔的字符串转换为整数列表
|
||
cap = cv2.VideoCapture(video_path) # 打开视频文件
|
||
skip_frames = 0 # 初始化跳帧数
|
||
|
||
# 订阅绘制和保存作业
|
||
event.subscribe(lambda frame, boxes, save_folder: draw_and_save(frame, boxes, save_folder, int(cap.get(cv2.CAP_PROP_POS_FRAMES)), model_id))
|
||
|
||
while cap.isOpened():
|
||
ret, frame = cap.read()
|
||
if not ret:
|
||
break
|
||
|
||
if skip_frames > 0: # 跳过指定帧数
|
||
skip_frames -= 1
|
||
continue
|
||
|
||
# 使用 YOLO 进行推理
|
||
results = model(frame,
|
||
conf=0.4, # 置信度阈值
|
||
iou=0.5, # 交并比阈值
|
||
) # 输入当前帧
|
||
boxes = [] # 定义一个空列表,用于存放检测框
|
||
|
||
# 处理 YOLO 的输出
|
||
for result in results:
|
||
for box in result.boxes:
|
||
class_id = int(box.cls) # 获取类别ID
|
||
if class_id in classes: # 检查类别是否在指定类别数组中
|
||
x1, y1, x2, y2 = map(int, box.xyxy[0]) # 获取框坐标
|
||
boxes.append((class_id, (x1, y1, x2, y2)))
|
||
|
||
# 计算区域内目标数量
|
||
object_count = len(boxes)
|
||
|
||
# 如果目标数量超过阈值,则触发事件
|
||
if object_count > threshold:
|
||
event.trigger(frame, boxes, save_folder)
|
||
skip_frames = 100 # 跳过100帧
|
||
|
||
cap.release()
|
||
#------------------------------------------------------------------------------------------------------------------------------------------------------
|
||
# 自定义事件类(用于订阅和触发作业,检测到物体时触发作业)
|
||
class CustomEvent0:
|
||
def __init__(self):
|
||
self._listeners = []
|
||
|
||
def subscribe(self, listener):
|
||
"""订阅一个作业,当事件触发时会执行这个作业"""
|
||
self._listeners.append(listener)
|
||
|
||
def trigger(self, frame, boxes, save_folder):
|
||
"""触发事件,执行所有订阅的作业"""
|
||
for listener in self._listeners:
|
||
listener(frame, boxes, save_folder)
|
||
|
||
#自定义事件类(用于订阅和触发作业,检测到物体大于一定数量时触发作业)
|
||
class CustomEvent1:
|
||
def __init__(self, threshold=5):
|
||
"""初始化事件类,并设置触发阈值"""
|
||
self._listeners = []
|
||
self.threshold = threshold # 设置触发作业的物体数量阈值
|
||
|
||
def subscribe(self, listener):
|
||
"""订阅一个作业,当事件触发时会执行这个作业"""
|
||
self._listeners.append(listener)
|
||
|
||
def trigger(self, frame, boxes, save_folder):
|
||
"""当检测物体数量超过阈值时,触发事件"""
|
||
if len(boxes) >= self.threshold: # 检查是否超过阈值
|
||
for listener in self._listeners:
|
||
listener(frame, boxes, save_folder)
|
||
|
||
def in_area_trigger(self, frame, boxes, save_folder, class_id=None, x_min=100, y_min=100, x_max=500, y_max=500):
|
||
"""检测屏幕范围内特定类别的物体数量是否超过阈值"""
|
||
count_in_area = 0
|
||
for box in boxes:
|
||
obj_class_id, (x1, y1, x2, y2) = box
|
||
# 检查类别(如果指定)和坐标范围
|
||
if (class_id is None or obj_class_id == class_id) and x1 >= x_min and y1 >= y_min and x2 <= x_max and y2 <= y_max:
|
||
count_in_area += 1
|
||
|
||
# 如果数量超过阈值,触发事件
|
||
if count_in_area >= self.threshold:
|
||
for listener in self._listeners:
|
||
listener(frame, boxes, save_folder)
|
||
|
||
#自定义事件类(用于订阅和触发作业,检测到物体小于一定数量时触发作业)
|
||
class CustomEvent2:
|
||
def __init__(self, threshold=5):
|
||
"""初始化事件类,并设置触发阈值"""
|
||
self._listeners = []
|
||
self.threshold = threshold # 设置触发作业的物体数量阈值
|
||
|
||
def subscribe(self, listener):
|
||
"""订阅一个作业,当事件触发时会执行这个作业"""
|
||
self._listeners.append(listener)
|
||
|
||
def trigger(self, frame, boxes, save_folder):
|
||
"""当检测物体数量超过阈值时,触发事件"""
|
||
if len(boxes) >= self.threshold: # 检查是否超过阈值
|
||
for listener in self._listeners:
|
||
listener(frame, boxes, save_folder)
|
||
|
||
#-----------------------------------------------------------------------------------------------------------------------------------------------
|
||
|
||
# 获取当前日期和时间精确到分钟作为run_name
|
||
current_date = datetime.datetime.now().strftime("%Y%m%d_%H%M")
|
||
# 定义处理任务的函数
|
||
def process_task(video_path, model_path, save_folder, classes_str, event_type, threshold=None, model_id=None):
|
||
"""
|
||
统一处理任务的函数
|
||
"""
|
||
model = YOLO(model_path) # 加载 YOLO 模型
|
||
os.makedirs(save_folder, exist_ok=True) # 创建保存文件夹(如果不存在)
|
||
|
||
if threshold is not None:
|
||
# 使用阈值处理视频
|
||
threading.Thread(
|
||
target=process_video_threshold,
|
||
args=(video_path, model, save_folder, model_id, classes_str, event_type, threshold)
|
||
).start()
|
||
else:
|
||
# 不使用阈值处理视频
|
||
threading.Thread(
|
||
target=process_video,
|
||
args=(video_path, model, save_folder, model_id, classes_str, event_type)
|
||
).start()
|
||
|
||
def Start_program(video_path, Start_String):
|
||
"""
|
||
启动程序,根据 Start_String 启动特定的模型
|
||
:param video_path: 视频路径
|
||
:param Start_String: 启动模型的字符串
|
||
"""
|
||
task_configurations = {
|
||
|
||
'''
|
||
"模型名称": {
|
||
"model_path": "模型路径",
|
||
"save_folder": "保存文件夹路径",
|
||
"classes": "检测物体的类别",
|
||
"event": 自定义事件类实例, Customevent0:if have aim /1: aim number more than one number /2: aim number less than one number
|
||
"threshold": 触发阈值(可选),
|
||
"model_id": 模型ID(可选)
|
||
}
|
||
'''
|
||
|
||
# 出现了人
|
||
"have_peoples": {
|
||
"model_path": "yolo11n.pt",
|
||
"save_folder": f"output/{current_date}_People_in_the_area",
|
||
"classes": "0",
|
||
"event": CustomEvent0(),
|
||
"threshold": None,
|
||
"model_id": 1,
|
||
},
|
||
#发现了人员聚集
|
||
"many_peoples": {
|
||
"model_path": "yolo11n.pt",
|
||
"save_folder": f"output/{current_date}_Many_people",
|
||
"classes": "0",
|
||
"event": CustomEvent1(threshold=5),
|
||
"threshold": 5,
|
||
"model_id": 2,
|
||
},
|
||
#发现了无安全帽人员
|
||
"no_helmet": {
|
||
"model_path": "gdaq.pt",
|
||
"save_folder": f"output/{current_date}_Have_no_helmet",
|
||
"classes": "1",
|
||
"event": CustomEvent0(),
|
||
"threshold": None,
|
||
"model_id": 3,
|
||
},
|
||
#无安全绳
|
||
"no_safety_line": {
|
||
"model_path": "gdaq.pt",
|
||
"save_folder": f"output/{current_date}_Have_no_safety_line",
|
||
"classes": "3",
|
||
"event": CustomEvent0(),
|
||
"threshold": None,
|
||
"model_id": 4,
|
||
},
|
||
"smoke": {
|
||
"model_path": "smoke.pt",
|
||
"save_folder": f"output/{current_date}_Have_smoke",
|
||
"classes": "1",
|
||
"event": CustomEvent0(),
|
||
"threshold": None,
|
||
"model_id": 5,
|
||
},
|
||
#发现了烟雾
|
||
"fire": {
|
||
"model_path": "smoke.pt",
|
||
"save_folder": f"output/{current_date}_Have_fire",
|
||
"classes": "0",
|
||
"event": CustomEvent0(),
|
||
"threshold": None,
|
||
"model_id": 6,
|
||
},
|
||
#护栏破损
|
||
"HULAN_POSUN": {
|
||
"model_path": "gdaq.pt",
|
||
"save_folder": f"output/{current_date}_HULAN_POSUN",
|
||
"classes": "6",
|
||
"event": CustomEvent0(),
|
||
"threshold": None,
|
||
"model_id": 7,
|
||
},
|
||
}
|
||
# 将 Start_String 拆分为列表
|
||
start_flags = Start_String.split(",")
|
||
|
||
for flag in start_flags:
|
||
if flag in task_configurations:
|
||
config = task_configurations[flag]
|
||
process_task(
|
||
video_path,
|
||
config["model_path"],
|
||
config["save_folder"],
|
||
config["classes"],
|
||
config["event"],
|
||
config["threshold"],
|
||
config["model_id"], #we difine model_id to identify the model
|
||
)
|
||
|
||
if __name__ == '__main__':
|
||
# 使用线程处理视频
|
||
video_path = r"D:\work\DJI_20241122093213_0043_V.mp4" # 替换为你的视频路径
|
||
String = "have_peoples,many_peoples,no_helmet,no_safety_line" # 设置要启动的模型
|
||
Start_program(video_path, String) # 启动程序
|