From aa702653d7b83010c06db50dbe6140afebb7238f Mon Sep 17 00:00:00 2001 From: yooooger <761181201@qq.com> Date: Thu, 24 Jul 2025 09:45:43 +0800 Subject: [PATCH] yoooooger --- ai2/yolo_api.py | 210 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 145 insertions(+), 65 deletions(-) diff --git a/ai2/yolo_api.py b/ai2/yolo_api.py index 6963da2..93d5643 100644 --- a/ai2/yolo_api.py +++ b/ai2/yolo_api.py @@ -2,7 +2,7 @@ from sanic import Sanic, json from sanic.response import json as json_response from sanic.exceptions import Unauthorized, NotFound, SanicException from dataclasses import dataclass -from typing import List, Dict, Any, Optional +from typing import List, Dict, Any, Callable import uuid import logging import asyncio @@ -67,8 +67,10 @@ class TaskManager: self.active_tasks: Dict[str, Dict[str, Any]] = {} self.task_status: Dict[str, str] = {} self.task_timestamps: Dict[str, datetime] = {} - - def add_task(self, task_id: str, task_info: Dict[str, Any]) -> None: + self.stop_handles: Dict[str, Callable] = {} # 新增:每个任务的停止函数 + self.task_callbacks: Dict[str, Callable] = {} # ✅ 添加这个属性,用于回调任务完成信息 + + def add_task(self, task_id: str, task_info: Dict[str, Any], stop_handle: Callable) -> None: """添加新任务""" if len(self.active_tasks) >= Config.MAX_ACTIVE_TASKS: raise ValueError("Maximum number of active tasks reached") @@ -76,6 +78,7 @@ class TaskManager: self.active_tasks[task_id] = task_info self.task_status[task_id] = "running" self.task_timestamps[task_id] = datetime.now() + self.stop_handles[task_id] = stop_handle # 注册停止函数 logger.info(f"Task {task_id} started") def remove_task(self, task_id: str) -> None: @@ -84,6 +87,7 @@ class TaskManager: del self.active_tasks[task_id] del self.task_status[task_id] del self.task_timestamps[task_id] + self.stop_handles.pop(task_id, None) # 同时移除停止句柄 logger.info(f"Task {task_id} removed") def get_task_info(self, task_id: str) -> Dict[str, Any]: @@ -96,7 +100,28 @@ class TaskManager: "status": self.task_status[task_id], "start_time": self.task_timestamps[task_id].isoformat() } - + + def stop_task(self, task_id: str) -> bool: + """只停止一个任务""" + if task_id not in self.active_tasks: + logger.warning(f"Task {task_id} 不存在") + return False + + stop_handle = self.stop_handles.get(task_id) + if not stop_handle: + logger.warning(f"Task {task_id} 无法停止(未注册停止函数)") + return False + + try: + stop_handle() # 执行停止函数(你需传入能关闭 FFmpeg 或线程的回调) + self.task_status[task_id] = "stopped" + logger.info(f"Task {task_id} 停止成功") + return True + except Exception as e: + logger.error(f"停止任务 {task_id} 出错:{e}") + self.task_status[task_id] = "error" + return False + def check_tasks_health(self) -> Dict[str, str]: """检查任务健康状态""" unhealthy_tasks = {} @@ -106,13 +131,14 @@ class TaskManager: unhealthy_tasks[task_id] = "stopped" logger.warning(f"Task {task_id} appears to be stopped unexpectedly") return unhealthy_tasks - + def mark_all_tasks_as_stopped(self): """标记所有任务为已停止状态""" for task_id in list(self.active_tasks.keys()): self.task_status[task_id] = "stopped" logger.warning("已将所有任务标记为停止状态") + app = Sanic("YoloStreamService") CORS(app) task_manager = TaskManager() @@ -176,65 +202,66 @@ def verify_userid(request) -> None: logger.warning("userid not define or user have not power") raise Unauthorized("Invalid userid or user have not power") +@app.post("/ai/detect") async def detection(request): - # 检查服务健康状态 - if not service_status["is_healthy"]: - logger.warning(f"服务处于不健康状态,上次错误: {service_status['last_error']} 于 {service_status['error_time']}") - # 尝试恢复服务 - service_status["is_healthy"] = True - - # 先停止所有现有任务 - for task_id in list(task_manager.active_tasks.keys()): - logger.info(f"停止现有任务 {task_id} 以启动新任务") - try: - success = await safe_stop_ai_video() - if success: - task_manager.remove_task(task_id) - else: - logger.warning(f"无法正常停止任务 {task_id},但仍将继续") - task_manager.mark_all_tasks_as_stopped() - except Exception as e: - logger.error(f"停止任务时出错: {e}") - # 继续执行,尝试启动新任务 - - # 解析并验证请求数据 - stream_request = StreamRequest.from_dict(request.json) - - task_id = str(uuid.uuid4()) - # 修正视频源地址 - new_source_url = stream_request.source_url.replace("222.212.85.86", "192.168.10.5") - new_push_url = stream_request.push_url.replace("222.212.85.86", "192.168.10.5") - # 启动YOLO检测 + # 检查服务健康状态 + if not service_status["is_healthy"]: + logger.warning(f"服务处于不健康状态,上次错误: {service_status['last_error']} 于 {service_status['error_time']}") + service_status["is_healthy"] = True + + # 停止所有现有任务 + for task_id in list(task_manager.active_tasks.keys()): + logger.info(f"停止现有任务 {task_id} 以启动新任务") try: - await asyncio.to_thread( - startAIVideo, - new_source_url, - new_push_url, - stream_request.model_path, - stream_request.detect_classes, - stream_request.confidence - ) + stop_cb = task_manager.task_callbacks.get(task_id, safe_stop_ai_video) + success = await stop_cb() + task_manager.remove_task(task_id) except Exception as e: - logger.error(f"启动AI视频处理失败: {e}") - return json_response({ - "status": "error", - "message": f"Failed to start AI video processing: {str(e)}" - }, status=500) - - # 记录任务信息 - task_manager.add_task(task_id, { + logger.error(f"停止任务时出错: {e}") + task_manager.mark_all_tasks_as_stopped() + + # 解析请求参数 + stream_request = StreamRequest.from_dict(request.json) + task_id = str(uuid.uuid4()) + + # 替换地址 + new_source_url = stream_request.source_url.replace("222.212.85.86", "192.168.10.5") + new_push_url = stream_request.push_url.replace("222.212.85.86", "192.168.10.5") + + try: + await asyncio.to_thread( + startAIVideo, + new_source_url, + new_push_url, + stream_request.model_path, + stream_request.detect_classes, + stream_request.confidence + ) + except Exception as e: + logger.error(f"启动AI视频处理失败: {e}") + return json_response({ + "status": "error", + "message": f"Failed to start AI video processing: {str(e)}" + }, status=500) + + # ✅ 注册 stop_callback(如你使用的为通用函数) + task_manager.add_task( + task_id, + { "source_url": stream_request.source_url, "push_url": stream_request.push_url, "model_path": stream_request.model_path, "detect_classes": stream_request.detect_classes, "confidence": stream_request.confidence - }) - - return json_response({ - "status": "success", - "task_id": task_id, - "message": "Detection started successfully" - }) + }, + stop_callback=safe_stop_ai_video # 或换成支持多任务的 callback + ) + + return json_response({ + "status": "success", + "task_id": task_id, + "message": "Detection started successfully" + }) @app.post("/ai/stream/detect1") async def start_detection1(request): @@ -275,23 +302,28 @@ async def stop_detection(request, task_id: str): task_info = task_manager.get_task_info(task_id) except NotFound: return json_response({"status": "error", "message": "Task not found"}, status=404) - - # 停止AI视频处理,使用安全的停止方法 - success = await safe_stop_ai_video() - - # 即使停止失败,也要移除任务 + + # 调用 task_callbacks 中的停止函数(如果有) + stop_callback = task_manager.task_callbacks.get(task_id) + if stop_callback: + success = await stop_callback() + else: + logger.warning(f"Task {task_id} has no stop callback, using default safe_stop_ai_video") + success = await safe_stop_ai_video() + + # 无论成功与否都要移除任务 task_manager.remove_task(task_id) - + if not success: - logger.warning("虽然停止过程出现错误,但任务已被标记为结束") + logger.warning(f"停止任务 {task_id} 失败,但已移除任务记录") return json_response({ "status": "warning", "message": "Task removal completed with warnings" }) - + return json_response({ "status": "success", - "message": "Detection stopped successfully" + "message": f"Detection for task {task_id} stopped successfully" }) except NotFound as e: return json_response({"status": "error", "message": str(e)}, status=404) @@ -305,6 +337,54 @@ async def stop_detection(request, task_id: str): pass return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500) + +@app.post("/ai/stream/stopTask1") +async def stopTask1(request): + try: + verify_userid(request) + try: + verify_token(request) + jsondata = await request.json() + task_id = jsondata.get("task_id") + if not task_id: + return json_response({"status": "error", "message": "task_id is required"}, status=400) + + try: + task_info = task_manager.get_task_info(task_id) + logger.info(f"Stopping task: {task_id} -> {task_info}") + # 调用对应任务的停止回调 + stop_callback = task_manager.task_callbacks.get(task_id) + if stop_callback: + success = await stop_callback() + else: + logger.warning(f"No stop callback found for task {task_id}") + success = False + + task_manager.remove_task(task_id) + + if not success: + return json_response({ + "status": "warning", + "message": "Task removal completed, but stop failed" + }) + return json_response({ + "status": "success", + "message": "Task stopped successfully" + }) + except NotFound: + return json_response({"status": "error", "message": "Task not found"}, status=404) + except Exception as e: + logger.error(f"Error stopping task {task_id}: {str(e)}", exc_info=True) + try: + if task_id in task_manager.task_status: + task_manager.task_status[task_id] = "error_during_stop" + except: + pass + return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500) + except: + logger.error(f"Unexpected error: {str(e)}", exc_info=True) + return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500) + @app.get("/ai/stream/") async def get_task_status(request, task_id: str): try: