AI_python_yoooger/ai2/yolo_api.py
2025-08-01 14:50:49 +08:00

617 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from sanic import Sanic, json
from sanic.response import json as json_response
from sanic.exceptions import Unauthorized, NotFound, SanicException
from dataclasses import dataclass
from typing import List, Dict, Any, Callable
import uuid
import logging
import asyncio
import traceback
from datetime import datetime
from cv_video import startAIVideo,stopAIVideo,getIfAI
from sanic_cors import CORS
from sqlhelp import get_user_power
# Logging setup: INFO level, with timestamp, logger name and level on each record.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# Service health flags, read and updated by the request handlers in this module.
service_status = dict(is_healthy=True, last_error=None, error_time=None)
# Configuration
class Config:
    """Static settings shared by the handlers in this module."""

    # Confidence threshold used when a request does not supply one.
    DEFAULT_CONFIDENCE: float = 0.5
    # Upper bound on the number of tasks the task registry accepts.
    MAX_ACTIVE_TASKS: int = 10
    # Delay (seconds) before the service attempts automatic recovery.
    RESTART_DELAY: int = 2
    # Value the ``X-API-Token`` request header is compared against.
    # NOTE(review): hard-coded credential; consider loading it from the
    # environment or a secrets store instead of source control.
    VALID_TOKEN: str = "5e8899fe-dc74-4280-8169-2f4d185f3afa"
@dataclass
class StreamRequest:
    """Parameters of a detection request, as parsed from the JSON body."""

    # Address of the input stream handed to ``startAIVideo``.
    source_url: str
    # Address the processed stream is pushed to.
    push_url: str
    # Model location handed to ``startAIVideo`` unchanged.
    model_path: str
    # Class names to detect; must not be empty.
    detect_classes: List[str]
    # Detection threshold, strictly between 0 and 1.
    confidence: float = Config.DEFAULT_CONFIDENCE

    def validate(self) -> None:
        """Validate the request parameters.

        Raises:
            ValueError: if a URL is missing, no detection class is given, or
                ``confidence`` is not a number strictly between 0 and 1.
        """
        if not self.source_url or not self.push_url:
            raise ValueError("Source URL and Push URL are required")
        if not self.detect_classes:
            raise ValueError("At least one detection class must be specified")
        # A non-numeric value (e.g. the string "0.5") would otherwise raise
        # TypeError in the range comparison and surface as a 500.
        if not isinstance(self.confidence, (int, float)):
            raise ValueError("Confidence must be between 0 and 1")
        if not 0 < self.confidence < 1:
            raise ValueError("Confidence must be between 0 and 1")

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'StreamRequest':
        """Build and validate a request from a decoded JSON object.

        Args:
            data: Mapping with ``source_url``, ``push_url``, ``model_path``,
                ``detect_classes`` and optionally ``confidence``.

        Returns:
            The validated ``StreamRequest``.

        Raises:
            ValueError: if ``data`` is not a mapping, a required key is
                missing, or validation fails.
        """
        # A missing or non-object body would otherwise fail with TypeError
        # on the first subscript.
        if not isinstance(data, dict):
            raise ValueError("Request body must be a JSON object")
        try:
            instance = cls(
                source_url=data['source_url'],
                push_url=data['push_url'],
                model_path=data['model_path'],
                detect_classes=data['detect_classes'],
                confidence=data.get('confidence', Config.DEFAULT_CONFIDENCE),
            )
        except KeyError as e:
            raise ValueError(f"Missing required field: {str(e)}") from e
        instance.validate()
        return instance
class TaskManager:
    """In-memory registry of detection tasks.

    Tracks, per task id: the request parameters, a status string, the start
    time, an optional synchronous stop handle (used by ``stop_task``) and an
    optional stop callback (looked up and awaited by the HTTP handlers).
    """

    def __init__(self):
        self.active_tasks: Dict[str, Dict[str, Any]] = {}
        self.task_status: Dict[str, str] = {}
        self.task_timestamps: Dict[str, datetime] = {}
        # Synchronous stop function per task, invoked by stop_task().
        self.stop_handles: Dict[str, Callable] = {}
        # Stop callback per task, looked up and awaited by the HTTP handlers.
        self.task_callbacks: Dict[str, Callable] = {}

    def add_task(
        self,
        task_id: str,
        task_info: Dict[str, Any],
        stop_handle: Callable = None,
        *,
        stop_callback: Callable = None,
    ) -> None:
        """Register a new running task.

        Args:
            task_id: Unique identifier of the task.
            task_info: Parameters the task was started with.
            stop_handle: Optional synchronous callable that stops the task;
                stored in ``stop_handles``.
            stop_callback: Optional callable stored in ``task_callbacks``.
                Accepted as a keyword because callers in this module pass it
                under that name.

        Raises:
            ValueError: if ``Config.MAX_ACTIVE_TASKS`` tasks are already
                registered.
        """
        if len(self.active_tasks) >= Config.MAX_ACTIVE_TASKS:
            raise ValueError("Maximum number of active tasks reached")
        self.active_tasks[task_id] = task_info
        self.task_status[task_id] = "running"
        self.task_timestamps[task_id] = datetime.now()
        if stop_handle is not None:
            self.stop_handles[task_id] = stop_handle
        if stop_callback is not None:
            self.task_callbacks[task_id] = stop_callback
        logger.info(f"Task {task_id} started")

    def remove_task(self, task_id: str) -> None:
        """Forget a task and everything registered for it; no-op if unknown."""
        if task_id in self.active_tasks:
            del self.active_tasks[task_id]
            self.task_status.pop(task_id, None)
            self.task_timestamps.pop(task_id, None)
            self.stop_handles.pop(task_id, None)
            # Drop the callback too so the mapping does not grow without bound.
            self.task_callbacks.pop(task_id, None)
            logger.info(f"Task {task_id} removed")

    def get_task_info(self, task_id: str) -> Dict[str, Any]:
        """Return parameters, status and ISO start time of a task.

        Raises:
            NotFound: if the task id is not registered.
        """
        if task_id not in self.active_tasks:
            raise NotFound("Task not found")
        return {
            "task_info": self.active_tasks[task_id],
            "status": self.task_status[task_id],
            "start_time": self.task_timestamps[task_id].isoformat()
        }

    def stop_task(self, task_id: str) -> bool:
        """Stop a single task through its registered synchronous stop handle.

        Returns:
            True if the handle ran without raising; False if the task or its
            handle is unknown, the handle raised, or the handle turned out to
            be a coroutine function (which this method cannot await).
        """
        if task_id not in self.active_tasks:
            logger.warning(f"Task {task_id} 不存在")
            return False
        stop_handle = self.stop_handles.get(task_id)
        if not stop_handle:
            logger.warning(f"Task {task_id} 无法停止(未注册停止函数)")
            return False
        try:
            result = stop_handle()
            if asyncio.iscoroutine(result):
                # Calling a coroutine function only creates the coroutine; it
                # never ran, so the task is NOT stopped. Close it to avoid a
                # "never awaited" warning and report the failure.
                result.close()
                raise TypeError("stop handle is a coroutine function and cannot be run synchronously")
            self.task_status[task_id] = "stopped"
            logger.info(f"Task {task_id} 停止成功")
            return True
        except Exception as e:
            logger.error(f"停止任务 {task_id} 出错:{e}")
            self.task_status[task_id] = "error"
            return False

    def check_tasks_health(self) -> Dict[str, str]:
        """Return ``{task_id: "stopped"}`` for tasks that appear to have died.

        ``getIfAI()`` takes no task argument, so it is queried once and the
        result applied to every registered task.
        """
        task_ids = list(self.active_tasks)
        if not task_ids or getIfAI():
            return {}
        unhealthy_tasks = {}
        for task_id in task_ids:
            unhealthy_tasks[task_id] = "stopped"
            logger.warning(f"Task {task_id} appears to be stopped unexpectedly")
        return unhealthy_tasks

    def mark_all_tasks_as_stopped(self):
        """Set the status of every registered task to ``"stopped"``."""
        for task_id in list(self.active_tasks):
            self.task_status[task_id] = "stopped"
        logger.warning("已将所有任务标记为停止状态")
# Sanic application on which the routes in this module are registered.
app = Sanic("YoloStreamServiceOut")
# Wrap the app with sanic_cors.
CORS(app)
# Module-level task registry shared by the request handlers.
task_manager = TaskManager()
async def safe_stop_ai_video():
    """Stop AI video processing, with error handling and a recovery path.

    ``stopAIVideo`` runs in a worker thread. If it raises, the failure is
    logged, the service is flagged unhealthy, every task is marked stopped,
    child processes whose name contains "ffmpeg" are sent SIGTERM, and after
    ``Config.RESTART_DELAY`` seconds the health flag is set back to True.

    Returns:
        True if ``stopAIVideo`` completed without raising, otherwise False.
    """
    try:
        await asyncio.to_thread(stopAIVideo)
        return True
    except Exception as e:
        logger.error(f"停止AI视频处理出错: {str(e)}\n{traceback.format_exc()}")
        # Flag the service as unhealthy and record what went wrong.
        service_status["is_healthy"] = False
        service_status["last_error"] = str(e)
        service_status["error_time"] = datetime.now().isoformat()

    # Only reached on failure: force every registered task into "stopped".
    task_manager.mark_all_tasks_as_stopped()

    # Best effort: terminate ffmpeg child processes that may still be running.
    try:
        import os
        import signal
        import psutil

        own_process = psutil.Process(os.getpid())
        for child in own_process.children(recursive=True):
            try:
                child_name = child.name().lower()
                if 'ffmpeg' not in child_name:
                    continue
                logger.info(f"强制终止子进程: {child.pid} ({child_name})")
                child.send_signal(signal.SIGTERM)
            except Exception as child_e:
                logger.error(f"终止子进程出错: {str(child_e)}")
    except Exception as kill_e:
        logger.error(f"尝试清理进程时出错: {str(kill_e)}")

    # Give the system a moment to settle, then clear the unhealthy flag.
    await asyncio.sleep(Config.RESTART_DELAY)
    service_status["is_healthy"] = True
    return False
def verify_token(request) -> None:
    """Reject the request unless it carries the expected API token.

    Args:
        request: Incoming request; the token is read from its
            ``X-API-Token`` header.

    Raises:
        Unauthorized: if the header is missing, empty, or does not match
            ``Config.VALID_TOKEN``.
    """
    # Local import so this function does not depend on a file-level import.
    import hmac

    token = request.headers.get('X-API-Token')
    # Constant-time comparison avoids leaking how many leading characters of
    # a guessed token are correct. Encoding to bytes keeps compare_digest
    # from raising on non-ASCII header values.
    if not token or not hmac.compare_digest(
        token.encode("utf-8"), Config.VALID_TOKEN.encode("utf-8")
    ):
        logger.warning("Invalid token attempt")
        raise Unauthorized("Invalid token")
def verify_userid(request) -> None:
    """Reject the request unless its user has the ``config`` permission.

    The user id is read straight from the JSON body. The previous version
    built a ``StreamRequest`` and read ``user_id`` from it, but that class has
    no such field, and it also forced endpoints that only send a ``task_id``
    to supply a full detection payload.

    Args:
        request: Incoming request with a JSON object body.

    Raises:
        Unauthorized: if no user id is supplied or ``get_user_power`` reports
            a level below 1 for the ``"config"`` permission.
    """
    payload = request.json
    # NOTE(review): the key name "user_id" mirrors the attribute the earlier
    # code tried to read; confirm it matches what clients actually send.
    userid = payload.get("user_id") if isinstance(payload, dict) else None
    if not userid or get_user_power(userid, "config") < 1:
        logger.warning("userid not define or user have not power")
        raise Unauthorized("Invalid userid or user have not power")
@app.post("/ai/detect")
async def detection(request):
    """Start a detection task, replacing any task that is currently running.

    Steps: validate the request body, stop and unregister all existing tasks,
    start ``startAIVideo`` in a worker thread with the rewritten stream
    addresses, then register the new task.

    NOTE(review): this route performs no token check of its own, unlike the
    ``/ai/stream/detect*`` wrappers that call it; confirm that exposing it
    unauthenticated is intended.

    Returns:
        JSON with ``status``/``task_id``/``message`` on success, or a 500 JSON
        error if ``startAIVideo`` raised.

    Raises:
        ValueError: if the request body is invalid, or the task registry is
            full when the new task is registered.
    """
    # Validate before touching running tasks, so a malformed request cannot
    # tear down a healthy stream.
    stream_request = StreamRequest.from_dict(request.json)

    # A previous failure only needs to be logged; starting over clears it.
    if not service_status["is_healthy"]:
        logger.warning(f"服务处于不健康状态,上次错误: {service_status['last_error']}{service_status['error_time']}")
        service_status["is_healthy"] = True

    # Stop every existing task before starting the new one.
    for running_id in list(task_manager.active_tasks.keys()):
        logger.info(f"停止现有任务 {running_id} 以启动新任务")
        try:
            stop_cb = task_manager.task_callbacks.get(running_id, safe_stop_ai_video)
            await stop_cb()
            task_manager.remove_task(running_id)
        except Exception as e:
            logger.error(f"停止任务时出错: {e}")
            task_manager.mark_all_tasks_as_stopped()

    task_id = str(uuid.uuid4())
    # Rewrite the address in both URLs before handing them to the video worker.
    new_source_url = stream_request.source_url.replace("222.212.85.86", "192.168.10.5")
    new_push_url = stream_request.push_url.replace("222.212.85.86", "192.168.10.5")
    try:
        await asyncio.to_thread(
            startAIVideo,
            new_source_url,
            new_push_url,
            stream_request.model_path,
            stream_request.detect_classes,
            stream_request.confidence
        )
    except Exception as e:
        logger.error(f"启动AI视频处理失败: {e}")
        return json_response({
            "status": "error",
            "message": f"Failed to start AI video processing: {str(e)}"
        }, status=500)

    # The stop function is passed positionally: add_task's third parameter is
    # named ``stop_handle``, so the former ``stop_callback=`` keyword raised
    # TypeError after the video had already been started.
    task_manager.add_task(
        task_id,
        {
            "source_url": stream_request.source_url,
            "push_url": stream_request.push_url,
            "model_path": stream_request.model_path,
            "detect_classes": stream_request.detect_classes,
            "confidence": stream_request.confidence
        },
        safe_stop_ai_video,
    )
    return json_response({
        "status": "success",
        "task_id": task_id,
        "message": "Detection started successfully"
    })
@app.post("/ai/stream/detect1")
async def start_detection1(request):
    """Start a detection task after checking both the API token and the user.

    Returns:
        The response produced by ``detection``, or a JSON error: the Sanic
        exception's own status (e.g. 401) for Sanic errors, 400 for
        validation errors, 500 for anything else.
    """
    try:
        # Cheap header check first, then the user permission lookup.
        verify_token(request)
        verify_userid(request)
        # ``detection`` is a coroutine function: it must be awaited and its
        # response returned, otherwise nothing runs and the handler yields None.
        return await detection(request)
    except SanicException as e:
        # Keep the status carried by Sanic exceptions (Unauthorized -> 401)
        # instead of reporting them as internal errors.
        return json_response(
            {"status": "error", "message": str(e)},
            status=getattr(e, "status_code", 500),
        )
    except ValueError as e:
        logger.error(f"Validation error: {str(e)}")
        return json_response({"status": "error", "message": str(e)}, status=400)
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}", exc_info=True)
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
@app.post("/ai/stream/detect")
async def start_detection(request):
    """Start a detection task after checking the API token.

    Returns:
        The response produced by ``detection``, or a JSON error: the Sanic
        exception's own status (e.g. 401) for Sanic errors, 400 for
        validation errors, 500 for anything else.
    """
    try:
        verify_token(request)
        # ``detection`` is a coroutine function: it must be awaited and its
        # response returned, otherwise nothing runs and the handler yields None.
        return await detection(request)
    except SanicException as e:
        # Keep the status carried by Sanic exceptions (Unauthorized -> 401)
        # instead of reporting them as internal errors.
        return json_response(
            {"status": "error", "message": str(e)},
            status=getattr(e, "status_code", 500),
        )
    except ValueError as e:
        logger.error(f"Validation error: {str(e)}")
        return json_response({"status": "error", "message": str(e)}, status=400)
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}", exc_info=True)
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
@app.post("/ai/stream/<task_id>")
async def stop_detection(request, task_id: str):
    """Stop the task named in the URL and remove it from the registry.

    Returns:
        JSON ``success`` if the stop callback reported success, ``warning`` if
        it did not (the task is removed either way), 404 if the task is
        unknown, the Sanic exception's own status (e.g. 401) for Sanic
        errors, or 500 on any other error.
    """
    try:
        verify_token(request)
        # Make sure the task exists before trying to stop it.
        if task_id not in task_manager.active_tasks:
            return json_response({"status": "error", "message": "Task not found"}, status=404)
        # Use the task's own stop callback when one is registered.
        stop_callback = task_manager.task_callbacks.get(task_id)
        if not stop_callback:
            logger.warning(f"Task {task_id} has no stop callback, using default safe_stop_ai_video")
            stop_callback = safe_stop_ai_video
        success = await stop_callback()
        # The task record is removed whether or not the stop succeeded.
        task_manager.remove_task(task_id)
        if not success:
            logger.warning(f"停止任务 {task_id} 失败,但已移除任务记录")
            return json_response({
                "status": "warning",
                "message": "Task removal completed with warnings"
            })
        return json_response({
            "status": "success",
            "message": f"Detection for task {task_id} stopped successfully"
        })
    except SanicException as e:
        # Covers NotFound (404) and Unauthorized (401); previously a bad
        # token was reported as a 500.
        return json_response(
            {"status": "error", "message": str(e)},
            status=getattr(e, "status_code", 500),
        )
    except Exception as e:
        logger.error(f"Error stopping task {task_id}: {str(e)}", exc_info=True)
        # Record that the stop attempt itself failed.
        if task_id in task_manager.task_status:
            task_manager.task_status[task_id] = "error_during_stop"
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
@app.post("/ai/stream/stopTask1")
async def stopTask1(request):
    """Stop the task whose id is given in the JSON body (``task_id``).

    Checks the API token and the user permission first.

    Returns:
        JSON ``success`` or ``warning`` (task removed but stop failed), 400 if
        ``task_id`` is missing or the body fails validation, 404 if the task
        is unknown, the Sanic exception's own status (e.g. 401) for Sanic
        errors, or 500 on any other error.
    """
    task_id = None
    try:
        verify_token(request)
        verify_userid(request)
        # ``request.json`` is a property holding the decoded body; it is
        # neither callable nor awaitable.
        jsondata = request.json
        task_id = jsondata.get("task_id") if isinstance(jsondata, dict) else None
        if not task_id:
            return json_response({"status": "error", "message": "task_id is required"}, status=400)

        task_info = task_manager.get_task_info(task_id)  # raises NotFound
        logger.info(f"Stopping task: {task_id} -> {task_info}")
        # Use the task's own stop callback when registered; otherwise fall
        # back to the shared stop routine so the task is actually stopped
        # before its record is removed.
        stop_callback = task_manager.task_callbacks.get(task_id)
        if not stop_callback:
            logger.warning(f"No stop callback found for task {task_id}")
            stop_callback = safe_stop_ai_video
        success = await stop_callback()
        task_manager.remove_task(task_id)
        if not success:
            return json_response({
                "status": "warning",
                "message": "Task removal completed, but stop failed"
            })
        return json_response({
            "status": "success",
            "message": "Task stopped successfully"
        })
    except NotFound:
        return json_response({"status": "error", "message": "Task not found"}, status=404)
    except SanicException as e:
        # Keep the status carried by Sanic exceptions (Unauthorized -> 401).
        return json_response(
            {"status": "error", "message": str(e)},
            status=getattr(e, "status_code", 500),
        )
    except ValueError as e:
        logger.error(f"Validation error: {str(e)}")
        return json_response({"status": "error", "message": str(e)}, status=400)
    except Exception as e:
        logger.error(f"Error stopping task {task_id}: {str(e)}", exc_info=True)
        if task_id is not None and task_id in task_manager.task_status:
            task_manager.task_status[task_id] = "error_during_stop"
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
@app.get("/ai/stream/<task_id>")
async def get_task_status(request, task_id: str):
    """Return the registry entry of one task.

    A task recorded as ``running`` is reported as ``stopped_unexpectedly``
    when ``getIfAI()`` is falsy.

    Returns:
        JSON with ``task_id``, ``task_info``, ``status`` and ``start_time``;
        404 if the task is unknown, the Sanic exception's own status
        (e.g. 401) for other Sanic errors, or 500 on any other error.
    """
    try:
        verify_token(request)
        task_info = task_manager.get_task_info(task_id)
        # Cross-check the recorded status against the actual worker state.
        if task_info["status"] == "running" and not getIfAI():
            task_info["status"] = "stopped_unexpectedly"
            logger.warning(f"Task {task_id} 显示为运行状态,但实际已停止")
        return json_response({
            "status": "success",
            "task_id": task_id,
            **task_info
        })
    except SanicException as e:
        # Covers NotFound (404) and Unauthorized (401); previously a bad
        # token was reported as a 500.
        return json_response(
            {"status": "error", "message": str(e)},
            status=getattr(e, "status_code", 500),
        )
    except Exception as e:
        logger.error(f"Error getting task status {task_id}: {str(e)}", exc_info=True)
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
@app.get("/ai/stream/tasks")
async def list_tasks(request):
    """Return all registered tasks after refreshing their health status.

    Returns:
        JSON with a ``tasks`` mapping of task id to registry entry; the Sanic
        exception's own status (e.g. 401) for Sanic errors, or 500 on any
        other error.
    """
    try:
        verify_token(request)
        # Apply the outcome of the health check to the recorded statuses.
        for tid, status in task_manager.check_tasks_health().items():
            if tid in task_manager.task_status:
                task_manager.task_status[tid] = status
        tasks = {
            tid: task_manager.get_task_info(tid)
            for tid in list(task_manager.active_tasks)
        }
        return json_response({
            "status": "success",
            "tasks": tasks
        })
    except SanicException as e:
        # Keep the status carried by Sanic exceptions (Unauthorized -> 401)
        # instead of reporting them as internal errors.
        return json_response(
            {"status": "error", "message": str(e)},
            status=getattr(e, "status_code", 500),
        )
    except Exception as e:
        logger.error(f"Error listing tasks: {str(e)}", exc_info=True)
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
@app.post("/ai/stream/stopTasks")
async def stop_all_detections(request):
    """Stop AI video processing and remove every registered task.

    Returns:
        JSON ``success`` (also when there was nothing to stop), ``warning`` if
        the stop routine reported failure, the Sanic exception's own status
        (e.g. 401) for Sanic errors, or 500 on any other error.
    """
    try:
        verify_token(request)
        if not task_manager.active_tasks:
            return json_response({
                "status": "success",
                "message": "No active tasks to stop"
            })
        success = await safe_stop_ai_video()
        # Task records are removed whether or not the stop succeeded.
        for task_id in list(task_manager.active_tasks):
            task_manager.remove_task(task_id)
        if not success:
            return json_response({
                "status": "warning",
                "message": "Tasks stopped with warnings"
            })
        return json_response({
            "status": "success",
            "message": "All detections stopped successfully"
        })
    except SanicException as e:
        # Keep the status carried by Sanic exceptions (Unauthorized -> 401);
        # an auth failure must not touch task state.
        return json_response(
            {"status": "error", "message": str(e)},
            status=getattr(e, "status_code", 500),
        )
    except Exception as e:
        logger.error(f"Error stopping all tasks: {str(e)}", exc_info=True)
        # Best effort: at least record every task as stopped.
        task_manager.mark_all_tasks_as_stopped()
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
@app.get("/ai/health")
async def health_check(request):
    """Report service health; intentionally requires no token.

    Returns:
        JSON with the service state, task counts, the last recorded error and
        a timestamp; a 500 JSON payload if collecting that data fails.
    """
    # No token check here so monitoring systems can poll this endpoint.
    try:
        unhealthy_tasks = task_manager.check_tasks_health()
        service_state = "running" if service_status["is_healthy"] else "degraded"
        report = {
            "status": "success",
            "service": service_state,
            "active_tasks": len(task_manager.active_tasks),
            "unhealthy_tasks": unhealthy_tasks,
            "last_error": service_status["last_error"],
            "error_time": service_status["error_time"],
            "timestamp": datetime.now().isoformat()
        }
        return json_response(report)
    except Exception as e:
        logger.error(f"健康检查失败: {str(e)}", exc_info=True)
        failure = {
            "status": "error",
            "service": "degraded",
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }
        return json_response(failure, status=500)
@app.route("/ai/reset", methods=["POST"])
async def reset_service(request):
    """Reset the service: stop processing, drop all tasks, clear error state.

    Returns:
        JSON ``success`` with the number of zombie children found, ``warning``
        if the process clean-up step failed, the Sanic exception's own status
        (e.g. 401) for Sanic errors, or 500 on any other error.
    """
    try:
        verify_token(request)
        # The result is ignored on purpose: the reset proceeds either way.
        await safe_stop_ai_video()
        for task_id in list(task_manager.active_tasks):
            task_manager.remove_task(task_id)
        service_status["is_healthy"] = True
        service_status["last_error"] = None
        service_status["error_time"] = None

        # Best-effort clean-up of zombie child processes.
        try:
            import os
            import signal
            import psutil

            current_process = psutil.Process(os.getpid())
            zombie_count = 0
            for child in current_process.children(recursive=True):
                try:
                    if child.status() == psutil.STATUS_ZOMBIE:
                        zombie_count += 1
                        child.send_signal(signal.SIGKILL)
                except Exception as child_e:
                    # Still best effort, but no longer silent, and a bare
                    # ``except`` would also have swallowed KeyboardInterrupt.
                    logger.debug(f"Skipping child process during zombie cleanup: {child_e}")
            return json_response({
                "status": "success",
                "message": f"Service reset successful. Cleaned {zombie_count} zombie processes."
            })
        except Exception as e:
            logger.error(f"清理僵尸进程时出错: {e}")
            return json_response({
                "status": "warning",
                "message": "Service reset with warnings"
            })
    except SanicException as e:
        # Keep the status carried by Sanic exceptions (Unauthorized -> 401)
        # instead of reporting them as internal errors.
        return json_response(
            {"status": "error", "message": str(e)},
            status=getattr(e, "status_code", 500),
        )
    except Exception as e:
        logger.error(f"重置服务时出错: {str(e)}", exc_info=True)
        return json_response({
            "status": "error",
            "message": f"Failed to reset service: {str(e)}"
        }, status=500)
@app.route("/ai/stream/restart/<task_id>", methods=["POST"])
async def restart_task(request, task_id: str):
    """Stop the given task and start it again under a new task id.

    Returns:
        JSON ``success`` with ``old_task_id``/``new_task_id``, 404 if the task
        is unknown, the Sanic exception's own status (e.g. 401) for other
        Sanic errors, or 500 if restarting fails.
    """
    try:
        verify_token(request)
        # Raises NotFound for an unknown task; handled below as a 404.
        task_info = task_manager.get_task_info(task_id)["task_info"]

        # Stop first; a failed stop is logged but does not abort the restart.
        success = await safe_stop_ai_video()
        task_manager.remove_task(task_id)
        if not success:
            logger.warning("停止任务出现问题,尝试继续重启")

        new_task_id = str(uuid.uuid4())
        # Apply the same address rewrite the initial start performs, since
        # the stored task info holds the URLs as originally requested.
        source_url = task_info["source_url"].replace("222.212.85.86", "192.168.10.5")
        push_url = task_info["push_url"].replace("222.212.85.86", "192.168.10.5")
        try:
            await asyncio.to_thread(
                startAIVideo,
                source_url,
                push_url,
                task_info["model_path"],
                task_info["detect_classes"],
                task_info["confidence"]
            )
            # add_task needs a stop function as its third argument; without
            # it the call raised TypeError after the video was already running.
            task_manager.add_task(new_task_id, task_info, safe_stop_ai_video)
            return json_response({
                "status": "success",
                "old_task_id": task_id,
                "new_task_id": new_task_id,
                "message": "Task restarted successfully"
            })
        except Exception as e:
            logger.error(f"重启任务失败: {e}")
            return json_response({
                "status": "error",
                "message": f"Failed to restart task: {str(e)}"
            }, status=500)
    except SanicException as e:
        # Covers NotFound (404) and Unauthorized (401); previously a bad
        # token was reported as a 500.
        return json_response(
            {"status": "error", "message": str(e)},
            status=getattr(e, "status_code", 500),
        )
    except Exception as e:
        logger.error(f"重启任务时出错: {str(e)}", exc_info=True)
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
if __name__ == "__main__":
    # Make sure no detection job is left over from a previous run.
    try:
        stopAIVideo()
        print("服务启动前清理完成")
    except Exception as e:
        # ``except Exception`` rather than a bare ``except`` so Ctrl-C and
        # SystemExit still abort start-up; the failure is logged, not hidden.
        logger.warning("Pre-start cleanup failed: %s", e)
        print("服务启动前清理失败,但仍将继续")
    # psutil is used for process management; install it if it is missing.
    # NOTE(review): installing packages at start-up needs network access and
    # write permission; consider declaring psutil as a dependency instead.
    try:
        import psutil  # noqa: F401  (availability check only)
    except ImportError:
        import subprocess
        import sys
        print("正在安装psutil库...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "psutil"])
    app.run(host="0.0.0.0", port=12317, debug=False, access_log=True)