yoooooger

This commit is contained in:
yooooger 2025-07-24 09:45:43 +08:00
parent dc3fb77ded
commit aa702653d7

View File

@ -2,7 +2,7 @@ from sanic import Sanic, json
from sanic.response import json as json_response
from sanic.exceptions import Unauthorized, NotFound, SanicException
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from typing import List, Dict, Any, Callable
import uuid
import logging
import asyncio
@ -67,8 +67,10 @@ class TaskManager:
self.active_tasks: Dict[str, Dict[str, Any]] = {}
self.task_status: Dict[str, str] = {}
self.task_timestamps: Dict[str, datetime] = {}
def add_task(self, task_id: str, task_info: Dict[str, Any]) -> None:
self.stop_handles: Dict[str, Callable] = {} # 新增:每个任务的停止函数
self.task_callbacks: Dict[str, Callable] = {} # ✅ 添加这个属性,用于回调任务完成信息
def add_task(self, task_id: str, task_info: Dict[str, Any], stop_handle: Callable) -> None:
"""添加新任务"""
if len(self.active_tasks) >= Config.MAX_ACTIVE_TASKS:
raise ValueError("Maximum number of active tasks reached")
@ -76,6 +78,7 @@ class TaskManager:
self.active_tasks[task_id] = task_info
self.task_status[task_id] = "running"
self.task_timestamps[task_id] = datetime.now()
self.stop_handles[task_id] = stop_handle # 注册停止函数
logger.info(f"Task {task_id} started")
def remove_task(self, task_id: str) -> None:
@ -84,6 +87,7 @@ class TaskManager:
del self.active_tasks[task_id]
del self.task_status[task_id]
del self.task_timestamps[task_id]
self.stop_handles.pop(task_id, None) # 同时移除停止句柄
logger.info(f"Task {task_id} removed")
def get_task_info(self, task_id: str) -> Dict[str, Any]:
@ -96,7 +100,28 @@ class TaskManager:
"status": self.task_status[task_id],
"start_time": self.task_timestamps[task_id].isoformat()
}
def stop_task(self, task_id: str) -> bool:
"""只停止一个任务"""
if task_id not in self.active_tasks:
logger.warning(f"Task {task_id} 不存在")
return False
stop_handle = self.stop_handles.get(task_id)
if not stop_handle:
logger.warning(f"Task {task_id} 无法停止(未注册停止函数)")
return False
try:
stop_handle() # 执行停止函数(你需传入能关闭 FFmpeg 或线程的回调)
self.task_status[task_id] = "stopped"
logger.info(f"Task {task_id} 停止成功")
return True
except Exception as e:
logger.error(f"停止任务 {task_id} 出错:{e}")
self.task_status[task_id] = "error"
return False
def check_tasks_health(self) -> Dict[str, str]:
"""检查任务健康状态"""
unhealthy_tasks = {}
@ -106,13 +131,14 @@ class TaskManager:
unhealthy_tasks[task_id] = "stopped"
logger.warning(f"Task {task_id} appears to be stopped unexpectedly")
return unhealthy_tasks
def mark_all_tasks_as_stopped(self):
"""标记所有任务为已停止状态"""
for task_id in list(self.active_tasks.keys()):
self.task_status[task_id] = "stopped"
logger.warning("已将所有任务标记为停止状态")
app = Sanic("YoloStreamService")
CORS(app)
task_manager = TaskManager()
@ -176,65 +202,66 @@ def verify_userid(request) -> None:
logger.warning("userid not define or user have not power")
raise Unauthorized("Invalid userid or user have not power")
@app.post("/ai/detect")
async def detection(request):
# 检查服务健康状态
if not service_status["is_healthy"]:
logger.warning(f"服务处于不健康状态,上次错误: {service_status['last_error']}{service_status['error_time']}")
# 尝试恢复服务
service_status["is_healthy"] = True
# 先停止所有现有任务
for task_id in list(task_manager.active_tasks.keys()):
logger.info(f"停止现有任务 {task_id} 以启动新任务")
try:
success = await safe_stop_ai_video()
if success:
task_manager.remove_task(task_id)
else:
logger.warning(f"无法正常停止任务 {task_id},但仍将继续")
task_manager.mark_all_tasks_as_stopped()
except Exception as e:
logger.error(f"停止任务时出错: {e}")
# 继续执行,尝试启动新任务
# 解析并验证请求数据
stream_request = StreamRequest.from_dict(request.json)
task_id = str(uuid.uuid4())
# 修正视频源地址
new_source_url = stream_request.source_url.replace("222.212.85.86", "192.168.10.5")
new_push_url = stream_request.push_url.replace("222.212.85.86", "192.168.10.5")
# 启动YOLO检测
# 检查服务健康状态
if not service_status["is_healthy"]:
logger.warning(f"服务处于不健康状态,上次错误: {service_status['last_error']}{service_status['error_time']}")
service_status["is_healthy"] = True
# 停止所有现有任务
for task_id in list(task_manager.active_tasks.keys()):
logger.info(f"停止现有任务 {task_id} 以启动新任务")
try:
await asyncio.to_thread(
startAIVideo,
new_source_url,
new_push_url,
stream_request.model_path,
stream_request.detect_classes,
stream_request.confidence
)
stop_cb = task_manager.task_callbacks.get(task_id, safe_stop_ai_video)
success = await stop_cb()
task_manager.remove_task(task_id)
except Exception as e:
logger.error(f"启动AI视频处理失败: {e}")
return json_response({
"status": "error",
"message": f"Failed to start AI video processing: {str(e)}"
}, status=500)
# 记录任务信息
task_manager.add_task(task_id, {
logger.error(f"停止任务时出错: {e}")
task_manager.mark_all_tasks_as_stopped()
# 解析请求参数
stream_request = StreamRequest.from_dict(request.json)
task_id = str(uuid.uuid4())
# 替换地址
new_source_url = stream_request.source_url.replace("222.212.85.86", "192.168.10.5")
new_push_url = stream_request.push_url.replace("222.212.85.86", "192.168.10.5")
try:
await asyncio.to_thread(
startAIVideo,
new_source_url,
new_push_url,
stream_request.model_path,
stream_request.detect_classes,
stream_request.confidence
)
except Exception as e:
logger.error(f"启动AI视频处理失败: {e}")
return json_response({
"status": "error",
"message": f"Failed to start AI video processing: {str(e)}"
}, status=500)
# ✅ 注册 stop_callback如你使用的为通用函数
task_manager.add_task(
task_id,
{
"source_url": stream_request.source_url,
"push_url": stream_request.push_url,
"model_path": stream_request.model_path,
"detect_classes": stream_request.detect_classes,
"confidence": stream_request.confidence
})
return json_response({
"status": "success",
"task_id": task_id,
"message": "Detection started successfully"
})
},
stop_callback=safe_stop_ai_video # 或换成支持多任务的 callback
)
return json_response({
"status": "success",
"task_id": task_id,
"message": "Detection started successfully"
})
@app.post("/ai/stream/detect1")
async def start_detection1(request):
@ -275,23 +302,28 @@ async def stop_detection(request, task_id: str):
task_info = task_manager.get_task_info(task_id)
except NotFound:
return json_response({"status": "error", "message": "Task not found"}, status=404)
# 停止AI视频处理使用安全的停止方法
success = await safe_stop_ai_video()
# 即使停止失败,也要移除任务
# 调用 task_callbacks 中的停止函数(如果有)
stop_callback = task_manager.task_callbacks.get(task_id)
if stop_callback:
success = await stop_callback()
else:
logger.warning(f"Task {task_id} has no stop callback, using default safe_stop_ai_video")
success = await safe_stop_ai_video()
# 无论成功与否都要移除任务
task_manager.remove_task(task_id)
if not success:
logger.warning("虽然停止过程出现错误,但任务已被标记为结束")
logger.warning(f"停止任务 {task_id} 失败,但已移除任务记录")
return json_response({
"status": "warning",
"message": "Task removal completed with warnings"
})
return json_response({
"status": "success",
"message": "Detection stopped successfully"
"message": f"Detection for task {task_id} stopped successfully"
})
except NotFound as e:
return json_response({"status": "error", "message": str(e)}, status=404)
@ -305,6 +337,54 @@ async def stop_detection(request, task_id: str):
pass
return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
@app.post("/ai/stream/stopTask1")
async def stopTask1(request):
try:
verify_userid(request)
try:
verify_token(request)
jsondata = await request.json()
task_id = jsondata.get("task_id")
if not task_id:
return json_response({"status": "error", "message": "task_id is required"}, status=400)
try:
task_info = task_manager.get_task_info(task_id)
logger.info(f"Stopping task: {task_id} -> {task_info}")
# 调用对应任务的停止回调
stop_callback = task_manager.task_callbacks.get(task_id)
if stop_callback:
success = await stop_callback()
else:
logger.warning(f"No stop callback found for task {task_id}")
success = False
task_manager.remove_task(task_id)
if not success:
return json_response({
"status": "warning",
"message": "Task removal completed, but stop failed"
})
return json_response({
"status": "success",
"message": "Task stopped successfully"
})
except NotFound:
return json_response({"status": "error", "message": "Task not found"}, status=404)
except Exception as e:
logger.error(f"Error stopping task {task_id}: {str(e)}", exc_info=True)
try:
if task_id in task_manager.task_status:
task_manager.task_status[task_id] = "error_during_stop"
except:
pass
return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
except:
logger.error(f"Unexpected error: {str(e)}", exc_info=True)
return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
@app.get("/ai/stream/<task_id>")
async def get_task_status(request, task_id: str):
try: