623 lines
23 KiB
Python
623 lines
23 KiB
Python
|
import asyncio
import logging
import os
import signal
import traceback
import uuid
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional

import psutil
from sanic import Sanic, json
from sanic.exceptions import NotFound, SanicException, Unauthorized
from sanic.response import json as json_response
from sanic_cors import CORS

from cv_video import getIfAI, startAIVideo, stopAIVideo
from sqlhelp import get_user_power
|
|||
|
|
|||
|
# Configure logging for the whole process.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger(__name__)

# Service health state: a health flag plus the text and ISO timestamp of the
# most recent failure recorded while stopping the video pipeline.
service_status = dict(is_healthy=True, last_error=None, error_time=None)
|
|||
|
|
|||
|
# Configuration
class Config:
    """Static settings read by the request handlers and the task manager."""

    # Value the X-API-Token request header must match.
    # NOTE(review): the token is hard-coded in source; consider loading it
    # from the environment or a secret store.
    VALID_TOKEN: str = "5e8899fe-dc74-4280-8169-2f4d185f3afa"

    # Upper bound on simultaneously registered tasks.
    MAX_ACTIVE_TASKS: int = 10

    # Confidence used when a request does not supply one.
    DEFAULT_CONFIDENCE: float = 0.5

    # Delay (seconds) before the service attempts automatic recovery.
    RESTART_DELAY: int = 2
|
|||
|
|
|||
|
|
|||
|
@dataclass
class StreamRequest:
    """Parameters of a request to start a detection stream."""

    source_url: str
    push_url: str
    model_path: str
    detect_classes: List[str]
    user_id: Optional[str] = None
    confidence: float = Config.DEFAULT_CONFIDENCE

    def validate(self) -> None:
        """Validate the field values.

        Raises:
            ValueError: if either URL is empty, no detection class is given,
                or the confidence is not a number strictly between 0 and 1.
        """
        if not self.source_url or not self.push_url:
            raise ValueError("Source URL and Push URL are required")

        if not self.detect_classes:
            raise ValueError("At least one detection class must be specified")

        # A non-numeric confidence (e.g. a JSON string) makes the comparison
        # raise TypeError; report it as a validation error instead.
        try:
            in_range = 0 < self.confidence < 1
        except TypeError:
            raise ValueError("Confidence must be a number between 0 and 1") from None
        if not in_range:
            raise ValueError("Confidence must be between 0 and 1")

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'StreamRequest':
        """Build and validate a request from a decoded JSON body.

        Args:
            data: mapping with the keys ``source_url``, ``push_url``,
                ``model_path``, ``detect_classes`` and ``user_id``;
                ``confidence`` is optional.

        Returns:
            The validated ``StreamRequest``.

        Raises:
            ValueError: if ``data`` is not a dict, a required key is
                missing, or ``validate`` rejects a value.
        """
        # A missing or non-object body would otherwise fail with TypeError
        # on the first subscript below.
        if not isinstance(data, dict):
            raise ValueError("Request body must be a JSON object")

        try:
            instance = cls(
                source_url=data['source_url'],
                push_url=data['push_url'],
                model_path=data['model_path'],
                detect_classes=data['detect_classes'],
                user_id=data['user_id'],
                confidence=data.get('confidence', Config.DEFAULT_CONFIDENCE),
            )
        except KeyError as e:
            raise ValueError(f"Missing required field: {str(e)}") from e

        instance.validate()
        return instance
|
|||
|
|
|||
|
class TaskManager:
    """In-memory registry of detection tasks, keyed by task ID."""

    def __init__(self):
        # task_id -> parameters the task was started with
        self.active_tasks: Dict[str, Dict[str, Any]] = {}
        # task_id -> status string (e.g. "running", "stopped", "error")
        self.task_status: Dict[str, str] = {}
        # task_id -> time the task was registered
        self.task_timestamps: Dict[str, datetime] = {}
        # task_id -> callable that stops the task
        self.stop_handles: Dict[str, Callable] = {}
        # task_id -> stop callback looked up by the HTTP handlers
        self.task_callbacks: Dict[str, Callable] = {}

    def add_task(self, task_id: str, task_info: Dict[str, Any],
                 stop_handle: Optional[Callable] = None) -> None:
        """Register a new running task.

        Args:
            task_id: unique identifier of the task.
            task_info: parameters the task was started with.
            stop_handle: optional callable that stops the task. When it is
                omitted, ``stop_task`` reports the task as not stoppable.

        Raises:
            ValueError: if ``Config.MAX_ACTIVE_TASKS`` tasks are already
                registered.
        """
        if len(self.active_tasks) >= Config.MAX_ACTIVE_TASKS:
            raise ValueError("Maximum number of active tasks reached")

        self.active_tasks[task_id] = task_info
        self.task_status[task_id] = "running"
        self.task_timestamps[task_id] = datetime.now()
        if stop_handle is not None:
            self.stop_handles[task_id] = stop_handle
        logger.info(f"Task {task_id} started")

    def remove_task(self, task_id: str) -> None:
        """Forget a task and everything registered for it; unknown IDs are ignored."""
        if task_id in self.active_tasks:
            del self.active_tasks[task_id]
            del self.task_status[task_id]
            del self.task_timestamps[task_id]
            self.stop_handles.pop(task_id, None)
            # Drop the callback as well so removed tasks do not accumulate.
            self.task_callbacks.pop(task_id, None)
            logger.info(f"Task {task_id} removed")

    def get_task_info(self, task_id: str) -> Dict[str, Any]:
        """Return the parameters, status and ISO start time of a task.

        Raises:
            NotFound: if the task is not registered.
        """
        if task_id not in self.active_tasks:
            raise NotFound("Task not found")

        return {
            "task_info": self.active_tasks[task_id],
            "status": self.task_status[task_id],
            "start_time": self.task_timestamps[task_id].isoformat(),
        }

    def stop_task(self, task_id: str) -> bool:
        """Stop a single task through its registered stop handle.

        Returns:
            True if the handle ran without raising; False if the task is
            unknown, has no handle, or the handle raised.
        """
        if task_id not in self.active_tasks:
            logger.warning(f"Task {task_id} 不存在")
            return False

        stop_handle = self.stop_handles.get(task_id)
        if not stop_handle:
            logger.warning(f"Task {task_id} 无法停止(未注册停止函数)")
            return False

        try:
            # NOTE(review): the handle is called synchronously. If an async
            # function is registered, this only creates a coroutine and never
            # runs it -- confirm which kind of handle callers register.
            stop_handle()
            self.task_status[task_id] = "stopped"
            logger.info(f"Task {task_id} 停止成功")
            return True
        except Exception as e:
            logger.error(f"停止任务 {task_id} 出错:{e}")
            self.task_status[task_id] = "error"
            return False

    def check_tasks_health(self) -> Dict[str, str]:
        """Return ``{task_id: "stopped"}`` for tasks that are no longer running.

        ``getIfAI()`` takes no task argument, so it is queried once and the
        answer is applied to every registered task.
        """
        task_ids = list(self.active_tasks)
        if not task_ids or getIfAI():
            return {}

        unhealthy_tasks = {}
        for task_id in task_ids:
            unhealthy_tasks[task_id] = "stopped"
            logger.warning(f"Task {task_id} appears to be stopped unexpectedly")
        return unhealthy_tasks

    def mark_all_tasks_as_stopped(self):
        """Set the status of every registered task to "stopped"."""
        for task_id in list(self.active_tasks):
            self.task_status[task_id] = "stopped"
        logger.warning("已将所有任务标记为停止状态")
|
|||
|
|
|||
|
|
|||
|
# Sanic application with CORS enabled.
app = Sanic("YoloStreamServiceOut")
CORS(app)

# Single registry instance shared by all request handlers.
task_manager = TaskManager()
|
|||
|
|
|||
|
def _terminate_ffmpeg_children() -> None:
    """Send SIGTERM to every descendant process whose name contains 'ffmpeg'.

    Best effort: failures are logged and never propagated.
    """
    try:
        this_process = psutil.Process(os.getpid())
        for proc in this_process.children(recursive=True):
            try:
                proc_name = proc.name().lower()
                if 'ffmpeg' in proc_name:
                    logger.info(f"强制终止子进程: {proc.pid} ({proc_name})")
                    proc.send_signal(signal.SIGTERM)
            except Exception as child_e:
                logger.error(f"终止子进程出错: {str(child_e)}")
    except Exception as kill_e:
        logger.error(f"尝试清理进程时出错: {str(kill_e)}")


async def safe_stop_ai_video():
    """Stop the AI video pipeline, recovering from failures.

    Runs ``stopAIVideo`` in a worker thread. If it raises, the error is
    recorded in ``service_status``, all tasks are marked stopped, ffmpeg
    child processes are sent SIGTERM, and after ``Config.RESTART_DELAY``
    seconds the health flag is set back to True.

    Returns:
        True if ``stopAIVideo`` completed, False if it raised.
    """
    try:
        await asyncio.to_thread(stopAIVideo)
    except Exception as exc:
        logger.error(f"停止AI视频处理出错: {str(exc)}\n{traceback.format_exc()}")

        # Mark the service unhealthy and remember what went wrong.
        service_status["is_healthy"] = False
        service_status["last_error"] = str(exc)
        service_status["error_time"] = datetime.now().isoformat()

        # Force every task into the stopped state.
        task_manager.mark_all_tasks_as_stopped()

        # Fall back to signalling any ffmpeg child processes directly.
        _terminate_ffmpeg_children()

        # Wait before declaring the service healthy again.
        await asyncio.sleep(Config.RESTART_DELAY)
        service_status["is_healthy"] = True
        return False

    return True
|
|||
|
|
|||
|
def verify_token(request) -> None:
    """Check the ``X-API-Token`` header against ``Config.VALID_TOKEN``.

    Raises:
        Unauthorized: if the header is missing, empty, or does not match.
    """
    supplied = request.headers.get('X-API-Token')
    if supplied and supplied == Config.VALID_TOKEN:
        return

    logger.warning("Invalid token attempt")
    raise Unauthorized("Invalid token")
|
|||
|
|
|||
|
async def verify_userid(request) -> None:
    """Check that the requesting user may configure streams.

    The JSON body is parsed as a full ``StreamRequest``, so it must carry
    all stream fields in addition to ``user_id``.

    Raises:
        ValueError: if the body is not a valid ``StreamRequest``.
        Unauthorized: if ``user_id`` is empty, or ``get_user_power`` returns
            less than 1 for the "config" permission.
    """
    stream_request = StreamRequest.from_dict(request.json)
    userid = stream_request.user_id
    # Use the logger instead of print() so the output follows the logging
    # configuration rather than going straight to stdout.
    logger.debug("Verifying permissions for user_id=%s", userid)

    if not userid:
        logger.warning("userid not define")
        raise Unauthorized("Invalid userid")

    if get_user_power(userid, "config") < 1:
        logger.warning("user have not power")
        raise Unauthorized("user have not power")
|
|||
|
|
|||
|
async def detection(request):
    """Replace any running detection with the one described by the request.

    Steps: validate the JSON body, stop and unregister every existing task,
    start the new stream in a worker thread, then register it under a fresh
    task ID.

    Returns:
        A JSON response: ``success`` with the new ``task_id``, or status 500
        if ``startAIVideo`` raised.

    Raises:
        ValueError: if the request body is invalid, or the task registry is
            full when the new task is registered.
    """
    # Validate first, so a malformed request cannot tear down a healthy
    # stream before it is rejected.
    stream_request = StreamRequest.from_dict(request.json)

    # A previous failure is only logged here; the flag is cleared so the new
    # start attempt is not blocked.
    if not service_status["is_healthy"]:
        logger.warning(f"服务处于不健康状态,上次错误: {service_status['last_error']} 于 {service_status['error_time']}")
        service_status["is_healthy"] = True

    # Stop every existing task before starting the new one.
    for old_task_id in list(task_manager.active_tasks):
        logger.info(f"停止现有任务 {old_task_id} 以启动新任务")
        try:
            stop_cb = task_manager.task_callbacks.get(old_task_id, safe_stop_ai_video)
            if not await stop_cb():
                logger.warning(f"Stop callback for task {old_task_id} reported failure")
            task_manager.remove_task(old_task_id)
        except Exception as e:
            logger.error(f"停止任务时出错: {e}")
            task_manager.mark_all_tasks_as_stopped()

    task_id = str(uuid.uuid4())

    # The URLs are passed through as received; an earlier revision rewrote
    # the host address at this point.
    try:
        await asyncio.to_thread(
            startAIVideo,
            stream_request.source_url,
            stream_request.push_url,
            stream_request.model_path,
            stream_request.detect_classes,
            stream_request.confidence,
        )
    except Exception as e:
        logger.error(f"启动AI视频处理失败: {e}")
        return json_response({
            "status": "error",
            "message": f"Failed to start AI video processing: {str(e)}"
        }, status=500)

    # Register the task together with the function that stops it.
    task_manager.add_task(
        task_id,
        {
            "source_url": stream_request.source_url,
            "push_url": stream_request.push_url,
            "model_path": stream_request.model_path,
            "detect_classes": stream_request.detect_classes,
            "confidence": stream_request.confidence,
        },
        stop_handle=safe_stop_ai_video,
    )

    return json_response({
        "status": "success",
        "task_id": task_id,
        "message": "Detection started successfully"
    })
|
|||
|
|
|||
|
@app.post("/ai/stream/detect1")
async def start_detection1(request):
    """Start a detection stream after checking the token and user permission.

    Responses: 401 for a bad token or insufficient permission, 400 for an
    invalid request body, 500 for anything else, otherwise whatever
    ``detection`` returns.
    """
    try:
        # Check the token first so unauthenticated callers never reach body
        # parsing or the permission lookup.
        verify_token(request)
        await verify_userid(request)
        return await detection(request)
    except Unauthorized as e:
        return json_response({"status": "error", "message": str(e)}, status=401)
    except ValueError as e:
        logger.error(f"Validation error: {str(e)}")
        return json_response({"status": "error", "message": str(e)}, status=400)
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}", exc_info=True)
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
|
|||
|
|
|||
|
@app.post("/ai/stream/detect")
async def start_detection(request):
    """Start a detection stream for a token-authenticated caller.

    Responses: 401 for a bad token, 400 for an invalid request body, 500 for
    anything else, otherwise whatever ``detection`` returns.
    """
    try:
        verify_token(request)
        # detection() is a coroutine function: it must be awaited and its
        # response returned, otherwise the handler returns None.
        return await detection(request)
    except Unauthorized as e:
        return json_response({"status": "error", "message": str(e)}, status=401)
    except ValueError as e:
        logger.error(f"Validation error: {str(e)}")
        return json_response({"status": "error", "message": str(e)}, status=400)
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}", exc_info=True)
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
|
|||
|
|
|||
|
@app.post("/ai/stream/<task_id>")
async def stop_detection(request, task_id: str):
    """Stop the task with the given ID and remove it from the registry.

    Responses: 401 for a bad token, 404 for an unknown task, ``warning`` if
    the stop function reported failure (the task record is removed anyway),
    500 on unexpected errors, otherwise ``success``.
    """
    try:
        verify_token(request)

        # Existence check only: raises NotFound for unknown IDs (handled below).
        task_manager.get_task_info(task_id)

        # Use the task's own stop callback when one is registered.
        stop_callback = task_manager.task_callbacks.get(task_id)
        if stop_callback:
            success = await stop_callback()
        else:
            logger.warning(f"Task {task_id} has no stop callback, using default safe_stop_ai_video")
            success = await safe_stop_ai_video()

        # The record is removed whether or not the stop succeeded.
        task_manager.remove_task(task_id)

        if not success:
            logger.warning(f"停止任务 {task_id} 失败,但已移除任务记录")
            return json_response({
                "status": "warning",
                "message": "Task removal completed with warnings"
            })

        return json_response({
            "status": "success",
            "message": f"Detection for task {task_id} stopped successfully"
        })
    except Unauthorized as e:
        return json_response({"status": "error", "message": str(e)}, status=401)
    except NotFound as e:
        return json_response({"status": "error", "message": str(e)}, status=404)
    except Exception as e:
        logger.error(f"Error stopping task {task_id}: {str(e)}", exc_info=True)
        # Leave a trace of the failed stop on the task, if it still exists.
        if task_id in task_manager.task_status:
            task_manager.task_status[task_id] = "error_during_stop"
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
|
|||
|
|
|||
|
|
|||
|
@app.post("/ai/stream/stopTask1")
async def stopTask1(request):
    """Stop one task after checking the token and the user's permission.

    Expects a JSON object with ``task_id`` and ``user_id``.

    Responses: 401 for a bad token or insufficient permission, 400 for a
    malformed body or missing ``task_id``, 404 for an unknown task,
    ``warning`` if the stop function reported failure, 500 on unexpected
    errors, otherwise ``success``.
    """
    # Bound up front so the error handler can always reference it.
    task_id = None
    try:
        verify_token(request)

        # request.json is a property holding the decoded body, not a coroutine.
        payload = request.json
        if not isinstance(payload, dict):
            return json_response({"status": "error", "message": "Request body must be a JSON object"}, status=400)

        # verify_userid() cannot be reused here: it parses the body as a full
        # StreamRequest, whose stream fields a stop request does not carry.
        # The same permission rule is applied directly.
        user_id = payload.get("user_id")
        if not user_id:
            logger.warning("userid not define")
            raise Unauthorized("Invalid userid")
        if get_user_power(user_id, "config") < 1:
            logger.warning("user have not power")
            raise Unauthorized("user have not power")

        task_id = payload.get("task_id")
        if not task_id:
            return json_response({"status": "error", "message": "task_id is required"}, status=400)

        task_info = task_manager.get_task_info(task_id)
        logger.info(f"Stopping task: {task_id} -> {task_info}")

        # Use the task's own stop callback, falling back to the default
        # pipeline stop so the stream is actually stopped.
        stop_callback = task_manager.task_callbacks.get(task_id)
        if stop_callback:
            success = await stop_callback()
        else:
            logger.warning(f"No stop callback found for task {task_id}")
            success = await safe_stop_ai_video()

        task_manager.remove_task(task_id)

        if not success:
            return json_response({
                "status": "warning",
                "message": "Task removal completed, but stop failed"
            })
        return json_response({
            "status": "success",
            "message": "Task stopped successfully"
        })
    except Unauthorized as e:
        return json_response({"status": "error", "message": str(e)}, status=401)
    except NotFound:
        return json_response({"status": "error", "message": "Task not found"}, status=404)
    except Exception as e:
        logger.error(f"Error stopping task {task_id}: {str(e)}", exc_info=True)
        # Leave a trace of the failed stop on the task, if it still exists.
        if task_id in task_manager.task_status:
            task_manager.task_status[task_id] = "error_during_stop"
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
|
|||
|
|
|||
|
@app.get("/ai/stream/<task_id>")
async def get_task_status(request, task_id: str):
    """Return the parameters, status and start time of one task.

    A task recorded as "running" is reported as "stopped_unexpectedly" when
    ``getIfAI()`` is falsy; the stored status is not modified.

    Responses: 401 for a bad token, 404 for an unknown task, 500 on
    unexpected errors.
    """
    try:
        verify_token(request)
        task_info = task_manager.get_task_info(task_id)

        # Cross-check the recorded status against the pipeline itself.
        if not getIfAI() and task_info["status"] == "running":
            task_info["status"] = "stopped_unexpectedly"
            logger.warning(f"Task {task_id} 显示为运行状态,但实际已停止")

        return json_response({
            "status": "success",
            "task_id": task_id,
            **task_info
        })
    except Unauthorized as e:
        return json_response({"status": "error", "message": str(e)}, status=401)
    except NotFound as e:
        return json_response({"status": "error", "message": str(e)}, status=404)
    except Exception as e:
        logger.error(f"Error getting task status {task_id}: {str(e)}", exc_info=True)
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
|
|||
|
|
|||
|
@app.get("/ai/stream/tasks")
async def list_tasks(request):
    """List every registered task after refreshing the health statuses.

    Responses: 401 for a bad token, 500 on unexpected errors, otherwise
    ``success`` with a ``tasks`` mapping of task ID to task details.
    """
    try:
        verify_token(request)

        # Record the status of tasks the health check found stopped.
        unhealthy_tasks = task_manager.check_tasks_health()
        for task_id, status in unhealthy_tasks.items():
            if task_id in task_manager.task_status:
                task_manager.task_status[task_id] = status

        tasks = {
            task_id: task_manager.get_task_info(task_id)
            for task_id in task_manager.active_tasks
        }
        return json_response({
            "status": "success",
            "tasks": tasks
        })
    except Unauthorized as e:
        return json_response({"status": "error", "message": str(e)}, status=401)
    except Exception as e:
        logger.error(f"Error listing tasks: {str(e)}", exc_info=True)
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
|
|||
|
|
|||
|
@app.post("/ai/stream/stopTasks")
async def stop_all_detections(request):
    """Stop the video pipeline and unregister every task.

    Responses: 401 for a bad token, ``success`` when there was nothing to
    stop or the stop succeeded, ``warning`` when the stop reported failure
    (tasks are unregistered anyway), 500 on unexpected errors.
    """
    try:
        verify_token(request)

        if not task_manager.active_tasks:
            return json_response({
                "status": "success",
                "message": "No active tasks to stop"
            })

        success = await safe_stop_ai_video()

        # Task records are removed whether or not the stop succeeded.
        for task_id in list(task_manager.active_tasks):
            task_manager.remove_task(task_id)

        if not success:
            return json_response({
                "status": "warning",
                "message": "Tasks stopped with warnings"
            })

        return json_response({
            "status": "success",
            "message": "All detections stopped successfully"
        })
    except Unauthorized as e:
        return json_response({"status": "error", "message": str(e)}, status=401)
    except Exception as e:
        logger.error(f"Error stopping all tasks: {str(e)}", exc_info=True)
        # At least mark every task as stopped.
        task_manager.mark_all_tasks_as_stopped()
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
|
|||
|
|
|||
|
@app.get("/ai/health")
async def health_check(request):
    """Report service and task health; no token is required (monitoring endpoint)."""
    try:
        stopped = task_manager.check_tasks_health()
        service_state = "running" if service_status["is_healthy"] else "degraded"
        report = {
            "status": "success",
            "service": service_state,
            "active_tasks": len(task_manager.active_tasks),
            "unhealthy_tasks": stopped,
            "last_error": service_status["last_error"],
            "error_time": service_status["error_time"],
            "timestamp": datetime.now().isoformat(),
        }
        return json_response(report)
    except Exception as exc:
        logger.error(f"健康检查失败: {str(exc)}", exc_info=True)
        failure = {
            "status": "error",
            "service": "degraded",
            "error": str(exc),
            "timestamp": datetime.now().isoformat(),
        }
        return json_response(failure, status=500)
|
|||
|
|
|||
|
@app.route("/ai/reset", methods=["POST"])
async def reset_service(request):
    """Reset the service: stop the pipeline, drop all tasks, clear error state.

    Responses: 401 for a bad token, ``success`` with the number of zombie
    child processes found, ``warning`` if the process scan failed, 500 on
    unexpected errors.
    """
    try:
        verify_token(request)

        # Best-effort stop; safe_stop_ai_video handles its own failures.
        await safe_stop_ai_video()

        for task_id in list(task_manager.active_tasks):
            task_manager.remove_task(task_id)

        service_status["is_healthy"] = True
        service_status["last_error"] = None
        service_status["error_time"] = None

        # Look for zombie child processes left behind by earlier runs.
        try:
            current_process = psutil.Process(os.getpid())
            zombie_count = 0

            for child in current_process.children(recursive=True):
                try:
                    if child.status() == psutil.STATUS_ZOMBIE:
                        zombie_count += 1
                        # NOTE(review): a zombie has already exited, so this
                        # signal likely has no effect; reaping normally needs
                        # the parent to wait() on it -- confirm intent.
                        child.send_signal(signal.SIGKILL)
                except Exception as child_err:
                    # Best effort: the process may vanish while being inspected.
                    logger.debug(f"Skipping child process during zombie cleanup: {child_err}")

            return json_response({
                "status": "success",
                "message": f"Service reset successful. Cleaned {zombie_count} zombie processes."
            })
        except Exception as e:
            logger.error(f"清理僵尸进程时出错: {e}")
            return json_response({
                "status": "warning",
                "message": "Service reset with warnings"
            })
    except Unauthorized as e:
        return json_response({"status": "error", "message": str(e)}, status=401)
    except Exception as e:
        logger.error(f"重置服务时出错: {str(e)}", exc_info=True)
        return json_response({
            "status": "error",
            "message": f"Failed to reset service: {str(e)}"
        }, status=500)
|
|||
|
|
|||
|
@app.route("/ai/stream/restart/<task_id>", methods=["POST"])
async def restart_task(request, task_id: str):
    """Restart a task with its original parameters under a new task ID.

    Responses: 401 for a bad token, 404 for an unknown task, 500 if the
    stream could not be restarted, otherwise ``success`` with the old and
    new task IDs.
    """
    try:
        verify_token(request)

        # Fetch the stored parameters; NotFound is handled below.
        task_info = task_manager.get_task_info(task_id)["task_info"]

        # Stop the old run first; a failed stop does not block the restart.
        success = await safe_stop_ai_video()
        task_manager.remove_task(task_id)
        if not success:
            logger.warning("停止任务出现问题,尝试继续重启")

        new_task_id = str(uuid.uuid4())

        try:
            await asyncio.to_thread(
                startAIVideo,
                task_info["source_url"],
                task_info["push_url"],
                task_info["model_path"],
                task_info["detect_classes"],
                task_info["confidence"],
            )

            # Register the new run together with its stop function; add_task
            # needs it, and without it the started stream would be untracked.
            task_manager.add_task(new_task_id, task_info, stop_handle=safe_stop_ai_video)

            return json_response({
                "status": "success",
                "old_task_id": task_id,
                "new_task_id": new_task_id,
                "message": "Task restarted successfully"
            })
        except Exception as e:
            logger.error(f"重启任务失败: {e}")
            return json_response({
                "status": "error",
                "message": f"Failed to restart task: {str(e)}"
            }, status=500)
    except Unauthorized as e:
        return json_response({"status": "error", "message": str(e)}, status=401)
    except NotFound:
        return json_response({"status": "error", "message": "Task not found"}, status=404)
    except Exception as e:
        logger.error(f"重启任务时出错: {str(e)}", exc_info=True)
        return json_response({"status": "error", "message": f"Internal server error: {str(e)}"}, status=500)
|
|||
|
|
|||
|
if __name__ == "__main__":
    # Make sure no stream from a previous run is still alive before serving.
    try:
        stopAIVideo()
        print("服务启动前清理完成")
    except Exception as e:
        # Catch Exception rather than everything, so Ctrl-C and SystemExit
        # still abort start-up.
        print("服务启动前清理失败,但仍将继续")
        logger.warning(f"Pre-start cleanup failed: {e}")

    # psutil is imported unconditionally at module import time, so the
    # former "install psutil on ImportError" fallback here could never run
    # and has been removed.

    app.run(host="0.0.0.0", port=12317, debug=False, access_log=True)
|