AI_python_yoooger/ai2/yolo_api.py

617 lines
23 KiB
Python
Raw Permalink Normal View History

2025-07-09 15:34:23 +08:00
from sanic import Sanic, json
from sanic.response import json as json_response
from sanic.exceptions import Unauthorized, NotFound, SanicException
from dataclasses import dataclass
2025-07-24 09:45:43 +08:00
from typing import List, Dict, Any, Callable
2025-07-09 15:34:23 +08:00
import uuid
import logging
import asyncio
import traceback
from datetime import datetime
from cv_video import startAIVideo,stopAIVideo,getIfAI
from sanic_cors import CORS
2025-07-23 17:09:03 +08:00
from sqlhelp import get_user_power
2025-07-09 15:34:23 +08:00
# Configure logging once at import time.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Service health flag plus details of the most recent failure.
service_status = dict(is_healthy=True, last_error=None, error_time=None)
# Service-wide configuration constants.
class Config:
    """Static settings read by the request helpers and the task manager."""

    # Value the X-API-Token request header is compared against.
    VALID_TOKEN: str = "5e8899fe-dc74-4280-8169-2f4d185f3afa"
    # Upper bound on simultaneously registered tasks.
    MAX_ACTIVE_TASKS: int = 10
    # Confidence used when a request does not provide one.
    DEFAULT_CONFIDENCE: float = 0.5
    # Delay (seconds) before the service attempts automatic recovery.
    RESTART_DELAY: int = 2
@dataclass
class StreamRequest:
    """Parameters of a "start detection" request.

    Field values are taken verbatim from the request JSON; ``confidence``
    falls back to ``Config.DEFAULT_CONFIDENCE`` when the client omits it.
    """

    source_url: str
    push_url: str
    model_path: str
    detect_classes: List[str]
    confidence: float = Config.DEFAULT_CONFIDENCE

    def validate(self) -> None:
        """Validate the request parameters.

        Raises:
            ValueError: if either URL is empty, no detection class is given,
                or ``confidence`` is not a number strictly between 0 and 1.
        """
        if not self.source_url or not self.push_url:
            raise ValueError("Source URL and Push URL are required")
        if not self.detect_classes:
            raise ValueError("At least one detection class must be specified")
        # A non-numeric confidence (e.g. a JSON string) would make the range
        # comparison raise TypeError; report it as a validation error instead.
        if not isinstance(self.confidence, (int, float)) or not 0 < self.confidence < 1:
            raise ValueError("Confidence must be between 0 and 1")

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'StreamRequest':
        """Build and validate a request from a decoded JSON object.

        Args:
            data: mapping holding ``source_url``, ``push_url``, ``model_path``,
                ``detect_classes`` and optionally ``confidence``.

        Returns:
            The validated ``StreamRequest``.

        Raises:
            ValueError: if ``data`` is not a mapping, a required key is
                missing, or validation fails.
        """
        # A missing or non-object body must surface as a validation error
        # (ValueError) rather than a TypeError from subscripting it.
        if not isinstance(data, dict):
            raise ValueError("Request body must be a JSON object")
        try:
            instance = cls(
                source_url=data['source_url'],
                push_url=data['push_url'],
                model_path=data['model_path'],
                detect_classes=data['detect_classes'],
                confidence=data.get('confidence', Config.DEFAULT_CONFIDENCE),
            )
        except KeyError as e:
            raise ValueError(f"Missing required field: {str(e)}") from e
        instance.validate()
        return instance
class TaskManager:
    """In-memory registry of detection tasks, their status and stop handles."""

    def __init__(self):
        self.active_tasks: Dict[str, Dict[str, Any]] = {}
        self.task_status: Dict[str, str] = {}
        self.task_timestamps: Dict[str, datetime] = {}
        # Stop function of each task, registered through add_task().
        self.stop_handles: Dict[str, Callable] = {}
        # Per-task callbacks (original note: used to call back with task
        # completion information); nothing in this class fills it.
        self.task_callbacks: Dict[str, Callable] = {}

    def add_task(self, task_id: str, task_info: Dict[str, Any], stop_handle: Callable) -> None:
        """Register a new running task.

        Args:
            task_id: identifier of the task.
            task_info: parameters the task was started with.
            stop_handle: callable that stops the task.

        Raises:
            ValueError: if ``Config.MAX_ACTIVE_TASKS`` tasks are already registered.
        """
        if len(self.active_tasks) >= Config.MAX_ACTIVE_TASKS:
            raise ValueError("Maximum number of active tasks reached")
        self.active_tasks[task_id] = task_info
        self.task_status[task_id] = "running"
        self.task_timestamps[task_id] = datetime.now()
        self.stop_handles[task_id] = stop_handle
        logger.info(f"Task {task_id} started")

    def remove_task(self, task_id: str) -> None:
        """Forget everything recorded for ``task_id``; unknown ids are ignored."""
        # Always drop the auxiliary entries so no stale handle or callback
        # outlives the task record.
        self.stop_handles.pop(task_id, None)
        self.task_callbacks.pop(task_id, None)
        if task_id in self.active_tasks:
            del self.active_tasks[task_id]
            self.task_status.pop(task_id, None)
            self.task_timestamps.pop(task_id, None)
            logger.info(f"Task {task_id} removed")

    def get_task_info(self, task_id: str) -> Dict[str, Any]:
        """Return the stored parameters, status and start time of a task.

        Raises:
            NotFound: if the task is not registered.
        """
        if task_id not in self.active_tasks:
            raise NotFound("Task not found")
        return {
            "task_info": self.active_tasks[task_id],
            "status": self.task_status[task_id],
            "start_time": self.task_timestamps[task_id].isoformat(),
        }

    def stop_task(self, task_id: str) -> bool:
        """Stop a single task through its registered stop handle.

        Returns:
            True if the handle ran without raising; False if the task is
            unknown, has no handle, or the handle raised.
        """
        if task_id not in self.active_tasks:
            logger.warning(f"Task {task_id} 不存在")
            return False
        stop_handle = self.stop_handles.get(task_id)
        if not stop_handle:
            logger.warning(f"Task {task_id} 无法停止(未注册停止函数)")
            return False
        try:
            # NOTE(review): the handle is invoked synchronously; a coroutine
            # function would only create an un-awaited coroutine here.
            # Confirm the handles registered are plain callables.
            stop_handle()
            self.task_status[task_id] = "stopped"
            logger.info(f"Task {task_id} 停止成功")
            return True
        except Exception as e:
            logger.error(f"停止任务 {task_id} 出错:{e}")
            self.task_status[task_id] = "error"
            return False

    def check_tasks_health(self) -> Dict[str, str]:
        """Return ``{task_id: "stopped"}`` for tasks that appear to have died.

        Liveness comes from ``getIfAI()``, which takes no task argument, so
        it is queried once and applied to every registered task.
        """
        if not self.active_tasks or getIfAI():
            return {}
        unhealthy_tasks = {}
        for task_id in list(self.active_tasks):
            unhealthy_tasks[task_id] = "stopped"
            logger.warning(f"Task {task_id} appears to be stopped unexpectedly")
        return unhealthy_tasks

    def mark_all_tasks_as_stopped(self):
        """Set the status of every registered task to ``"stopped"``."""
        for task_id in list(self.active_tasks):
            self.task_status[task_id] = "stopped"
        logger.warning("已将所有任务标记为停止状态")
2025-07-24 09:45:43 +08:00
2025-08-01 14:50:49 +08:00
# Sanic application object; the route handlers in this file are registered on it.
app = Sanic("YoloStreamServiceOut")
2025-07-09 15:34:23 +08:00
# Apply sanic_cors' CORS extension to the app, called with no extra options.
CORS(app)
# Module-level task registry shared by the route handlers.
task_manager = TaskManager()
async def safe_stop_ai_video():
    """Stop AI video processing, recovering as far as possible on failure.

    Runs ``stopAIVideo`` in a worker thread. If it raises, the error is
    recorded in ``service_status``, all tasks are marked stopped, ffmpeg
    child processes are sent SIGTERM, and after ``Config.RESTART_DELAY``
    seconds the service is flagged healthy again.

    Returns:
        True if ``stopAIVideo`` completed, False if the failure path ran.
    """
    try:
        await asyncio.to_thread(stopAIVideo)
    except Exception as e:
        logger.error(f"停止AI视频处理出错: {str(e)}\n{traceback.format_exc()}")

        # Flag the service as unhealthy and remember what went wrong.
        service_status.update(
            is_healthy=False,
            last_error=str(e),
            error_time=datetime.now().isoformat(),
        )

        # Force every registered task into the stopped state.
        task_manager.mark_all_tasks_as_stopped()

        # Last resort: terminate leftover ffmpeg children directly.
        try:
            _terminate_ffmpeg_children()
        except Exception as kill_e:
            logger.error(f"尝试清理进程时出错: {str(kill_e)}")

        # Pause before declaring the service healthy again.
        await asyncio.sleep(Config.RESTART_DELAY)
        service_status["is_healthy"] = True
        return False
    return True


def _terminate_ffmpeg_children() -> None:
    """Send SIGTERM to every descendant process whose name contains "ffmpeg"."""
    import os
    import signal
    import psutil

    descendants = psutil.Process(os.getpid()).children(recursive=True)
    for proc in descendants:
        try:
            proc_name = proc.name().lower()
            if 'ffmpeg' not in proc_name:
                continue
            logger.info(f"强制终止子进程: {proc.pid} ({proc_name})")
            proc.send_signal(signal.SIGTERM)
        except Exception as child_e:
            logger.error(f"终止子进程出错: {str(child_e)}")
def verify_token(request) -> None:
    """Reject the request unless it carries the expected API token.

    Args:
        request: incoming request; the token is read from its
            ``X-API-Token`` header.

    Raises:
        Unauthorized: if the header is missing or does not match
            ``Config.VALID_TOKEN``.
    """
    import hmac  # local import keeps this block self-contained

    token = request.headers.get('X-API-Token')
    expected = Config.VALID_TOKEN.encode("utf-8")
    # Compare in constant time so response timing does not leak how much of
    # the secret a guess got right; bytes are used because compare_digest
    # rejects non-ASCII str input.
    if not token or not hmac.compare_digest(token.encode("utf-8"), expected):
        logger.warning("Invalid token attempt")
        raise Unauthorized("Invalid token")
2025-07-23 17:09:03 +08:00
def verify_userid(request) -> None:
    """Reject the request unless its JSON body names a user with "config" power.

    The previous implementation parsed the body as a ``StreamRequest`` and
    read ``.user_id`` from it. ``StreamRequest`` defines no such field, so
    every call failed with AttributeError. The id is now read straight from
    the JSON body, which also lets requests without stream parameters (such
    as stop requests) be checked.

    Args:
        request: incoming request whose JSON body should hold ``user_id``.

    Raises:
        Unauthorized: if the id is missing or ``get_user_power`` reports a
            level below 1 for the "config" permission.
    """
    payload = request.json
    # NOTE(review): the key name mirrors the attribute the original code
    # tried to read; confirm it matches what API clients actually send.
    userid = payload.get("user_id") if isinstance(payload, dict) else None
    if not userid or get_user_power(userid, "config") < 1:
        logger.warning("userid not define or user have not power")
        raise Unauthorized("Invalid userid or user have not power")
2025-07-24 09:45:43 +08:00
@app.post("/ai/detect")
async def detection(request):
    """Start a detection stream, replacing any task that is already registered.

    Steps: validate the request body, stop and unregister existing tasks,
    start ``startAIVideo`` in a worker thread, then register the new task.

    Returns:
        JSON with ``status``/``task_id`` on success, or a 500 JSON error if
        ``startAIVideo`` raises.

    Raises:
        ValueError: if the request body fails ``StreamRequest`` validation
            (or the task registry is full); callers map this to HTTP 400.

    NOTE(review): this route is registered without a token check, unlike
    ``/ai/stream/detect`` -- confirm that exposing it directly is intended.
    """
    # Check service health; an unhealthy flag is logged and cleared.
    if not service_status["is_healthy"]:
        logger.warning(f"服务处于不健康状态,上次错误: {service_status['last_error']}{service_status['error_time']}")
        service_status["is_healthy"] = True

    # Validate first so a malformed request cannot tear down running tasks.
    stream_request = StreamRequest.from_dict(request.json)

    # Stop every existing task before starting the new one.
    for task_id in list(task_manager.active_tasks.keys()):
        logger.info(f"停止现有任务 {task_id} 以启动新任务")
        try:
            stop_cb = task_manager.task_callbacks.get(task_id, safe_stop_ai_video)
            await stop_cb()
            task_manager.remove_task(task_id)
        except Exception as e:
            logger.error(f"停止任务时出错: {e}")
            task_manager.mark_all_tasks_as_stopped()

    task_id = str(uuid.uuid4())

    # Rewrite the address in both URLs before handing them to the worker.
    new_source_url = stream_request.source_url.replace("222.212.85.86", "192.168.10.5")
    new_push_url = stream_request.push_url.replace("222.212.85.86", "192.168.10.5")

    try:
        await asyncio.to_thread(
            startAIVideo,
            new_source_url,
            new_push_url,
            stream_request.model_path,
            stream_request.detect_classes,
            stream_request.confidence,
        )
    except Exception as e:
        logger.error(f"启动AI视频处理失败: {e}")
        return json_response({
            "status": "error",
            "message": f"Failed to start AI video processing: {str(e)}"
        }, status=500)

    # Register the task. The keyword must be ``stop_handle`` (the name
    # TaskManager.add_task declares); the former ``stop_callback=`` raised
    # TypeError after the stream had already been started.
    task_manager.add_task(
        task_id,
        {
            "source_url": stream_request.source_url,
            "push_url": stream_request.push_url,
            "model_path": stream_request.model_path,
            "detect_classes": stream_request.detect_classes,
            "confidence": stream_request.confidence,
        },
        stop_handle=safe_stop_ai_video,
    )

    return json_response({
        "status": "success",
        "task_id": task_id,
        "message": "Detection started successfully"
    })
2025-07-23 17:09:03 +08:00
@app.post("/ai/stream/detect1")
async def start_detection1(request):
    """Start detection after checking both the user id and the API token.

    Returns:
        The response produced by ``detection``; a 400 JSON error when a
        ValueError is raised (request validation), or a 500 JSON error for
        any other exception.
    """
    try:
        verify_userid(request)
        verify_token(request)
        # ``detection`` is a coroutine: it must be awaited and its response
        # returned, otherwise nothing runs and the handler yields None.
        return await detection(request)
    except ValueError as e:
        logger.error(f"Validation error: {str(e)}")
        return json_response({"status": "error", "message": str(e)}, status=400)
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}", exc_info=True)
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
@app.post("/ai/stream/detect")
async def start_detection(request):
    """Start detection after checking the API token.

    Returns:
        The response produced by ``detection``; a 400 JSON error when a
        ValueError is raised (request validation), or a 500 JSON error for
        any other exception.
    """
    try:
        verify_token(request)
        # ``detection`` is a coroutine: it must be awaited and its response
        # returned, otherwise nothing runs and the handler yields None.
        return await detection(request)
    except ValueError as e:
        logger.error(f"Validation error: {str(e)}")
        return json_response({"status": "error", "message": str(e)}, status=400)
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}", exc_info=True)
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
@app.post("/ai/stream/<task_id>")
async def stop_detection(request, task_id: str):
    """Stop one detection task and drop it from the registry.

    The task's entry in ``task_manager.task_callbacks`` is awaited when one
    exists; otherwise ``safe_stop_ai_video`` is used. The task record is
    removed whether or not stopping succeeded.

    Returns:
        JSON with status "success" or "warning"; 404 if the task is
        unknown; 500 on any other exception.
    """
    try:
        verify_token(request)

        # Make sure the task exists before trying to stop it.
        try:
            task_manager.get_task_info(task_id)
        except NotFound:
            return json_response({"status": "error", "message": "Task not found"}, status=404)

        # Prefer the task-specific stop callback, if one was registered.
        stop_callback = task_manager.task_callbacks.get(task_id)
        if stop_callback:
            success = await stop_callback()
        else:
            logger.warning(f"Task {task_id} has no stop callback, using default safe_stop_ai_video")
            success = await safe_stop_ai_video()

        # Remove the record regardless of the outcome.
        task_manager.remove_task(task_id)

        if not success:
            logger.warning(f"停止任务 {task_id} 失败,但已移除任务记录")
            return json_response({
                "status": "warning",
                "message": "Task removal completed with warnings"
            })

        return json_response({
            "status": "success",
            "message": f"Detection for task {task_id} stopped successfully"
        })
    except NotFound as e:
        return json_response({"status": "error", "message": str(e)}, status=404)
    except Exception as e:
        logger.error(f"Error stopping task {task_id}: {str(e)}", exc_info=True)
        # Record the failure on the task if it is still registered. A dict
        # membership test and assignment need no blanket try/except.
        if task_id in task_manager.task_status:
            task_manager.task_status[task_id] = "error_during_stop"
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
2025-07-24 09:45:43 +08:00
@app.post("/ai/stream/stopTask1")
async def stopTask1(request):
    """Stop the task named by ``task_id`` in the JSON body.

    Both the user id and the API token are checked first.

    Returns:
        JSON with status "success" or "warning"; 400 if ``task_id`` is
        missing; 404 if the task is unknown; 500 on any other exception.
    """
    # Bound up front so the error handler can always reference it.
    task_id = None
    try:
        verify_userid(request)
        verify_token(request)

        # ``request.json`` is a property holding the decoded body, not a
        # coroutine function, so it must not be called or awaited.
        jsondata = request.json
        task_id = jsondata.get("task_id") if isinstance(jsondata, dict) else None
        if not task_id:
            return json_response({"status": "error", "message": "task_id is required"}, status=400)

        try:
            task_info = task_manager.get_task_info(task_id)
        except NotFound:
            return json_response({"status": "error", "message": "Task not found"}, status=404)
        logger.info(f"Stopping task: {task_id} -> {task_info}")

        # Use the task's own stop callback when present. Otherwise fall back
        # to the generic stop routine, as the path-based stop endpoint does,
        # so the task is never unregistered while still running.
        stop_callback = task_manager.task_callbacks.get(task_id)
        if stop_callback:
            success = await stop_callback()
        else:
            logger.warning(f"No stop callback found for task {task_id}, using default safe_stop_ai_video")
            success = await safe_stop_ai_video()

        task_manager.remove_task(task_id)

        if not success:
            return json_response({
                "status": "warning",
                "message": "Task removal completed, but stop failed"
            })
        return json_response({
            "status": "success",
            "message": "Task stopped successfully"
        })
    except Exception as e:
        logger.error(f"Error stopping task {task_id}: {str(e)}", exc_info=True)
        if task_id in task_manager.task_status:
            task_manager.task_status[task_id] = "error_during_stop"
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
2025-07-09 15:34:23 +08:00
@app.get("/ai/stream/<task_id>")
async def get_task_status(request, task_id: str):
    """Report the stored info of one task, cross-checked against ``getIfAI()``.

    Returns:
        JSON holding ``task_id`` plus the task's ``task_info``, ``status``
        and ``start_time``; 404 if the task is unknown; 500 on any other
        exception.
    """
    try:
        verify_token(request)
        details = task_manager.get_task_info(task_id)

        # A task recorded as running while the AI worker is not active is
        # reported as having stopped unexpectedly.
        ai_running = getIfAI()
        if details["status"] == "running" and not ai_running:
            details["status"] = "stopped_unexpectedly"
            logger.warning(f"Task {task_id} 显示为运行状态,但实际已停止")

        # NOTE: merging ``details`` last means its "status" entry replaces
        # the "success" placeholder in the response.
        payload = {"status": "success", "task_id": task_id}
        payload.update(details)
        return json_response(payload)
    except NotFound as e:
        return json_response({"status": "error", "message": str(e)}, status=404)
    except Exception as e:
        logger.error(f"Error getting task status {task_id}: {str(e)}", exc_info=True)
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
@app.get("/ai/stream/tasks")
async def list_tasks(request):
    """Return every registered task after refreshing its health status."""
    try:
        verify_token(request)

        # Fold the health-check findings back into the stored statuses.
        for tid, state in task_manager.check_tasks_health().items():
            if tid in task_manager.task_status:
                task_manager.task_status[tid] = state

        tasks = {}
        for tid in task_manager.active_tasks:
            tasks[tid] = task_manager.get_task_info(tid)

        return json_response({"status": "success", "tasks": tasks})
    except Exception as e:
        logger.error(f"Error listing tasks: {str(e)}", exc_info=True)
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
@app.post("/ai/stream/stopTasks")
async def stop_all_detections(request):
    """Stop AI processing and unregister every task."""
    try:
        verify_token(request)

        if not task_manager.active_tasks:
            return json_response({
                "status": "success",
                "message": "No active tasks to stop"
            })

        stopped_cleanly = await safe_stop_ai_video()

        # Unregister everything, whether or not the stop succeeded.
        for tid in list(task_manager.active_tasks):
            task_manager.remove_task(tid)

        if stopped_cleanly:
            outcome = {"status": "success", "message": "All detections stopped successfully"}
        else:
            outcome = {"status": "warning", "message": "Tasks stopped with warnings"}
        return json_response(outcome)
    except Exception as e:
        logger.error(f"Error stopping all tasks: {str(e)}", exc_info=True)
        # Best effort: at least mark every task as stopped.
        task_manager.mark_all_tasks_as_stopped()
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
@app.get("/ai/health")
async def health_check(request):
    """Service health endpoint.

    No token is verified here, so monitoring systems can poll it.
    """
    try:
        unhealthy = task_manager.check_tasks_health()
        service_state = "running" if service_status["is_healthy"] else "degraded"
        report = {
            "status": "success",
            "service": service_state,
            "active_tasks": len(task_manager.active_tasks),
            "unhealthy_tasks": unhealthy,
            "last_error": service_status["last_error"],
            "error_time": service_status["error_time"],
            "timestamp": datetime.now().isoformat(),
        }
        return json_response(report)
    except Exception as e:
        logger.error(f"健康检查失败: {str(e)}", exc_info=True)
        failure = {
            "status": "error",
            "service": "degraded",
            "error": str(e),
            "timestamp": datetime.now().isoformat(),
        }
        return json_response(failure, status=500)
@app.route("/ai/reset", methods=["POST"])
async def reset_service(request):
    """Reset the service: stop processing, drop all tasks, clear the error state.

    After the reset, child processes reported as zombies by psutil are sent
    SIGKILL and counted.

    Returns:
        JSON with status "success" (including the zombie count), "warning"
        if the zombie sweep itself failed, or a 500 JSON error if the reset
        raised.
    """
    try:
        verify_token(request)

        # Try to stop AI video processing; the outcome is not inspected.
        await safe_stop_ai_video()

        # Drop every registered task.
        for task_id in list(task_manager.active_tasks.keys()):
            task_manager.remove_task(task_id)

        # Clear the recorded error state.
        service_status["is_healthy"] = True
        service_status["last_error"] = None
        service_status["error_time"] = None

        # Sweep zombie child processes.
        try:
            import os
            import signal
            import psutil

            current_process = psutil.Process(os.getpid())
            zombie_count = 0
            for child in current_process.children(recursive=True):
                try:
                    if child.status() == psutil.STATUS_ZOMBIE:
                        zombie_count += 1
                        child.send_signal(signal.SIGKILL)
                except Exception as child_e:
                    # One child that cannot be inspected or signalled must
                    # not abort the sweep, but record it instead of
                    # swallowing silently (a bare ``except`` would also
                    # trap KeyboardInterrupt/SystemExit).
                    logger.debug(f"Skipping child process during zombie cleanup: {child_e}")

            return json_response({
                "status": "success",
                "message": f"Service reset successful. Cleaned {zombie_count} zombie processes."
            })
        except Exception as e:
            logger.error(f"清理僵尸进程时出错: {e}")
            return json_response({
                "status": "warning",
                "message": "Service reset with warnings"
            })
    except Exception as e:
        logger.error(f"重置服务时出错: {str(e)}", exc_info=True)
        return json_response({
            "status": "error",
            "message": f"Failed to reset service: {str(e)}"
        }, status=500)
@app.route("/ai/stream/restart/<task_id>", methods=["POST"])
async def restart_task(request, task_id: str):
    """Restart a task: stop it, then start it again under a new id.

    Returns:
        JSON with the old and new task ids on success; 404 if the task is
        unknown; 500 if restarting fails.
    """
    try:
        verify_token(request)

        # Fetch the stored start parameters.
        try:
            task_info = task_manager.get_task_info(task_id)["task_info"]
        except NotFound:
            return json_response({"status": "error", "message": "Task not found"}, status=404)

        # Stop the current run first; a failed stop does not abort the restart.
        success = await safe_stop_ai_video()
        task_manager.remove_task(task_id)
        if not success:
            logger.warning("停止任务出现问题,尝试继续重启")

        new_task_id = str(uuid.uuid4())
        try:
            # NOTE(review): the stored URLs are passed as-is, whereas the
            # detect handler rewrites the address before starting --
            # confirm which form startAIVideo expects here.
            await asyncio.to_thread(
                startAIVideo,
                task_info["source_url"],
                task_info["push_url"],
                task_info["model_path"],
                task_info["detect_classes"],
                task_info["confidence"],
            )
            # add_task requires a stop handle; omitting it raised TypeError
            # right after the stream had been started.
            task_manager.add_task(new_task_id, task_info, safe_stop_ai_video)
            return json_response({
                "status": "success",
                "old_task_id": task_id,
                "new_task_id": new_task_id,
                "message": "Task restarted successfully"
            })
        except Exception as e:
            logger.error(f"重启任务失败: {e}")
            return json_response({
                "status": "error",
                "message": f"Failed to restart task: {str(e)}"
            }, status=500)
    except Exception as e:
        logger.error(f"重启任务时出错: {str(e)}", exc_info=True)
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
if __name__ == "__main__":
    # Make sure no AI processing is left over from a previous run.
    # ``except Exception`` (not a bare ``except``) lets Ctrl-C / SystemExit
    # abort startup instead of being reported as a failed cleanup.
    try:
        stopAIVideo()
        print("服务启动前清理完成")
    except Exception:
        print("服务启动前清理失败,但仍将继续")

    # psutil is used for process management; install it if it is missing.
    try:
        import psutil  # noqa: F401  (availability check only)
    except ImportError:
        import subprocess
        import sys
        print("正在安装psutil库...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "psutil"])

    app.run(host="0.0.0.0", port=12317, debug=False, access_log=True)