import os import cv2 import uuid import datetime from ultralytics import YOLO from miniohelp import upload_file from pgadmin_helper import insert_data import numpy as np # 定义颜色映射 def get_color(): COLOR_MAP = { 0: (0, 255, 0), # 绿色 1: (255, 0, 0), # 蓝色 2: (0, 0, 255), # 红色 3: (255, 255, 0), # 青色 4: (0, 255, 255), # 品红 5: (255, 0, 255), # 紫色 6: (128, 0, 0), # 深红 7: (0, 128, 0), # 青绿色 8: (0, 0, 128), # 青蓝色 9: (128, 128, 0), # 暖黄色 10: (128, 0, 128), # 深紫色 11: (0, 128, 128), # 青紫色 12: (128, 128, 128), # 灰色 } return COLOR_MAP # 定义YOLO模型路径 def upload_and_insert_to_db(frame,model,cls_index,save_path,minio_path,client,bucket_name): """ 上传图像到服务器并插入检测结果到数据库 :param frame: 当前处理的帧 :param model: YOLO模型 :param class_index: 检测到的目标类别 :param save_path: 本地保存路径 :param minio_path: Minio 路径 """ # 生成唯一 ID 并保存完整帧 unique_id = uuid.uuid4() filename = f"{unique_id}.jpg" local_path = os.path.join(save_path, filename) os.makedirs(save_path, exist_ok=True) # 确保目录存在 cv2.imwrite(local_path, frame) # 直接保存完整帧 date_str = datetime.datetime.now().strftime("%Y%m%d") # 上传文件到 Minio upload_file(client,local_path, bucket_name, minio_path) # 获取当前时间 time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 插入数据库 data = { "orgcode": "bdzl", "sn": "123456", "snname": "测试机", "spaceid": str(unique_id), "spacename": "目标检测", "guid": str(unique_id), "level": "一般", "title": model.names[cls_index], # 目标类别名称 "dangertype": "目标", "content": "目标检测", "gps1": "", "dealresult": "", "imgobjectname": f"{minio_path}/{date_str}/{filename}", # 修正路径 "state": 0, "createtm": time_str, "dealtm": time_str, "createuser": "", "dealuser": "", "checkuser": "", "remarks": "", } insert_data("danger", data) print(data) # ------------------------ 目标检测 ------------------------ def detect_objects(video_path): cap = cv2.VideoCapture(video_path) if not cap.isOpened(): print(f"❌ 错误: 无法打开视频文件 {video_path}") return frame_id = 0 frame_skip = 0 # 跳帧计数器 while cap.isOpened(): ret, frame = cap.read() if not ret: break frame_id += 1 if frame_skip > 0: frame_skip -= 1 # 继续跳帧 continue results = model.predict(frame) # 打印模型类别映射(仅调试用) if frame_id == 1: print("🔍 模型类别映射:", model.names) COLOR_MAP = get_color() detected = False # 记录是否有检测到目标 for result in results: for box in result.boxes: cls_index = int(box.cls[0]) confidence = float(box.conf[0]) if cls_index in TARGET_CLASS_INDICES and confidence > 0.5: detected = True # 记录本帧是否检测到目标 label = f"{model.names[cls_index]} {confidence:.2f}" color = COLOR_MAP.get(cls_index, (255, 255, 255)) # 画框和标签 x1, y1, x2, y2 = map(int, box.xyxy[0]) cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) # 生成唯一 ID 并保存完整帧 save_path = "temp" minio_path = "AIResults" # upload_and_insert_to_db(frame,model,cls_index,save_path,minio_path) if detected: frame_skip = max(frame_skip, 100) # 避免重复覆盖 # 显示帧(如果需要) cv2.imshow("frame", frame) if cv2.waitKey(1) & 0xFF == ord('q'): break cap.release() cv2.destroyAllWindows() # ------------------- 运行 ------------------- if __name__ == "__main__": video_path = r"D:\work\develop\yolo\car\yolo\76f9f655c47184238b385a6bb2a20a77.mp4" YOLO_MODEL_PATH = r"D:\work\develop\yolo\car\yolo\best.pt" TARGET_CLASS_INDICES = [0, 2] # 只检测类别索引 0 和 2 model = YOLO(YOLO_MODEL_PATH) detect_objects(video_path)