153 lines
4.7 KiB
Python
153 lines
4.7 KiB
Python
import os
|
|
import cv2
|
|
import uuid
|
|
import datetime
|
|
from ultralytics import YOLO
|
|
from miniohelp import upload_file
|
|
from pgadmin_helper import insert_data
|
|
import numpy as np
|
|
|
|
# 定义颜色映射
|
|
def get_color():
|
|
COLOR_MAP = {
|
|
0: (0, 255, 0), # 绿色
|
|
1: (255, 0, 0), # 蓝色
|
|
2: (0, 0, 255), # 红色
|
|
3: (255, 255, 0), # 青色
|
|
4: (0, 255, 255), # 品红
|
|
5: (255, 0, 255), # 紫色
|
|
6: (128, 0, 0), # 深红
|
|
7: (0, 128, 0), # 青绿色
|
|
8: (0, 0, 128), # 青蓝色
|
|
9: (128, 128, 0), # 暖黄色
|
|
10: (128, 0, 128), # 深紫色
|
|
11: (0, 128, 128), # 青紫色
|
|
12: (128, 128, 128), # 灰色
|
|
}
|
|
return COLOR_MAP
|
|
|
|
# 定义YOLO模型路径
|
|
def upload_and_insert_to_db(frame,model,cls_index,save_path,minio_path,client,bucket_name):
|
|
"""
|
|
上传图像到服务器并插入检测结果到数据库
|
|
:param frame: 当前处理的帧
|
|
:param model: YOLO模型
|
|
:param class_index: 检测到的目标类别
|
|
:param save_path: 本地保存路径
|
|
:param minio_path: Minio 路径
|
|
"""
|
|
# 生成唯一 ID 并保存完整帧
|
|
unique_id = uuid.uuid4()
|
|
filename = f"{unique_id}.jpg"
|
|
local_path = os.path.join(save_path, filename)
|
|
|
|
|
|
os.makedirs(save_path, exist_ok=True) # 确保目录存在
|
|
cv2.imwrite(local_path, frame) # 直接保存完整帧
|
|
date_str = datetime.datetime.now().strftime("%Y%m%d")
|
|
|
|
# 上传文件到 Minio
|
|
upload_file(client,local_path, bucket_name, minio_path)
|
|
|
|
# 获取当前时间
|
|
time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
# 插入数据库
|
|
data = {
|
|
"orgcode": "bdzl",
|
|
"sn": "123456",
|
|
"snname": "测试机",
|
|
"spaceid": str(unique_id),
|
|
"spacename": "目标检测",
|
|
"guid": str(unique_id),
|
|
"level": "一般",
|
|
"title": model.names[cls_index], # 目标类别名称
|
|
"dangertype": "目标",
|
|
"content": "目标检测",
|
|
"gps1": "",
|
|
"dealresult": "",
|
|
"imgobjectname": f"{minio_path}/{date_str}/{filename}", # 修正路径
|
|
"state": 0,
|
|
"createtm": time_str,
|
|
"dealtm": time_str,
|
|
"createuser": "",
|
|
"dealuser": "",
|
|
"checkuser": "",
|
|
"remarks": "",
|
|
}
|
|
|
|
insert_data("danger", data)
|
|
print(data)
|
|
|
|
|
|
# ------------------------ 目标检测 ------------------------
|
|
def detect_objects(video_path):
|
|
cap = cv2.VideoCapture(video_path)
|
|
if not cap.isOpened():
|
|
print(f"❌ 错误: 无法打开视频文件 {video_path}")
|
|
return
|
|
|
|
frame_id = 0
|
|
frame_skip = 0 # 跳帧计数器
|
|
|
|
while cap.isOpened():
|
|
ret, frame = cap.read()
|
|
if not ret:
|
|
break
|
|
|
|
frame_id += 1
|
|
|
|
if frame_skip > 0:
|
|
frame_skip -= 1 # 继续跳帧
|
|
continue
|
|
|
|
results = model.predict(frame)
|
|
|
|
# 打印模型类别映射(仅调试用)
|
|
if frame_id == 1:
|
|
print("🔍 模型类别映射:", model.names)
|
|
|
|
COLOR_MAP = get_color()
|
|
detected = False # 记录是否有检测到目标
|
|
|
|
for result in results:
|
|
for box in result.boxes:
|
|
cls_index = int(box.cls[0])
|
|
confidence = float(box.conf[0])
|
|
|
|
if cls_index in TARGET_CLASS_INDICES and confidence > 0.5:
|
|
detected = True # 记录本帧是否检测到目标
|
|
label = f"{model.names[cls_index]} {confidence:.2f}"
|
|
color = COLOR_MAP.get(cls_index, (255, 255, 255))
|
|
|
|
# 画框和标签
|
|
x1, y1, x2, y2 = map(int, box.xyxy[0])
|
|
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
|
|
cv2.putText(frame, label, (x1, y1 - 10),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
|
|
|
|
# 生成唯一 ID 并保存完整帧
|
|
save_path = "temp"
|
|
minio_path = "AIResults"
|
|
# upload_and_insert_to_db(frame,model,cls_index,save_path,minio_path)
|
|
|
|
if detected:
|
|
frame_skip = max(frame_skip, 100) # 避免重复覆盖
|
|
|
|
# 显示帧(如果需要)
|
|
cv2.imshow("frame", frame)
|
|
if cv2.waitKey(1) & 0xFF == ord('q'):
|
|
break
|
|
|
|
cap.release()
|
|
cv2.destroyAllWindows()
|
|
|
|
# ------------------- 运行 -------------------
|
|
if __name__ == "__main__":
|
|
video_path = r"D:\work\develop\yolo\car\yolo\76f9f655c47184238b385a6bb2a20a77.mp4"
|
|
YOLO_MODEL_PATH = r"D:\work\develop\yolo\car\yolo\best.pt"
|
|
TARGET_CLASS_INDICES = [0, 2] # 只检测类别索引 0 和 2
|
|
|
|
model = YOLO(YOLO_MODEL_PATH)
|
|
detect_objects(video_path)
|