diff --git a/ai2/api_inf.md b/ai2/api_inf.md new file mode 100644 index 0000000..68c6eda --- /dev/null +++ b/ai2/api_inf.md @@ -0,0 +1,93 @@ +# AI 推理 API 接口说明 + +🔹 1. 启动检测任务 `/ai/stream/detect` + +* **方法**:`POST` +* **是否需要 Body**:✅ 是 + +#### 请求参数(JSON Body) + +```json +{ + "user_id": "user_12345", + "source_url": "rtsp://192.168.10.5/live/stream1", + "push_url": "rtmp://192.168.10.5/live/output1", + "model_path": "/models/yolov8n.pt", + "detect_classes": ["person", "car", "bicycle"], + "confidence": 0.6 +} +``` + +| 字段名 | 类型 | 是否必填 | 说明 | +| ---------------- | --------- | ---- | ----------------------- | +| `user_id` | string | ✅ 是 | 用户 ID,必须存在于数据库表 `users` | +| `source_url` | string | ✅ 是 | 输入视频流地址(RTSP、本地等) | +| `push_url` | string | ✅ 是 | 输出推流地址(RTMP) | +| `model_path` | string | ✅ 是 | 检测模型路径(`.pt` 文件) | +| `detect_classes` | string\[] | ✅ 是 | 检测目标类别列表 | +| `confidence` | float | ❌ 否 | 检测置信度阈值,默认值为 `0.5` | + +#### 成功响应 + +```json +{ + "success": true, + "message": "任务已启动", + "task_id": "dc391d1f-2de3-4973-9025-8975cb3e3425" +} +``` + +#### 错误响应 + +```json +{ + "success": false, + "message": "用户无权限,禁止启动任务" +} + +🔹 2. 停止指定任务 `/ai/stream/` + +* **方法**:`POST` +* **是否需要 Body**:❌ 否 +* **描述**:根据任务 ID 停止正在运行的检测任务。 + +--- + +🔹 3. 重启指定任务 `/ai/stream/restart/` + +* **方法**:`POST` +* **是否需要 Body**:❌ 否 +* **描述**:重启指定任务。 + +--- + +🔹 4. 查询任务状态 `/ai/stream/` + +* **方法**:`GET` +* **是否需要 Body**:❌ 否 +* **描述**:查看指定任务的运行状态。 + +--- + +🔹 5. 获取所有任务 `/ai/stream/tasks` + +* **方法**:`GET` +* **是否需要 Body**:❌ 否 +* **描述**:返回当前所有正在运行的任务信息。 + +--- + +🔹 6. 停止所有任务 `/ai/stream/stopTasks` + +* **方法**:`POST` +* **是否需要 Body**:❌ 否 +* **描述**:停止所有正在运行的检测任务。 + +--- + +🔹 7. 重置服务 `/ai/reset` + +* **方法**:`POST` +* **是否需要 Body**:❌ 否 +* **描述**:清除全部状态,重置模型、任务、资源状态。 + diff --git a/ai2/yolo_api_forout.py b/ai2/yolo_api_forout.py new file mode 100644 index 0000000..9dd7fd7 --- /dev/null +++ b/ai2/yolo_api_forout.py @@ -0,0 +1,509 @@ +from sanic import Sanic, json +from sanic.response import json as json_response +from sanic.exceptions import Unauthorized, NotFound, SanicException +from dataclasses import dataclass +from typing import List, Dict, Any, Optional +import uuid +import logging +import asyncio +import traceback +from datetime import datetime +from cv_video import startAIVideo,stopAIVideo,getIfAI +from sanic_cors import CORS + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +# 服务状态标志 +service_status = {"is_healthy": True, "last_error": None, "error_time": None} + +# 配置类 +class Config: + VALID_TOKEN = "5e8899fe-dc74-4280-8169-2f4d185f3afa" + MAX_ACTIVE_TASKS = 10 + DEFAULT_CONFIDENCE = 0.5 + RESTART_DELAY = 2 # 服务尝试自动恢复前的延迟(秒) + + +@dataclass +class StreamRequest: + source_url: str + push_url: str + model_path: str + detect_classes: List[str] + confidence: float = Config.DEFAULT_CONFIDENCE + + def validate(self) -> None: + """验证请求参数""" + if not self.source_url or not self.push_url: + raise ValueError("Source URL and Push URL are required") + + if not self.detect_classes: + raise ValueError("At least one detection class must be specified") + if not 0 < self.confidence < 1: + raise ValueError("Confidence must be between 0 and 1") + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'StreamRequest': + try: + instance = cls( + source_url=data['source_url'], + push_url=data['push_url'], + model_path=data['model_path'], + detect_classes=data['detect_classes'], + confidence=data.get('confidence', Config.DEFAULT_CONFIDENCE) + ) + instance.validate() + return instance + except KeyError as e: + raise ValueError(f"Missing required field: {str(e)}") + +class TaskManager: + def __init__(self): + self.active_tasks: Dict[str, Dict[str, Any]] = {} + self.task_status: Dict[str, str] = {} + self.task_timestamps: Dict[str, datetime] = {} + + def add_task(self, task_id: str, task_info: Dict[str, Any]) -> None: + """添加新任务""" + if len(self.active_tasks) >= Config.MAX_ACTIVE_TASKS: + raise ValueError("Maximum number of active tasks reached") + + self.active_tasks[task_id] = task_info + self.task_status[task_id] = "running" + self.task_timestamps[task_id] = datetime.now() + logger.info(f"Task {task_id} started") + + def remove_task(self, task_id: str) -> None: + """移除任务""" + if task_id in self.active_tasks: + del self.active_tasks[task_id] + del self.task_status[task_id] + del self.task_timestamps[task_id] + logger.info(f"Task {task_id} removed") + + def get_task_info(self, task_id: str) -> Dict[str, Any]: + """获取任务信息""" + if task_id not in self.active_tasks: + raise NotFound("Task not found") + + return { + "task_info": self.active_tasks[task_id], + "status": self.task_status[task_id], + "start_time": self.task_timestamps[task_id].isoformat() + } + + def check_tasks_health(self) -> Dict[str, str]: + """检查任务健康状态""" + unhealthy_tasks = {} + for task_id in list(self.active_tasks.keys()): + # 检查任务是否还在运行(通过getIfAI()函数) + if not getIfAI(): + unhealthy_tasks[task_id] = "stopped" + logger.warning(f"Task {task_id} appears to be stopped unexpectedly") + return unhealthy_tasks + + def mark_all_tasks_as_stopped(self): + """标记所有任务为已停止状态""" + for task_id in list(self.active_tasks.keys()): + self.task_status[task_id] = "stopped" + logger.warning("已将所有任务标记为停止状态") + +app = Sanic("YoloStreamService") +CORS(app) +task_manager = TaskManager() + +async def safe_stop_ai_video(): + """安全地停止AI视频处理,带有错误处理和恢复机制""" + try: + await asyncio.to_thread(stopAIVideo) + return True + except Exception as e: + error_msg = f"停止AI视频处理出错: {str(e)}\n{traceback.format_exc()}" + logger.error(error_msg) + + # 标记服务状态为不健康 + service_status["is_healthy"] = False + service_status["last_error"] = str(e) + service_status["error_time"] = datetime.now().isoformat() + + # 强制结束所有任务 + task_manager.mark_all_tasks_as_stopped() + + # 尝试通过其他方式杀死可能存在的进程 + try: + import os + import signal + import psutil + + current_process = psutil.Process(os.getpid()) + # 查找并终止ffmpeg子进程 + for child in current_process.children(recursive=True): + try: + child_name = child.name().lower() + if 'ffmpeg' in child_name: + logger.info(f"强制终止子进程: {child.pid} ({child_name})") + child.send_signal(signal.SIGTERM) + except Exception as child_e: + logger.error(f"终止子进程出错: {str(child_e)}") + except Exception as kill_e: + logger.error(f"尝试清理进程时出错: {str(kill_e)}") + + # 等待一段时间让系统恢复 + await asyncio.sleep(Config.RESTART_DELAY) + + # 重置服务状态 + service_status["is_healthy"] = True + return False + +def verify_token(request) -> None: + """验证请求token""" + token = request.headers.get('X-API-Token') + if not token or token != Config.VALID_TOKEN: + logger.warning("Invalid token attempt") + raise Unauthorized("Invalid token") + +@app.post("/ai/stream/detect") +async def start_detection(request): + try: + verify_token(request) + + # 检查服务健康状态 + if not service_status["is_healthy"]: + logger.warning(f"服务处于不健康状态,上次错误: {service_status['last_error']} 于 {service_status['error_time']}") + # 尝试恢复服务 + service_status["is_healthy"] = True + + # 先停止所有现有任务 + for task_id in list(task_manager.active_tasks.keys()): + logger.info(f"停止现有任务 {task_id} 以启动新任务") + try: + success = await safe_stop_ai_video() + if success: + task_manager.remove_task(task_id) + else: + logger.warning(f"无法正常停止任务 {task_id},但仍将继续") + task_manager.mark_all_tasks_as_stopped() + except Exception as e: + logger.error(f"停止任务时出错: {e}") + # 继续执行,尝试启动新任务 + + # 解析并验证请求数据 + stream_request = StreamRequest.from_dict(request.json) + + task_id = str(uuid.uuid4()) + # 修正视频源地址 + new_source_url = stream_request.source_url.replace("222.212.85.86", "192.168.10.5") + new_push_url = stream_request.push_url.replace("222.212.85.86", "192.168.10.5") + # 启动YOLO检测 + try: + await asyncio.to_thread( + startAIVideo, + new_source_url, + new_push_url, + stream_request.model_path, + stream_request.detect_classes, + stream_request.confidence + ) + except Exception as e: + logger.error(f"启动AI视频处理失败: {e}") + return json_response({ + "status": "error", + "message": f"Failed to start AI video processing: {str(e)}" + }, status=500) + + # 记录任务信息 + task_manager.add_task(task_id, { + "source_url": stream_request.source_url, + "push_url": stream_request.push_url, + "model_path": stream_request.model_path, + "detect_classes": stream_request.detect_classes, + "confidence": stream_request.confidence + }) + + return json_response({ + "status": "success", + "task_id": task_id, + "message": "Detection started successfully" + }) + + except ValueError as e: + logger.error(f"Validation error: {str(e)}") + return json_response({"status": "error", "message": str(e)}, status=400) + except Exception as e: + logger.error(f"Unexpected error: {str(e)}", exc_info=True) + return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500) + +@app.post("/ai/stream/") +async def stop_detection(request, task_id: str): + try: + verify_token(request) + + # 检查任务是否存在 + try: + task_info = task_manager.get_task_info(task_id) + except NotFound: + return json_response({"status": "error", "message": "Task not found"}, status=404) + + # 停止AI视频处理,使用安全的停止方法 + success = await safe_stop_ai_video() + + # 即使停止失败,也要移除任务 + task_manager.remove_task(task_id) + + if not success: + logger.warning("虽然停止过程出现错误,但任务已被标记为结束") + return json_response({ + "status": "warning", + "message": "Task removal completed with warnings" + }) + + return json_response({ + "status": "success", + "message": "Detection stopped successfully" + }) + except NotFound as e: + return json_response({"status": "error", "message": str(e)}, status=404) + except Exception as e: + logger.error(f"Error stopping task {task_id}: {str(e)}", exc_info=True) + # 尝试标记任务为停止状态 + try: + if task_id in task_manager.task_status: + task_manager.task_status[task_id] = "error_during_stop" + except: + pass + return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500) + +@app.get("/ai/stream/") +async def get_task_status(request, task_id: str): + try: + verify_token(request) + task_info = task_manager.get_task_info(task_id) + + # 检查任务是否真的在运行 + if not getIfAI() and task_info["status"] == "running": + task_info["status"] = "stopped_unexpectedly" + logger.warning(f"Task {task_id} 显示为运行状态,但实际已停止") + + return json_response({ + "status": "success", + "task_id": task_id, + **task_info + }) + except NotFound as e: + return json_response({"status": "error", "message": str(e)}, status=404) + except Exception as e: + logger.error(f"Error getting task status {task_id}: {str(e)}", exc_info=True) + return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500) + +@app.get("/ai/stream/tasks") +async def list_tasks(request): + """获取所有活动任务列表""" + try: + verify_token(request) + + # 检查所有任务的健康状态 + unhealthy_tasks = task_manager.check_tasks_health() + for task_id, status in unhealthy_tasks.items(): + if task_id in task_manager.task_status: + task_manager.task_status[task_id] = status + + tasks = { + task_id: task_manager.get_task_info(task_id) + for task_id in task_manager.active_tasks.keys() + } + return json_response({ + "status": "success", + "tasks": tasks + }) + except Exception as e: + logger.error(f"Error listing tasks: {str(e)}", exc_info=True) + return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500) + +@app.post("/ai/stream/stopTasks") +async def stop_all_detections(request): + """停止所有活动任务""" + try: + verify_token(request) + + if not task_manager.active_tasks: + return json_response({ + "status": "success", + "message": "No active tasks to stop" + }) + + # 停止所有任务 + success = await safe_stop_ai_video() + + # 无论成功与否,都移除所有任务 + for task_id in list(task_manager.active_tasks.keys()): + task_manager.remove_task(task_id) + + if not success: + return json_response({ + "status": "warning", + "message": "Tasks stopped with warnings" + }) + + return json_response({ + "status": "success", + "message": "All detections stopped successfully" + }) + except Exception as e: + logger.error(f"Error stopping all tasks: {str(e)}", exc_info=True) + # 尝试标记所有任务为停止状态 + task_manager.mark_all_tasks_as_stopped() + return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500) + +@app.get("/ai/health") +async def health_check(request): + """服务健康检查端点""" + try: + # 不需要验证token,这个接口可以用于监控系统检查服务状态 + unhealthy_tasks = task_manager.check_tasks_health() + + return json_response({ + "status": "success", + "service": "running" if service_status["is_healthy"] else "degraded", + "active_tasks": len(task_manager.active_tasks), + "unhealthy_tasks": unhealthy_tasks, + "last_error": service_status["last_error"], + "error_time": service_status["error_time"], + "timestamp": datetime.now().isoformat() + }) + except Exception as e: + logger.error(f"健康检查失败: {str(e)}", exc_info=True) + return json_response({ + "status": "error", + "service": "degraded", + "error": str(e), + "timestamp": datetime.now().isoformat() + }, status=500) + +@app.route("/ai/reset", methods=["POST"]) +async def reset_service(request): + """重置服务状态,清理所有任务和进程""" + try: + verify_token(request) + + # 尝试停止AI视频处理 + await safe_stop_ai_video() + + # 清理所有任务 + for task_id in list(task_manager.active_tasks.keys()): + task_manager.remove_task(task_id) + + # 重置服务状态 + service_status["is_healthy"] = True + service_status["last_error"] = None + service_status["error_time"] = None + + # 尝试清理可能存在的僵尸进程 + try: + import os + import signal + import psutil + + current_process = psutil.Process(os.getpid()) + zombie_count = 0 + + for child in current_process.children(recursive=True): + try: + if child.status() == psutil.STATUS_ZOMBIE: + zombie_count += 1 + child.send_signal(signal.SIGKILL) + except: + pass + + return json_response({ + "status": "success", + "message": f"Service reset successful. Cleaned {zombie_count} zombie processes." + }) + except Exception as e: + logger.error(f"清理僵尸进程时出错: {e}") + return json_response({ + "status": "warning", + "message": "Service reset with warnings" + }) + + except Exception as e: + logger.error(f"重置服务时出错: {str(e)}", exc_info=True) + return json_response({ + "status": "error", + "message": f"Failed to reset service: {str(e)}" + }, status=500) + +@app.route("/ai/stream/restart/", methods=["POST"]) +async def restart_task(request, task_id: str): + """重启指定任务""" + try: + verify_token(request) + + # 获取任务信息 + try: + task_info = task_manager.get_task_info(task_id)["task_info"] + except NotFound: + return json_response({"status": "error", "message": "Task not found"}, status=404) + + # 先停止任务 + success = await safe_stop_ai_video() + task_manager.remove_task(task_id) + + if not success: + logger.warning("停止任务出现问题,尝试继续重启") + + # 重新启动任务 + new_task_id = str(uuid.uuid4()) + + try: + await asyncio.to_thread( + startAIVideo, + task_info["source_url"], + task_info["push_url"], + task_info["model_path"], + task_info["detect_classes"], + task_info["confidence"] + ) + + # 记录新任务信息 + task_manager.add_task(new_task_id, task_info) + + return json_response({ + "status": "success", + "old_task_id": task_id, + "new_task_id": new_task_id, + "message": "Task restarted successfully" + }) + except Exception as e: + logger.error(f"重启任务失败: {e}") + return json_response({ + "status": "error", + "message": f"Failed to restart task: {str(e)}" + }, status=500) + + except Exception as e: + logger.error(f"重启任务时出错: {str(e)}", exc_info=True) + return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500) + +if __name__ == "__main__": + # 保证服务启动前没有残留任务 + try: + stopAIVideo() + print("服务启动前清理完成") + except: + print("服务启动前清理失败,但仍将继续") + + # 安装psutil库,用于进程管理 + try: + import psutil + except ImportError: + import subprocess + import sys + print("正在安装psutil库...") + subprocess.check_call([sys.executable, "-m", "pip", "install", "psutil"]) + + app.run(host="0.0.0.0", port=12315, debug=False, access_log=True)